scovich commented on code in PR #7834:
URL: https://github.com/apache/arrow-rs/pull/7834#discussion_r2180547334


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,373 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
-        }
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should 
produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro 
file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, 
ArrowError> {
+        let header = read_header(&mut reader)?;
+        let compression = header.compression()?;
+        let root_field = if let Some(schema) = &self.schema {
+            AvroField::try_from(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            AvroField::try_from(&avro_schema)?
+        };
+        let record_decoder = RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )?;
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok(Reader {
+            reader,
+            header,
+            compression,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            finished: false,
+        })
     }
 
-    decoder
-        .flush()
-        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
+        let record_decoder = if let Some(schema) = self.schema {
+            let root_field = AvroField::try_from(&schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        } else {
+            let header = read_header(&mut reader)?;
+            let avro_schema = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+                })?;
+            let root_field = AvroField::try_from(&avro_schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        };
+        Ok(Decoder::new(record_decoder, self.batch_size))
+    }
+}
+
+/// A high-level Avro `Reader` that reads container-file blocks
+/// and feeds them into a row-level [`Decoder`].
+#[derive(Debug)]
+pub struct Reader<R> {
+    reader: R,
+    header: Header,
+    compression: Option<crate::compression::CompressionCodec>,
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    block_data: Vec<u8>,
+    finished: bool,
 }
 
-/// Return an iterator of [`Block`] from the provided [`BufRead`]
-fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = 
Result<Block, ArrowError>> {
-    let mut decoder = BlockDecoder::default();
+impl<R> Reader<R> {
+    /// Return the Arrow schema discovered from the Avro file header
+    pub fn schema(&self) -> SchemaRef {
+        self.decoder.schema()
+    }
+
+    /// Return the Avro container-file header
+    pub fn avro_header(&self) -> &Header {
+        &self.header
+    }
+}
 
-    let mut try_next = move || {
+impl<R: BufRead> Reader<R> {
+    /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
+    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.finished {
+            return Ok(None);
+        }
         loop {
-            let buf = reader.fill_buf()?;
-            if buf.is_empty() {
-                break;
+            if !self.block_data.is_empty() {
+                let consumed = self.decoder.decode(&self.block_data)?;
+                if consumed > 0 {
+                    self.block_data.drain(..consumed);

Review Comment:
   `Vec::drain(..consumed)` shifts all remaining bytes down to fill the gap (an O(n) memmove on every partial consume). Is there a way to avoid that cost — e.g. tracking a read offset into `block_data` and only compacting/clearing once the block is fully consumed?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,373 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
-        }
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should 
produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro 
file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, 
ArrowError> {
+        let header = read_header(&mut reader)?;
+        let compression = header.compression()?;
+        let root_field = if let Some(schema) = &self.schema {
+            AvroField::try_from(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            AvroField::try_from(&avro_schema)?
+        };
+        let record_decoder = RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )?;
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok(Reader {
+            reader,
+            header,
+            compression,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            finished: false,
+        })
     }
 
-    decoder
-        .flush()
-        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
+        let record_decoder = if let Some(schema) = self.schema {
+            let root_field = AvroField::try_from(&schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        } else {
+            let header = read_header(&mut reader)?;
+            let avro_schema = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+                })?;
+            let root_field = AvroField::try_from(&avro_schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        };
+        Ok(Decoder::new(record_decoder, self.batch_size))
+    }
+}
+
+/// A high-level Avro `Reader` that reads container-file blocks
+/// and feeds them into a row-level [`Decoder`].
+#[derive(Debug)]
+pub struct Reader<R> {
+    reader: R,
+    header: Header,
+    compression: Option<crate::compression::CompressionCodec>,
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    block_data: Vec<u8>,
+    finished: bool,
 }
 
-/// Return an iterator of [`Block`] from the provided [`BufRead`]
-fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = 
Result<Block, ArrowError>> {
-    let mut decoder = BlockDecoder::default();
+impl<R> Reader<R> {
+    /// Return the Arrow schema discovered from the Avro file header
+    pub fn schema(&self) -> SchemaRef {
+        self.decoder.schema()
+    }
+
+    /// Return the Avro container-file header
+    pub fn avro_header(&self) -> &Header {
+        &self.header
+    }
+}
 
-    let mut try_next = move || {
+impl<R: BufRead> Reader<R> {
+    /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
+    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.finished {
+            return Ok(None);
+        }
         loop {
-            let buf = reader.fill_buf()?;
-            if buf.is_empty() {
-                break;
+            if !self.block_data.is_empty() {
+                let consumed = self.decoder.decode(&self.block_data)?;
+                if consumed > 0 {
+                    self.block_data.drain(..consumed);
+                }
+                match self.decoder.flush()? {
+                    None => {
+                        if !self.block_data.is_empty() {
+                            break;
+                        }
+                    }
+                    Some(batch) => {
+                        return Ok(Some(batch));
+                    }
+                }
             }
-            let read = buf.len();
-            let decoded = decoder.decode(buf)?;
-            reader.consume(decoded);
-            if decoded != read {
-                break;
+            let maybe_block = {
+                let buf = self.reader.fill_buf()?;
+                if buf.is_empty() {
+                    None
+                } else {
+                    let read_len = buf.len();
+                    let consumed_len = self.block_decoder.decode(buf)?;
+                    self.reader.consume(consumed_len);
+                    if consumed_len == 0 && read_len != 0 {
+                        return Err(ArrowError::ParseError(
+                            "Could not decode next Avro block from partial 
data".to_string(),
+                        ));
+                    }
+                    self.block_decoder.flush()
+                }
+            };
+            match maybe_block {
+                Some(block) => {

Review Comment:
   This seems like a good spot for:
   ```rust
   let Some(block) = maybe_block else {
       self.finished = true;
       // ...
       break;
   };
   ```
   (breaking the loop anyway causes the function to return the result of 
`self.decoder.flush()`)



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,373 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
-        }
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should 
produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro 
file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, 
ArrowError> {
+        let header = read_header(&mut reader)?;
+        let compression = header.compression()?;
+        let root_field = if let Some(schema) = &self.schema {
+            AvroField::try_from(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            AvroField::try_from(&avro_schema)?
+        };
+        let record_decoder = RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )?;
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok(Reader {
+            reader,
+            header,
+            compression,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            finished: false,
+        })
     }
 
-    decoder
-        .flush()
-        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
+        let record_decoder = if let Some(schema) = self.schema {
+            let root_field = AvroField::try_from(&schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        } else {
+            let header = read_header(&mut reader)?;
+            let avro_schema = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+                })?;
+            let root_field = AvroField::try_from(&avro_schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        };
+        Ok(Decoder::new(record_decoder, self.batch_size))
+    }
+}
+
+/// A high-level Avro `Reader` that reads container-file blocks
+/// and feeds them into a row-level [`Decoder`].
+#[derive(Debug)]
+pub struct Reader<R> {
+    reader: R,
+    header: Header,
+    compression: Option<crate::compression::CompressionCodec>,
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    block_data: Vec<u8>,
+    finished: bool,
 }
 
-/// Return an iterator of [`Block`] from the provided [`BufRead`]
-fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = 
Result<Block, ArrowError>> {
-    let mut decoder = BlockDecoder::default();
+impl<R> Reader<R> {
+    /// Return the Arrow schema discovered from the Avro file header
+    pub fn schema(&self) -> SchemaRef {
+        self.decoder.schema()
+    }
+
+    /// Return the Avro container-file header
+    pub fn avro_header(&self) -> &Header {
+        &self.header
+    }
+}
 
-    let mut try_next = move || {
+impl<R: BufRead> Reader<R> {
+    /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
+    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.finished {
+            return Ok(None);
+        }
         loop {
-            let buf = reader.fill_buf()?;
-            if buf.is_empty() {
-                break;
+            if !self.block_data.is_empty() {
+                let consumed = self.decoder.decode(&self.block_data)?;
+                if consumed > 0 {
+                    self.block_data.drain(..consumed);
+                }
+                match self.decoder.flush()? {
+                    None => {
+                        if !self.block_data.is_empty() {
+                            break;
+                        }
+                    }
+                    Some(batch) => {
+                        return Ok(Some(batch));
+                    }
+                }
             }
-            let read = buf.len();
-            let decoded = decoder.decode(buf)?;
-            reader.consume(decoded);
-            if decoded != read {
-                break;
+            let maybe_block = {
+                let buf = self.reader.fill_buf()?;
+                if buf.is_empty() {
+                    None
+                } else {
+                    let read_len = buf.len();
+                    let consumed_len = self.block_decoder.decode(buf)?;
+                    self.reader.consume(consumed_len);
+                    if consumed_len == 0 && read_len != 0 {

Review Comment:
   Doesn't the `buf.is_empty()` check at L359 already guarantee that `read_len` is non-zero here, making the `read_len != 0` half of this condition redundant?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,373 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
-        }
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should 
produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro 
file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, 
ArrowError> {
+        let header = read_header(&mut reader)?;
+        let compression = header.compression()?;
+        let root_field = if let Some(schema) = &self.schema {
+            AvroField::try_from(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            AvroField::try_from(&avro_schema)?
+        };
+        let record_decoder = RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )?;
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok(Reader {
+            reader,
+            header,
+            compression,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            finished: false,
+        })
     }
 
-    decoder
-        .flush()
-        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
+        let record_decoder = if let Some(schema) = self.schema {
+            let root_field = AvroField::try_from(&schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        } else {
+            let header = read_header(&mut reader)?;
+            let avro_schema = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+                })?;
+            let root_field = AvroField::try_from(&avro_schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        };
+        Ok(Decoder::new(record_decoder, self.batch_size))
+    }
+}
+
+/// A high-level Avro `Reader` that reads container-file blocks
+/// and feeds them into a row-level [`Decoder`].
+#[derive(Debug)]
+pub struct Reader<R> {
+    reader: R,
+    header: Header,
+    compression: Option<crate::compression::CompressionCodec>,
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    block_data: Vec<u8>,
+    finished: bool,
 }
 
-/// Return an iterator of [`Block`] from the provided [`BufRead`]
-fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = 
Result<Block, ArrowError>> {
-    let mut decoder = BlockDecoder::default();
+impl<R> Reader<R> {
+    /// Return the Arrow schema discovered from the Avro file header
+    pub fn schema(&self) -> SchemaRef {
+        self.decoder.schema()
+    }
+
+    /// Return the Avro container-file header
+    pub fn avro_header(&self) -> &Header {
+        &self.header
+    }
+}
 
-    let mut try_next = move || {
+impl<R: BufRead> Reader<R> {
+    /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
+    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.finished {
+            return Ok(None);
+        }
         loop {
-            let buf = reader.fill_buf()?;
-            if buf.is_empty() {
-                break;
+            if !self.block_data.is_empty() {
+                let consumed = self.decoder.decode(&self.block_data)?;
+                if consumed > 0 {
+                    self.block_data.drain(..consumed);
+                }
+                match self.decoder.flush()? {
+                    None => {
+                        if !self.block_data.is_empty() {

Review Comment:
   Under what circumstances could `flush` return None (= no rows available) and 
yet there are still bytes available to decode? And why would we want to break 
out of the loop in order to immediately call `flush` a second time? Shouldn't 
we rather try to `decode` more bytes?
   
   



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,373 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
-        }
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should 
produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro 
file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, 
ArrowError> {
+        let header = read_header(&mut reader)?;
+        let compression = header.compression()?;
+        let root_field = if let Some(schema) = &self.schema {
+            AvroField::try_from(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            AvroField::try_from(&avro_schema)?
+        };
+        let record_decoder = RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )?;
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok(Reader {
+            reader,
+            header,
+            compression,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            finished: false,
+        })
     }
 
-    decoder
-        .flush()
-        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
+        let record_decoder = if let Some(schema) = self.schema {

Review Comment:
   If I'm not mistaken, we could eliminate a lot of duplicated code by defining 
a private `build_impl` method that returns both header and decoder:
   ```rust
   fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), 
ArrowError> {
   ```
   ... and then `build_decoder` simplifies to just:
   ```rust
    let (_, decoder) = self.build_impl(&mut reader)?;
    Ok(decoder)
   ```
   
   while `build` simplifies to just:
   ```rust
    let (header, decoder) = self.build_impl(&mut reader)?;
   let compression = header.compression()?;
   Ok(Reader { ... })
   ```



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,373 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
-        }
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should 
produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro 
file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, 
ArrowError> {
+        let header = read_header(&mut reader)?;
+        let compression = header.compression()?;
+        let root_field = if let Some(schema) = &self.schema {
+            AvroField::try_from(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            AvroField::try_from(&avro_schema)?
+        };
+        let record_decoder = RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )?;
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok(Reader {
+            reader,
+            header,
+            compression,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            finished: false,
+        })
     }
 
-    decoder
-        .flush()
-        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
+        let record_decoder = if let Some(schema) = self.schema {
+            let root_field = AvroField::try_from(&schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        } else {
+            let header = read_header(&mut reader)?;
+            let avro_schema = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+                })?;
+            let root_field = AvroField::try_from(&avro_schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        };
+        Ok(Decoder::new(record_decoder, self.batch_size))
+    }
+}
+
+/// A high-level Avro `Reader` that reads container-file blocks
+/// and feeds them into a row-level [`Decoder`].
+#[derive(Debug)]
+pub struct Reader<R> {
+    reader: R,
+    header: Header,
+    compression: Option<crate::compression::CompressionCodec>,
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    block_data: Vec<u8>,
+    finished: bool,
 }
 
-/// Return an iterator of [`Block`] from the provided [`BufRead`]
-fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = 
Result<Block, ArrowError>> {
-    let mut decoder = BlockDecoder::default();
+impl<R> Reader<R> {
+    /// Return the Arrow schema discovered from the Avro file header
+    pub fn schema(&self) -> SchemaRef {
+        self.decoder.schema()
+    }
+
+    /// Return the Avro container-file header
+    pub fn avro_header(&self) -> &Header {
+        &self.header
+    }
+}
 
-    let mut try_next = move || {
+impl<R: BufRead> Reader<R> {
+    /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
+    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.finished {
+            return Ok(None);
+        }
         loop {
-            let buf = reader.fill_buf()?;
-            if buf.is_empty() {
-                break;
+            if !self.block_data.is_empty() {
+                let consumed = self.decoder.decode(&self.block_data)?;
+                if consumed > 0 {
+                    self.block_data.drain(..consumed);
+                }
+                match self.decoder.flush()? {

Review Comment:
   Actually... no bytes consumed guarantees the batch is complete (but it also 
means we paid to fetch a block we didn't need yet).
   
   But that means we have a problem -- this "tentative" `flush` call would have 
the side effect of producing large numbers of tiny batches, no? It seems like 
we need a way to detect the batch is complete without the side effects of 
calling `flush`. For example, the CSV reader provides a (strangely named IMO) 
`capacity` method for this purpose. The JSON reader doesn't try to optimize for 
this corner case at all.



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,373 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
-        }
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should 
produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro 
file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, 
ArrowError> {
+        let header = read_header(&mut reader)?;
+        let compression = header.compression()?;
+        let root_field = if let Some(schema) = &self.schema {
+            AvroField::try_from(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            AvroField::try_from(&avro_schema)?
+        };
+        let record_decoder = RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )?;
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok(Reader {
+            reader,
+            header,
+            compression,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            finished: false,
+        })
     }
 
-    decoder
-        .flush()
-        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
+        let record_decoder = if let Some(schema) = self.schema {
+            let root_field = AvroField::try_from(&schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        } else {
+            let header = read_header(&mut reader)?;
+            let avro_schema = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+                })?;
+            let root_field = AvroField::try_from(&avro_schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        };
+        Ok(Decoder::new(record_decoder, self.batch_size))
+    }
+}
+
+/// A high-level Avro `Reader` that reads container-file blocks
+/// and feeds them into a row-level [`Decoder`].
+#[derive(Debug)]
+pub struct Reader<R> {
+    reader: R,
+    header: Header,
+    compression: Option<crate::compression::CompressionCodec>,
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    block_data: Vec<u8>,
+    finished: bool,
 }
 
-/// Return an iterator of [`Block`] from the provided [`BufRead`]
-fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = 
Result<Block, ArrowError>> {
-    let mut decoder = BlockDecoder::default();
+impl<R> Reader<R> {
+    /// Return the Arrow schema discovered from the Avro file header
+    pub fn schema(&self) -> SchemaRef {
+        self.decoder.schema()
+    }
+
+    /// Return the Avro container-file header
+    pub fn avro_header(&self) -> &Header {
+        &self.header
+    }
+}
 
-    let mut try_next = move || {
+impl<R: BufRead> Reader<R> {
+    /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
+    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.finished {
+            return Ok(None);
+        }
         loop {
-            let buf = reader.fill_buf()?;
-            if buf.is_empty() {
-                break;
+            if !self.block_data.is_empty() {
+                let consumed = self.decoder.decode(&self.block_data)?;
+                if consumed > 0 {
+                    self.block_data.drain(..consumed);
+                }
+                match self.decoder.flush()? {

Review Comment:
   Seems odd to flush even if no bytes were consumed?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,373 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
-        }
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should 
produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro 
file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, 
ArrowError> {
+        let header = read_header(&mut reader)?;
+        let compression = header.compression()?;
+        let root_field = if let Some(schema) = &self.schema {
+            AvroField::try_from(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            AvroField::try_from(&avro_schema)?
+        };
+        let record_decoder = RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )?;
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok(Reader {
+            reader,
+            header,
+            compression,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            finished: false,
+        })
     }
 
-    decoder
-        .flush()
-        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
+        let record_decoder = if let Some(schema) = self.schema {
+            let root_field = AvroField::try_from(&schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        } else {
+            let header = read_header(&mut reader)?;
+            let avro_schema = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+                })?;
+            let root_field = AvroField::try_from(&avro_schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        };
+        Ok(Decoder::new(record_decoder, self.batch_size))
+    }
+}
+
+/// A high-level Avro `Reader` that reads container-file blocks
+/// and feeds them into a row-level [`Decoder`].
+#[derive(Debug)]
+pub struct Reader<R> {
+    reader: R,
+    header: Header,
+    compression: Option<crate::compression::CompressionCodec>,
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    block_data: Vec<u8>,
+    finished: bool,
 }
 
-/// Return an iterator of [`Block`] from the provided [`BufRead`]
-fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = 
Result<Block, ArrowError>> {
-    let mut decoder = BlockDecoder::default();
+impl<R> Reader<R> {
+    /// Return the Arrow schema discovered from the Avro file header
+    pub fn schema(&self) -> SchemaRef {
+        self.decoder.schema()
+    }
+
+    /// Return the Avro container-file header
+    pub fn avro_header(&self) -> &Header {
+        &self.header
+    }
+}
 
-    let mut try_next = move || {
+impl<R: BufRead> Reader<R> {
+    /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
+    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.finished {
+            return Ok(None);
+        }
         loop {
-            let buf = reader.fill_buf()?;
-            if buf.is_empty() {
-                break;
+            if !self.block_data.is_empty() {
+                let consumed = self.decoder.decode(&self.block_data)?;
+                if consumed > 0 {
+                    self.block_data.drain(..consumed);
+                }
+                match self.decoder.flush()? {
+                    None => {
+                        if !self.block_data.is_empty() {

Review Comment:
   I'm having trouble following the state machine here, but I almost wonder if 
the control flow could be expressed more cleanly like this?
   ```rust
   'outer: while !self.finished && !self.decoder.batch_is_full() {
       // make sure we have a block to work with
       while self.block_cursor == self.block_data.len() {
           let buf = self.reader.fill_buf()?;
           if buf.is_empty() {
               // EOF... hopefully the batch was complete (flush will decide)
               self.finished = true;
               break 'outer;
           }
   
           // Try to decode another block, with three possible outcomes:
           // 1. We successfully decode a block => inner loop exits
           // 2. We consume at least one byte => inner loop continues (fetch 
more data)
           // 3. We fail to consume any bytes => return an error
           let consumed = self.block_decoder.decode(buf)?;
           self.reader.consume(consumed);
           if let Some(block) = self.block_decoder.flush() {
               let block_data = if let Some(ref codec) = self.compression {
                   codec.decompress(&block.data)?
               } else {
                   block.data
               };
               self.block_data = block_data;
               self.block_cursor = 0;
           } else if consumed == 0 {
               return Err(ArrowError::ParseError(
                   "Could not decode next Avro block from partial 
data".to_string(),
               ));
           }
       }
       
       // Try to decode more rows from the block, with three possible outcomes:
       // 1. Block empty, incomplete batch => loop continues
       //    * Next loop iteration will fetch a new block
       // 2. Block empty, complete batch => loop exits (flush)
       //    * Next read will fetch a new block
       // 3. Block non-empty => batch must be complete => loop exits (flush)
       //    * Next read will keep consuming from this same block
       let consumed = 
self.decoder.decode(&self.block_data[self.block_cursor..])?;
       self.block_cursor += consumed;
   }
   
   self.decoder.flush()
   ```
   



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,373 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
-        }
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should 
produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro 
file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, 
ArrowError> {
+        let header = read_header(&mut reader)?;
+        let compression = header.compression()?;
+        let root_field = if let Some(schema) = &self.schema {
+            AvroField::try_from(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            AvroField::try_from(&avro_schema)?
+        };
+        let record_decoder = RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )?;
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok(Reader {
+            reader,
+            header,
+            compression,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            finished: false,
+        })
     }
 
-    decoder
-        .flush()
-        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
+        let record_decoder = if let Some(schema) = self.schema {
+            let root_field = AvroField::try_from(&schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        } else {
+            let header = read_header(&mut reader)?;
+            let avro_schema = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+                })?;
+            let root_field = AvroField::try_from(&avro_schema)?;
+            RecordDecoder::try_new_with_options(
+                root_field.data_type(),
+                self.utf8_view,
+                self.strict_mode,
+            )?
+        };
+        Ok(Decoder::new(record_decoder, self.batch_size))
+    }
+}
+
+/// A high-level Avro `Reader` that reads container-file blocks
+/// and feeds them into a row-level [`Decoder`].
+#[derive(Debug)]
+pub struct Reader<R> {
+    reader: R,
+    header: Header,
+    compression: Option<crate::compression::CompressionCodec>,
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    block_data: Vec<u8>,
+    finished: bool,
 }
 
-/// Return an iterator of [`Block`] from the provided [`BufRead`]
-fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = 
Result<Block, ArrowError>> {
-    let mut decoder = BlockDecoder::default();
+impl<R> Reader<R> {

Review Comment:
   I understand that the two methods defined in this impl block don't depend on 
the structure of `R`... but is there any benefit to separating them out like 
this? It's impossible to do anything useful with a `Reader` unless `R: BufRead`?
   
   I guess the real question would be: Why do we not require `Reader<R: BufRead>`?



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,373 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro 
header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
-        }
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should 
produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro 
file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, 
ArrowError> {
+        let header = read_header(&mut reader)?;
+        let compression = header.compression()?;
+        let root_field = if let Some(schema) = &self.schema {
+            AvroField::try_from(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            AvroField::try_from(&avro_schema)?
+        };
+        let record_decoder = RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )?;
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok(Reader {
+            reader,
+            header,
+            compression,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            finished: false,
+        })
     }
 
-    decoder
-        .flush()
-        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF".to_string()))
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, 
ArrowError> {
+        let record_decoder = if let Some(schema) = self.schema {

Review Comment:
   Aside: The code from `build` has less redundancy than that of 
`build_decoder`, and is probably the better starting point of the two.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to