scovich commented on code in PR #7834:
URL: https://github.com/apache/arrow-rs/pull/7834#discussion_r2182702938


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,355 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+
+    /// Returns the number of rows that can be added to this decoder before it is full.
+    pub fn capacity(&self) -> usize {
+        self.batch_size.saturating_sub(self.decoded_rows)
+    }
+
+    /// Returns true if the decoder has reached its capacity for the current batch.
+    pub fn batch_is_full(&self) -> bool {
+        self.capacity() == 0
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
+        let root_field = AvroField::try_from(schema)?;
+        RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )
+    }
+
+    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
+        let header = read_header(reader)?;
+        let record_decoder = if let Some(schema) = &self.schema {
+            self.make_record_decoder(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            self.make_record_decoder(&avro_schema)?
+        };
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok((header, decoder))
+    }
+
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
+        let (header, decoder) = self.build_impl(&mut reader)?;
+        Ok(Reader {
+            reader,
+            header,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            block_cursor: 0,
+            finished: false,
+        })
+    }
+
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
+        match self.schema {
+            Some(ref schema) => {
+                let record_decoder = self.make_record_decoder(schema)?;
+                Ok(Decoder::new(record_decoder, self.batch_size))
+            }
+            None => {
+                let (_, decoder) = self.build_impl(&mut reader)?;
+                Ok(decoder)
+            }
         }
     }
+}
 
-    decoder
-        .flush()
-        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+/// A high-level Avro `Reader` that reads container-file blocks
+/// and feeds them into a row-level [`Decoder`].
+#[derive(Debug)]
+pub struct Reader<R: BufRead> {
+    reader: R,
+    header: Header,
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    block_data: Vec<u8>,
+    block_cursor: usize,
+    finished: bool,
 }
 
-/// Return an iterator of [`Block`] from the provided [`BufRead`]
-fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = Result<Block, ArrowError>> {
-    let mut decoder = BlockDecoder::default();
+impl<R: BufRead> Reader<R> {
+    /// Return the Arrow schema discovered from the Avro file header
+    pub fn schema(&self) -> SchemaRef {
+        self.decoder.schema()
+    }
 
-    let mut try_next = move || {
-        loop {
-            let buf = reader.fill_buf()?;
-            if buf.is_empty() {
-                break;
+    /// Return the Avro container-file header
+    pub fn avro_header(&self) -> &Header {
+        &self.header
+    }
+
+    /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
+    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        'outer: while !self.finished && !self.decoder.batch_is_full() {
+            while self.block_cursor == self.block_data.len() {
+                let buf = self.reader.fill_buf()?;
+                if buf.is_empty() {
+                    self.finished = true;
+                    break 'outer;
+                }
+                // Try to decode another block from the buffered reader.
+                let consumed = self.block_decoder.decode(buf)?;
+                self.reader.consume(consumed);
+                if let Some(block) = self.block_decoder.flush() {
+                    // Successfully decoded a block.
+                    let block_data = if let Some(ref codec) = self.header.compression()? {
+                        codec.decompress(&block.data)?
+                    } else {
+                        block.data
+                    };
+                    self.block_data = block_data;
+                    self.block_cursor = 0;
+                } else if consumed == 0 {
+                    // The block decoder made no progress on a non-empty buffer.
+                    return Err(ArrowError::ParseError(
+                        "Could not decode next Avro block from partial 
data".to_string(),
+                    ));
+                }
             }
-            let read = buf.len();
-            let decoded = decoder.decode(buf)?;
-            reader.consume(decoded);
-            if decoded != read {
-                break;
+            // Try to decode more rows from the current block.
+            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
+            if consumed == 0 && self.block_cursor < self.block_data.len() {
+                self.block_cursor = self.block_data.len();
+            } else {
+                self.block_cursor += consumed;
             }
         }
-        Ok(decoder.flush())
-    };
-    std::iter::from_fn(move || try_next().transpose())
+        self.decoder.flush()
+    }
+}
+
+impl<R: BufRead> Iterator for Reader<R> {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.read() {
+            Ok(Some(batch)) => Some(Ok(batch)),
+            Ok(None) => None,
+            Err(e) => Some(Err(e)),
+        }
+    }
+}
+
+impl<R: BufRead> RecordBatchReader for Reader<R> {
+    fn schema(&self) -> SchemaRef {
+        self.schema()
+    }
 }
 
 #[cfg(test)]
 mod test {
     use crate::codec::{AvroDataType, AvroField, Codec};
     use crate::compression::CompressionCodec;
     use crate::reader::record::RecordDecoder;
-    use crate::reader::{read_blocks, read_header};
+    use crate::reader::vlq::VLQDecoder;
+    use crate::reader::{read_header, Decoder, ReaderBuilder};
     use crate::test_util::arrow_test_data;
     use arrow_array::*;
-    use arrow_schema::{DataType, Field};
+    use arrow_schema::{ArrowError, DataType, Field, Schema};
+    use bytes::{Buf, BufMut, Bytes};
+    use futures::executor::block_on;
+    use futures::{stream, Stream, StreamExt, TryStreamExt};
     use std::collections::HashMap;
+    use std::fs;
     use std::fs::File;
-    use std::io::BufReader;
+    use std::io::{BufReader, Cursor, Read};
     use std::sync::Arc;
+    use std::task::{ready, Poll};
 
-    fn read_file(file: &str, batch_size: usize) -> RecordBatch {
-        read_file_with_options(file, batch_size, &crate::ReadOptions::default())
+    fn read_file(path: &str, batch_size: usize, utf8_view: bool) -> RecordBatch {
+        let file = File::open(path).unwrap();
+        let reader = ReaderBuilder::new()
+            .with_batch_size(batch_size)
+            .with_utf8_view(utf8_view)
+            .build(BufReader::new(file))
+            .unwrap();
+        let schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
+        arrow::compute::concat_batches(&schema, &batches).unwrap()
     }
 
-    fn read_file_with_options(
-        file: &str,
-        batch_size: usize,
-        options: &crate::ReadOptions,
-    ) -> RecordBatch {
-        let file = File::open(file).unwrap();
-        let mut reader = BufReader::new(file);
-        let header = read_header(&mut reader).unwrap();
-        let compression = header.compression().unwrap();
-        let schema = header.schema().unwrap().unwrap();
-        let root = AvroField::try_from(&schema).unwrap();
-
-        let mut decoder =
-            RecordDecoder::try_new_with_options(root.data_type(), options.clone()).unwrap();
-
-        for result in read_blocks(reader) {
-            let block = result.unwrap();
-            assert_eq!(block.sync, header.sync());
-            if let Some(c) = compression {
-                let decompressed = c.decompress(&block.data).unwrap();
-
-                let mut offset = 0;
-                let mut remaining = block.count;
-                while remaining > 0 {
-                    let to_read = remaining.max(batch_size);
-                    offset += decoder
-                        .decode(&decompressed[offset..], block.count)
-                        .unwrap();
-
-                    remaining -= to_read;
+    fn decode_stream<S: Stream<Item = Bytes> + Unpin>(

Review Comment:
   This seems like a competent async reader implementation... any reason it shouldn't be part of the public API?
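   For anyone skimming this thread, a minimal sketch of what a public stream-decoding helper could look like, built only on the `Decoder` API shown in this diff (`decode`, `flush`, `batch_is_full`). The name `decode_stream` echoes the test helper above, but the signature and chunk-buffering strategy are illustrative rather than the PR's actual implementation, and it assumes the input is uncompressed Avro body bytes, since `Decoder` leaves block framing and decompression to `Reader`:

   ```rust
   use arrow_array::RecordBatch;
   use arrow_avro::reader::Decoder;
   use arrow_schema::ArrowError;
   use bytes::Bytes;
   use futures::{Stream, StreamExt};

   /// Drain a stream of byte chunks through a `Decoder`, emitting a
   /// `RecordBatch` whenever one fills up (sketch only).
   async fn decode_stream<S>(
       mut decoder: Decoder,
       mut input: S,
   ) -> Result<Vec<RecordBatch>, ArrowError>
   where
       S: Stream<Item = Bytes> + Unpin,
   {
       let mut batches = Vec::new();
       // Bytes the decoder has not yet consumed, e.g. a partial row or
       // leftovers after the current batch filled up.
       let mut pending: Vec<u8> = Vec::new();
       while let Some(chunk) = input.next().await {
           pending.extend_from_slice(&chunk);
           loop {
               let consumed = decoder.decode(&pending)?;
               pending.drain(..consumed);
               if decoder.batch_is_full() {
                   // Emit the full batch, then keep consuming `pending`.
                   if let Some(batch) = decoder.flush()? {
                       batches.push(batch);
                   }
               } else {
                   break; // all decodable bytes consumed; need more input
               }
           }
       }
       // Flush any partially filled final batch.
       if let Some(batch) = decoder.flush()? {
           batches.push(batch);
       }
       Ok(batches)
   }
   ```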



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -329,64 +334,41 @@ impl<R> Reader<R> {
     pub fn avro_header(&self) -> &Header {
         &self.header
     }
-}
 
-impl<R: BufRead> Reader<R> {
     /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
     fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.finished {
-            return Ok(None);
-        }
-        loop {
-            if !self.block_data.is_empty() {
-                let consumed = self.decoder.decode(&self.block_data)?;
-                if consumed > 0 {
-                    self.block_data.drain(..consumed);
-                }
-                match self.decoder.flush()? {
-                    None => {
-                        if !self.block_data.is_empty() {
-                            break;
-                        }
-                    }
-                    Some(batch) => {
-                        return Ok(Some(batch));
-                    }
-                }
-            }
-            let maybe_block = {
+        'outer: while !self.finished && !self.decoder.batch_is_full() {
+            while self.block_cursor == self.block_data.len() {
                 let buf = self.reader.fill_buf()?;
                 if buf.is_empty() {
-                    None
-                } else {
-                    let read_len = buf.len();
-                    let consumed_len = self.block_decoder.decode(buf)?;
-                    self.reader.consume(consumed_len);
-                    if consumed_len == 0 && read_len != 0 {
-                        return Err(ArrowError::ParseError(
-                            "Could not decode next Avro block from partial 
data".to_string(),
-                        ));
-                    }
-                    self.block_decoder.flush()
+                    self.finished = true;
+                    break 'outer;
                 }
-            };
-            match maybe_block {
-                Some(block) => {
-                    let block_data = if let Some(ref codec) = self.compression {
+                // Try to decode another block from the buffered reader.
+                let consumed = self.block_decoder.decode(buf)?;
+                self.reader.consume(consumed);
+                if let Some(block) = self.block_decoder.flush() {
+                    // Successfully decoded a block.
+                    let block_data = if let Some(ref codec) = self.header.compression()? {
                         codec.decompress(&block.data)?
                     } else {
                         block.data
                     };
                     self.block_data = block_data;
+                    self.block_cursor = 0;
+                } else if consumed == 0 {
+                    // The block decoder made no progress on a non-empty buffer.
+                    return Err(ArrowError::ParseError(
+                        "Could not decode next Avro block from partial 
data".to_string(),
+                    ));
                 }
-                None => {
-                    self.finished = true;
-                    if !self.block_data.is_empty() {
-                        let consumed = self.decoder.decode(&self.block_data)?;
-                        self.block_data.drain(..consumed);
-                    }
-                    return self.decoder.flush();
-                }
+            }
+            // Try to decode more rows from the current block.
+            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
+            if consumed == 0 && self.block_cursor < self.block_data.len() {
+                self.block_cursor = self.block_data.len();

Review Comment:
   (part of me wonders if this might be related to the header decoding question on the other comment thread -- are we trying to skip the unwanted header bytes here, in order to reach the data of interest?)



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,355 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+
+    /// Returns the number of rows that can be added to this decoder before it is full.
+    pub fn capacity(&self) -> usize {
+        self.batch_size.saturating_sub(self.decoded_rows)
+    }
+
+    /// Returns true if the decoder has reached its capacity for the current batch.
+    pub fn batch_is_full(&self) -> bool {
+        self.capacity() == 0
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
+        let root_field = AvroField::try_from(schema)?;
+        RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )
+    }
+
+    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
+        let header = read_header(reader)?;
+        let record_decoder = if let Some(schema) = &self.schema {
+            self.make_record_decoder(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            self.make_record_decoder(&avro_schema)?
+        };
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok((header, decoder))
+    }
+
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
+        let (header, decoder) = self.build_impl(&mut reader)?;
+        Ok(Reader {
+            reader,
+            header,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            block_cursor: 0,
+            finished: false,
+        })
+    }
+
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
+        match self.schema {
+            Some(ref schema) => {
+                let record_decoder = self.make_record_decoder(schema)?;
+                Ok(Decoder::new(record_decoder, self.batch_size))

Review Comment:
   Actually, a bigger question -- the same `reader` provides the header and the data. Does the caller have to keep track of whether the header was already consumed?
   * If this method already consumed it, can they re-read it some other way later?
   * If this method did _not_ consume it, will that confuse the block decoder?
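   For illustration, the two call paths as this diff implements them -- `build_decoder` only reads (and then discards) the header when no schema was supplied. The function names and the `AvroSchema` import path below are mine, not the PR's:

   ```rust
   use std::io::BufRead;

   use arrow_avro::reader::{Decoder, ReaderBuilder};
   use arrow_avro::schema::Schema as AvroSchema; // assumed import path
   use arrow_schema::ArrowError;

   // Schema supplied up front: `build_decoder` takes the `Some(ref schema)`
   // arm and never touches `reader`, so the file header (magic, metadata,
   // sync marker) is still unread afterwards.
   fn with_known_schema<R: BufRead>(
       schema: AvroSchema<'static>,
       reader: &mut R,
   ) -> Result<Decoder, ArrowError> {
       ReaderBuilder::new().with_schema(schema).build_decoder(reader)
   }

   // No schema: `build_impl` reads and consumes the header from `reader` to
   // discover the schema, after which the parsed header itself is dropped.
   fn with_schema_from_header<R: BufRead>(reader: &mut R) -> Result<Decoder, ArrowError> {
       ReaderBuilder::new().build_decoder(reader)
   }
   ```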



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,355 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+
+    /// Returns the number of rows that can be added to this decoder before it is full.
+    pub fn capacity(&self) -> usize {
+        self.batch_size.saturating_sub(self.decoded_rows)
+    }
+
+    /// Returns true if the decoder has reached its capacity for the current batch.
+    pub fn batch_is_full(&self) -> bool {
+        self.capacity() == 0
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
+        let root_field = AvroField::try_from(schema)?;
+        RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )
+    }
+
+    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
+        let header = read_header(reader)?;
+        let record_decoder = if let Some(schema) = &self.schema {
+            self.make_record_decoder(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            self.make_record_decoder(&avro_schema)?
+        };
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok((header, decoder))
+    }
+
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
+        let (header, decoder) = self.build_impl(&mut reader)?;
+        Ok(Reader {
+            reader,
+            header,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            block_cursor: 0,
+            finished: false,
+        })
+    }
+
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
+        match self.schema {
+            Some(ref schema) => {
+                let record_decoder = self.make_record_decoder(schema)?;
+                Ok(Decoder::new(record_decoder, self.batch_size))
+            }
+            None => {
+                let (_, decoder) = self.build_impl(&mut reader)?;
+                Ok(decoder)
+            }
         }
     }
+}
 
-    decoder
-        .flush()
-        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+/// A high-level Avro `Reader` that reads container-file blocks
+/// and feeds them into a row-level [`Decoder`].
+#[derive(Debug)]
+pub struct Reader<R: BufRead> {
+    reader: R,
+    header: Header,
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    block_data: Vec<u8>,
+    block_cursor: usize,
+    finished: bool,
 }
 
-/// Return an iterator of [`Block`] from the provided [`BufRead`]
-fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = Result<Block, ArrowError>> {
-    let mut decoder = BlockDecoder::default();
+impl<R: BufRead> Reader<R> {
+    /// Return the Arrow schema discovered from the Avro file header
+    pub fn schema(&self) -> SchemaRef {
+        self.decoder.schema()
+    }
 
-    let mut try_next = move || {
-        loop {
-            let buf = reader.fill_buf()?;
-            if buf.is_empty() {
-                break;
+    /// Return the Avro container-file header
+    pub fn avro_header(&self) -> &Header {
+        &self.header
+    }
+
+    /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
+    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        'outer: while !self.finished && !self.decoder.batch_is_full() {
+            while self.block_cursor == self.block_data.len() {
+                let buf = self.reader.fill_buf()?;
+                if buf.is_empty() {
+                    self.finished = true;
+                    break 'outer;
+                }
+                // Try to decode another block from the buffered reader.
+                let consumed = self.block_decoder.decode(buf)?;
+                self.reader.consume(consumed);
+                if let Some(block) = self.block_decoder.flush() {
+                    // Successfully decoded a block.
+                    let block_data = if let Some(ref codec) = self.header.compression()? {
+                        codec.decompress(&block.data)?
+                    } else {
+                        block.data
+                    };
+                    self.block_data = block_data;
+                    self.block_cursor = 0;
+                } else if consumed == 0 {
+                    // The block decoder made no progress on a non-empty buffer.
+                    return Err(ArrowError::ParseError(
+                        "Could not decode next Avro block from partial 
data".to_string(),
+                    ));
+                }
             }
-            let read = buf.len();
-            let decoded = decoder.decode(buf)?;
-            reader.consume(decoded);
-            if decoded != read {
-                break;
+            // Try to decode more rows from the current block.
+            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
+            if consumed == 0 && self.block_cursor < self.block_data.len() {
+                self.block_cursor = self.block_data.len();
+            } else {
+                self.block_cursor += consumed;
             }
         }
-        Ok(decoder.flush())
-    };
-    std::iter::from_fn(move || try_next().transpose())
+        self.decoder.flush()
+    }
+}
+
+impl<R: BufRead> Iterator for Reader<R> {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.read() {
+            Ok(Some(batch)) => Some(Ok(batch)),
+            Ok(None) => None,
+            Err(e) => Some(Err(e)),
+        }
+    }
+}
+
+impl<R: BufRead> RecordBatchReader for Reader<R> {
+    fn schema(&self) -> SchemaRef {
+        self.schema()
+    }
 }
 
 #[cfg(test)]
 mod test {
     use crate::codec::{AvroDataType, AvroField, Codec};
     use crate::compression::CompressionCodec;
     use crate::reader::record::RecordDecoder;
-    use crate::reader::{read_blocks, read_header};
+    use crate::reader::vlq::VLQDecoder;
+    use crate::reader::{read_header, Decoder, ReaderBuilder};
     use crate::test_util::arrow_test_data;
     use arrow_array::*;
-    use arrow_schema::{DataType, Field};
+    use arrow_schema::{ArrowError, DataType, Field, Schema};
+    use bytes::{Buf, BufMut, Bytes};
+    use futures::executor::block_on;
+    use futures::{stream, Stream, StreamExt, TryStreamExt};
     use std::collections::HashMap;
+    use std::fs;
     use std::fs::File;
-    use std::io::BufReader;
+    use std::io::{BufReader, Cursor, Read};
     use std::sync::Arc;
+    use std::task::{ready, Poll};
 
-    fn read_file(file: &str, batch_size: usize) -> RecordBatch {
-        read_file_with_options(file, batch_size, &crate::ReadOptions::default())
+    fn read_file(path: &str, batch_size: usize, utf8_view: bool) -> RecordBatch {
+        let file = File::open(path).unwrap();
+        let reader = ReaderBuilder::new()
+            .with_batch_size(batch_size)
+            .with_utf8_view(utf8_view)
+            .build(BufReader::new(file))
+            .unwrap();
+        let schema = reader.schema();
+        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
+        arrow::compute::concat_batches(&schema, &batches).unwrap()
     }
 
-    fn read_file_with_options(
-        file: &str,
-        batch_size: usize,
-        options: &crate::ReadOptions,
-    ) -> RecordBatch {
-        let file = File::open(file).unwrap();
-        let mut reader = BufReader::new(file);
-        let header = read_header(&mut reader).unwrap();
-        let compression = header.compression().unwrap();
-        let schema = header.schema().unwrap().unwrap();
-        let root = AvroField::try_from(&schema).unwrap();
-
-        let mut decoder =
-            RecordDecoder::try_new_with_options(root.data_type(), options.clone()).unwrap();
-
-        for result in read_blocks(reader) {
-            let block = result.unwrap();
-            assert_eq!(block.sync, header.sync());
-            if let Some(c) = compression {
-                let decompressed = c.decompress(&block.data).unwrap();
-
-                let mut offset = 0;
-                let mut remaining = block.count;
-                while remaining > 0 {
-                    let to_read = remaining.max(batch_size);
-                    offset += decoder
-                        .decode(&decompressed[offset..], block.count)
-                        .unwrap();
-
-                    remaining -= to_read;
+    fn decode_stream<S: Stream<Item = Bytes> + Unpin>(

Review Comment:
   Ah -- because not everyone wants to use the `futures` crate



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -329,64 +334,41 @@ impl<R> Reader<R> {
     pub fn avro_header(&self) -> &Header {
         &self.header
     }
-}
 
-impl<R: BufRead> Reader<R> {
     /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
     fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.finished {
-            return Ok(None);
-        }
-        loop {
-            if !self.block_data.is_empty() {
-                let consumed = self.decoder.decode(&self.block_data)?;
-                if consumed > 0 {
-                    self.block_data.drain(..consumed);
-                }
-                match self.decoder.flush()? {
-                    None => {
-                        if !self.block_data.is_empty() {
-                            break;
-                        }
-                    }
-                    Some(batch) => {
-                        return Ok(Some(batch));
-                    }
-                }
-            }
-            let maybe_block = {
+        'outer: while !self.finished && !self.decoder.batch_is_full() {
+            while self.block_cursor == self.block_data.len() {
                 let buf = self.reader.fill_buf()?;
                 if buf.is_empty() {
-                    None
-                } else {
-                    let read_len = buf.len();
-                    let consumed_len = self.block_decoder.decode(buf)?;
-                    self.reader.consume(consumed_len);
-                    if consumed_len == 0 && read_len != 0 {
-                        return Err(ArrowError::ParseError(
-                            "Could not decode next Avro block from partial 
data".to_string(),
-                        ));
-                    }
-                    self.block_decoder.flush()
+                    self.finished = true;
+                    break 'outer;
                 }
-            };
-            match maybe_block {
-                Some(block) => {
-                    let block_data = if let Some(ref codec) = self.compression {
+                // Try to decode another block from the buffered reader.
+                let consumed = self.block_decoder.decode(buf)?;
+                self.reader.consume(consumed);
+                if let Some(block) = self.block_decoder.flush() {
+                    // Successfully decoded a block.
+                    let block_data = if let Some(ref codec) = self.header.compression()? {
                         codec.decompress(&block.data)?
                     } else {
                         block.data
                     };
                     self.block_data = block_data;
+                    self.block_cursor = 0;
+                } else if consumed == 0 {
+                    // The block decoder made no progress on a non-empty buffer.
+                    return Err(ArrowError::ParseError(
+                        "Could not decode next Avro block from partial 
data".to_string(),
+                    ));
                 }
-                None => {
-                    self.finished = true;
-                    if !self.block_data.is_empty() {
-                        let consumed = self.decoder.decode(&self.block_data)?;
-                        self.block_data.drain(..consumed);
-                    }
-                    return self.decoder.flush();
-                }
+            }
+            // Try to decode more rows from the current block.
+            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
+            if consumed == 0 && self.block_cursor < self.block_data.len() {
+                self.block_cursor = self.block_data.len();

Review Comment:
   This doesn't seem quite right? If we failed to make progress on a non-empty block and a non-full batch, wouldn't that indicate something is wrong, similar to L359 above? Why would it be correct to just skip the rest of the block and try to keep going?
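   For concreteness, the stricter alternative would mirror the block-decoder error above instead of skipping the block tail -- a sketch against the loop as written (a fragment of `Reader::read`; the error message is mine):

   ```rust
   // Try to decode more rows from the current block. The enclosing `while`
   // already guarantees the batch is not full here, so zero bytes consumed
   // on non-empty data means the decoder is stuck, not merely batch-limited.
   let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
   if consumed == 0 && self.block_cursor < self.block_data.len() {
       return Err(ArrowError::ParseError(
           "Could not decode next Avro record from block data".to_string(),
       ));
   }
   self.block_cursor += consumed;
   ```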



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,355 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+
+    /// Returns the number of rows that can be added to this decoder before it is full.
+    pub fn capacity(&self) -> usize {
+        self.batch_size.saturating_sub(self.decoded_rows)
+    }
+
+    /// Returns true if the decoder has reached its capacity for the current batch.
+    pub fn batch_is_full(&self) -> bool {
+        self.capacity() == 0
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
+        let root_field = AvroField::try_from(schema)?;
+        RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )
+    }
+
+    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
+        let header = read_header(reader)?;
+        let record_decoder = if let Some(schema) = &self.schema {
+            self.make_record_decoder(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            self.make_record_decoder(&avro_schema)?
+        };
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok((header, decoder))
+    }
+
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro 
file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
+        let (header, decoder) = self.build_impl(&mut reader)?;
+        Ok(Reader {
+            reader,
+            header,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            block_cursor: 0,
+            finished: false,
+        })
+    }
+
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
+        match self.schema {
+            Some(ref schema) => {
+                let record_decoder = self.make_record_decoder(schema)?;
+                Ok(Decoder::new(record_decoder, self.batch_size))

Review Comment:
   Coming from the parquet world, I'm a bit surprised the decoder doesn't need access to the header?
   But looking at this code, the header only seems to provide schema and compression info?
   And the decoder leaves compression to the reader for some reason?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
