jecsand838 commented on code in PR #8930:
URL: https://github.com/apache/arrow-rs/pull/8930#discussion_r2697681278


##########
arrow-avro/src/lib.rs:
##########
@@ -123,16 +123,56 @@
 //! # Ok(()) }
 //! ```
 //!
+//! ## `async` Reading (`async` feature)
+//!
+//! The [`reader`] module provides async APIs for reading Avro files when the 
`async`
+//! feature is enabled.
+//!
+//! [`AsyncAvroFileReader`] implements `Stream<Item = Result<RecordBatch, 
ArrowError>>`,
+//! allowing efficient async streaming of record batches. When the 
`object_store` feature
+//! is enabled, [`AvroObjectReader`] provides integration with object storage 
services
+//! such as S3 via the [object_store] crate.
+//!
+//! ```ignore

Review Comment:
   We should drop the `ignore` imo.
   
   ```suggestion
   //! ```
   ```



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum ReaderState<R: AsyncFileReader> {
+    Idle {
+        reader: R,
+    },
+    FirstFetch {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Limbo,
+    DecodingBlock {
+        data: Bytes,
+        reader: R,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    ReadingFinalBlock {
+        current_data: Bytes,
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState<R>,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional 
parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> 
AsyncAvroFileReaderBuilder<R> {
+        AsyncAvroFileReaderBuilder::new(reader, file_size, batch_size)
+    }
+
+    fn new(
+        range: Range<u64>,
+        file_size: u64,
+        decoder: Decoder,
+        codec: Option<CompressionCodec>,
+        sync_marker: [u8; 16],
+        reader_state: ReaderState<R>,
+    ) -> Self {
+        Self {
+            range,
+            file_size,
+
+            decoder,
+            block_decoder: Default::default(),
+            codec,
+            sync_marker,
+
+            reader_state,
+            finishing_partial_block: false,
+        }
+    }
+
+    fn read_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<RecordBatch, ArrowError>>> {
+        loop {
+            match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+                ReaderState::Idle { mut reader } => {
+                    let range = self.range.clone();
+                    if range.start >= range.end || range.end > self.file_size {
+                        return 
Poll::Ready(Some(Err(ArrowError::AvroError(format!(
+                            "Invalid range specified for Avro file: start {} 
>= end {}, file_size: {}",
+                            range.start, range.end, self.file_size
+                        )))));
+                    }
+
+                    let future = async move {
+                        let data = reader.get_bytes(range).await?;
+                        Ok((reader, data))
+                    }
+                    .boxed();
+
+                    self.reader_state = ReaderState::FirstFetch { future };
+                }
+                ReaderState::FirstFetch { mut future } => {
+                    let (reader, data_chunk) = match future.poll_unpin(cx) {
+                        Poll::Ready(Ok(data)) => data,
+                        Poll::Ready(Err(e)) => {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        Poll::Pending => {
+                            self.reader_state = ReaderState::FirstFetch { 
future };
+                            return Poll::Pending;
+                        }
+                    };
+
+                    let sync_marker_pos = data_chunk
+                        .windows(16)
+                        .position(|slice| slice == self.sync_marker);
+                    let block_start = match sync_marker_pos {
+                        Some(pos) => pos + 16, // Move past the sync marker
+                        None => {
+                            // Sync marker not found, this is actually valid 
if we arbitrarily split the file at its end.
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(None);
+                        }
+                    };
+
+                    // This is the first time we read data, so try and find 
the sync marker.
+                    self.reader_state = ReaderState::DecodingBlock {
+                        reader,
+                        data: data_chunk.slice(block_start..),
+                    };
+                }
+                ReaderState::Limbo => {
+                    unreachable!("ReaderState::Limbo should never be 
observed");
+                }
+                ReaderState::DecodingBlock {
+                    mut reader,
+                    mut data,
+                } => {
+                    // Try to decode another block from the buffered reader.
+                    let consumed = self.block_decoder.decode(&data)?;
+                    if consumed == 0 {
+                        // If the last block was exactly at the end of the 
file,
+                        // we're simply done reading.
+                        if data.is_empty() {
+                            let final_batch = self.decoder.flush();
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(final_batch.transpose());
+                        }
+
+                        // If we've tried the following stage before, and 
still can't decode,
+                        // this means the file is truncated or corrupted.
+                        if self.finishing_partial_block {
+                            return Poll::Ready(Some(Err(ArrowError::AvroError(
+                                "Unexpected EOF while reading last Avro 
block".into(),
+                            ))));
+                        }
+
+                        // Avro splitting case: block is incomplete, we need 
to:
+                        // 1. Parse the length so we know how much to read
+                        // 2. Fetch more data from the object store
+                        // 3. Create a new block data from the remaining slice 
and the newly fetched data
+                        // 4. Continue decoding until end of block
+                        self.finishing_partial_block = true;
+
+                        let (size, vlq_header_len) = {
+                            let mut vlq = VLQDecoder::default();
+                            let mut vlq_buf = &data[..];
+                            let original_len = vlq_buf.len();
+
+                            let _ = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
count".into(),
+                                )
+                            })?;
+
+                            let size = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
size".into(),
+                                )
+                            })? as u64;
+
+                            // Calculate how many bytes were consumed by the 
two VLQ integers
+                            let header_len = 
original_len.checked_sub(vlq_buf.len()).unwrap();
+
+                            (size, header_len as u64)
+                        };
+
+                        // Two longs: count and size have already been read, 
but using our vlq,
+                        // meaning they were not consumed.
+                        let total_block_size = size + vlq_header_len;
+                        let remaining_to_fetch =
+                            total_block_size.checked_sub(data.len() as 
u64).unwrap();

Review Comment:
   ```suggestion
                           let remaining_to_fetch =
                               total_block_size.checked_sub(data.len() as 
u64).ok_or_else(|| ArrowError::AvroError("Invalid block size: data exceeds 
expected block size".into()))?;
   ```



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum ReaderState<R: AsyncFileReader> {
+    Idle {
+        reader: R,
+    },
+    FirstFetch {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Limbo,
+    DecodingBlock {
+        data: Bytes,
+        reader: R,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    ReadingFinalBlock {
+        current_data: Bytes,
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState<R>,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional 
parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> 
AsyncAvroFileReaderBuilder<R> {
+        AsyncAvroFileReaderBuilder::new(reader, file_size, batch_size)
+    }
+
+    fn new(
+        range: Range<u64>,
+        file_size: u64,
+        decoder: Decoder,
+        codec: Option<CompressionCodec>,
+        sync_marker: [u8; 16],
+        reader_state: ReaderState<R>,
+    ) -> Self {
+        Self {
+            range,
+            file_size,
+
+            decoder,
+            block_decoder: Default::default(),
+            codec,
+            sync_marker,
+
+            reader_state,
+            finishing_partial_block: false,
+        }
+    }
+
+    fn read_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<RecordBatch, ArrowError>>> {
+        loop {
+            match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+                ReaderState::Idle { mut reader } => {
+                    let range = self.range.clone();
+                    if range.start >= range.end || range.end > self.file_size {
+                        return 
Poll::Ready(Some(Err(ArrowError::AvroError(format!(
+                            "Invalid range specified for Avro file: start {} 
>= end {}, file_size: {}",
+                            range.start, range.end, self.file_size
+                        )))));
+                    }
+
+                    let future = async move {
+                        let data = reader.get_bytes(range).await?;
+                        Ok((reader, data))
+                    }
+                    .boxed();
+
+                    self.reader_state = ReaderState::FirstFetch { future };
+                }
+                ReaderState::FirstFetch { mut future } => {
+                    let (reader, data_chunk) = match future.poll_unpin(cx) {
+                        Poll::Ready(Ok(data)) => data,
+                        Poll::Ready(Err(e)) => {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        Poll::Pending => {
+                            self.reader_state = ReaderState::FirstFetch { 
future };
+                            return Poll::Pending;
+                        }
+                    };
+
+                    let sync_marker_pos = data_chunk
+                        .windows(16)
+                        .position(|slice| slice == self.sync_marker);
+                    let block_start = match sync_marker_pos {
+                        Some(pos) => pos + 16, // Move past the sync marker
+                        None => {
+                            // Sync marker not found, this is actually valid 
if we arbitrarily split the file at its end.
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(None);
+                        }
+                    };
+
+                    // This is the first time we read data, so try and find 
the sync marker.
+                    self.reader_state = ReaderState::DecodingBlock {
+                        reader,
+                        data: data_chunk.slice(block_start..),
+                    };
+                }
+                ReaderState::Limbo => {
+                    unreachable!("ReaderState::Limbo should never be 
observed");
+                }
+                ReaderState::DecodingBlock {
+                    mut reader,
+                    mut data,
+                } => {

Review Comment:
   I was thinking about it and you may want to consider a decode loop similar 
to the sync `Reader::read` method's, specifically this logic:
   
   ```rust
   let consumed = self.block_decoder.decode(buf)?;
   self.reader.consume(consumed);
   if let Some(block) = self.block_decoder.flush() {
       // Block complete - use it
   } else if consumed == 0 {
       // Stuck on non-empty buffer - error
       return Err(ArrowError::ParseError(...));
   }
   // Otherwise: made progress, loop for more data
   ```
   
   From an architectural perspective the advantages would be:
   1. Always calls `flush()` after `decode()` to check for complete blocks
   2. Only errors when stuck (`consumed == 0` on non-empty buffer AND `flush() 
== None`)
   3. Trusts `BlockDecoder` to handle partial data incrementally
   
   Maybe it could resemble something like this pseduo-code?
   
   ```rust
    ReaderState::DecodingBlock { mut reader, mut data } = {
        let consumed = self.block_decoder.decode(&data)?;
        data = data.slice(consumed..);  // Equivalent to reader.consume()
   
        if let Some(block) = self.block_decoder.flush() {
            // Block complete - proceed to ReadingBatches
            let block_data = Bytes::from_owner(if let Some(ref codec) = 
self.codec {
                codec.decompress(&block.data)?
            } else {
                block.data
            });
            self.reader_state = ReaderState::ReadingBatches {
                reader, data, block_data,
                remaining_in_block: block.count,
            };
            continue;
        }
   
        // No complete block yet
        if consumed == 0 && !data.is_empty() {
            // Stuck - no progress on non-empty buffer = corrupted data
            return Poll::Ready(Some(Err(ArrowError::ParseError(
                "Could not decode next Avro block from partial data".into()
            ))));
        }
   
        if data.is_empty() {
            // Buffer exhausted, block incomplete
            if self.finishing_partial_block {
                return Poll::Ready(Some(Err(ArrowError::AvroError(
                    "Unexpected EOF while reading last Avro block".into()
                ))));
            }
            // Fetch more data (range end case) or finish
            // ... simplified fetch logic here ...
        } else {
            // Made progress but not complete - continue decoding
            self.reader_state = ReaderState::DecodingBlock { reader, data };
        }
    }
   ```



##########
arrow-avro/src/reader/async_reader/builder.rs:
##########
@@ -0,0 +1,163 @@
+use crate::codec::AvroFieldBuilder;
+use crate::reader::async_reader::ReaderState;
+use crate::reader::header::{Header, HeaderDecoder};
+use crate::reader::record::RecordDecoder;
+use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
+use crate::schema::{AvroSchema, FingerprintAlgorithm};
+use arrow_schema::ArrowError;
+use indexmap::IndexMap;
+use std::ops::Range;
+
+const DEFAULT_HEADER_SIZE_HINT: u64 = 64 * 1024; // 64 KB
+
+/// Builder for an asynchronous Avro file reader.
+pub struct AsyncAvroFileReaderBuilder<R: AsyncFileReader> {
+    reader: R,
+    file_size: u64,
+    batch_size: usize,
+    range: Option<Range<u64>>,
+    reader_schema: Option<AvroSchema>,
+    header_size_hint: Option<u64>,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReaderBuilder<R> {
+    pub(super) fn new(reader: R, file_size: u64, batch_size: usize) -> Self {
+        Self {
+            reader,
+            file_size,
+            batch_size,
+            range: None,
+            reader_schema: None,
+            header_size_hint: None,
+        }
+    }
+
+    /// Specify a byte range to read from the Avro file.
+    /// If this is provided, the reader will read all the blocks within the 
specified range,
+    /// if the range ends mid-block, it will attempt to fetch the remaining 
bytes to complete the block,
+    /// but no further blocks will be read.
+    /// If this is omitted, the full file will be read.
+    pub fn with_range(self, range: Range<u64>) -> Self {
+        Self {
+            range: Some(range),
+            ..self
+        }
+    }
+
+    /// Specify a reader schema to use when reading the Avro file.
+    /// This can be useful to project specific columns or handle schema 
evolution.
+    /// If this is not provided, the schema will be derived from the Arrow 
schema provided.
+    pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
+        Self {
+            reader_schema: Some(reader_schema),
+            ..self
+        }
+    }
+
+    /// Provide a hint for the expected size of the Avro header in bytes.
+    /// This can help optimize the initial read operation when fetching the 
header.
+    pub fn with_header_size_hint(self, hint: u64) -> Self {
+        Self {
+            header_size_hint: Some(hint),
+            ..self
+        }
+    }
+
+    async fn read_header(&mut self) -> Result<(Header, u64), ArrowError> {
+        let mut decoder = HeaderDecoder::default();
+        let mut position = 0;
+        loop {
+            let range_to_fetch = position
+                ..(position + 
self.header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT))
+                    .min(self.file_size);
+            let current_data = 
self.reader.get_bytes(range_to_fetch).await.map_err(|err| {
+                ArrowError::AvroError(format!(
+                    "Error fetching Avro header from object store: {err}"
+                ))
+            })?;
+            if current_data.is_empty() {
+                break;
+            }
+            let read = current_data.len();
+            let decoded = decoder.decode(&current_data)?;
+            if decoded != read {
+                position += decoded as u64;
+                break;
+            }
+            position += read as u64;
+        }
+
+        decoder
+            .flush()
+            .map(|header| (header, position))
+            .ok_or_else(|| ArrowError::AvroError("Unexpected EOF while reading 
Avro header".into()))
+    }
+
+    /// Build the asynchronous Avro reader with the provided parameters.
+    /// This reads the header first to initialize the reader state.
+    pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, 
ArrowError> {
+        if self.file_size == 0 {
+            return Err(ArrowError::AvroError("File size cannot be 0".into()));
+        }
+
+        // Start by reading the header from the beginning of the avro file
+        let (header, header_len) = self.read_header().await?;
+        let writer_schema = header
+            .schema()
+            .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+            .ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".into())
+            })?;
+
+        let root = {
+            let field_builder = AvroFieldBuilder::new(&writer_schema);
+            if let Some(provided_schema) = self.reader_schema.as_ref() {
+                let reader_schema = provided_schema.schema()?;
+                field_builder.with_reader_schema(&reader_schema).build()
+            } else {
+                field_builder.build()
+            }
+        }?;
+
+        let record_decoder = 
RecordDecoder::try_new_with_options(root.data_type())?;
+
+        let decoder = Decoder::from_parts(
+            self.batch_size,
+            record_decoder,
+            None,
+            IndexMap::new(),
+            FingerprintAlgorithm::Rabin,
+        );
+        let range = match self.range {
+            Some(r) => {
+                // If this PartitionedFile's range starts at 0, we need to 
skip the header bytes.
+                // But then we need to seek back 16 bytes to include the sync 
marker for the first block,
+                // as the logic in this reader searches the data for the first 
sync marker(after which a block starts),
+                // then reads blocks from the count, size etc.
+                let start = r.start.max(header_len.checked_sub(16).unwrap());
+                let end = r.end.max(start).min(self.file_size); // Ensure end 
is not less than start, worst case range is empty
+                start..end
+            }
+            None => 0..self.file_size,

Review Comment:
   The builder reads the header in `read_header()` (lines 66-94), potentially 
fetching multiple chunks. Then `AsyncAvroFileReader` fetches range 
`0..file_size` again in `FirstFetch`, re-reading the header bytes. 
   
   What do you think about passing the leftover bytes from header parsing to 
avoid redundant I/O?
   
   ```rust
     async fn read_header(&mut self) -> Result<(Header, u64, Option<Bytes>), 
ArrowError> {                                                                   
                                                                         
         // ...                                                                 
                                                                                
                                      
                                                                                
                                                                                
                                                                      
         // Return any leftover bytes after header                              
                                                                                
                                                                      
         let leftover = if decoded < current_data.len() {                       
                                                                                
                                                                      
             Some(current_data.slice(decoded..))                                
                                                                                
                                                                      
         } else {                                                               
                                                                                
                                                                      
             None                                                               
                                                                                
                                                                      
         };                                                                     
                                                                                
                                                                      
                                                                                
                                                                                
                                                                      
         decoder.flush()                                                        
                                                                                
                                                                      
             .map(|header| (header, position, leftover))                        
                                                                                
                                                                      
             .ok_or_else(|| ...)                                                
                                                                                
                                                                      
     }                                                                          
                                                                                
                                       
   ```
                                                                                
                                                                                
                                                                                
               
   Then you could add an `InitialData` state variant to `ReaderState` and 
initialize with leftover bytes when available:
   ```rust                                                                      
                                                            
     let reader_state = match leftover_bytes {                                  
                                                                                
                                                                      
         Some(bytes) if !bytes.is_empty() => ReaderState::InitialData {         
                                                                                
                                                                      
             reader: self.reader,                                               
                                                                                
                                                                      
             initial_bytes: bytes                                               
                                                                                
                                                                      
         },                                                                     
                                                                                
                                                                      
         _ if range.start == range.end => ReaderState::Finished,                
                                                                                
                                                                      
         _ => ReaderState::Idle { reader: self.reader },                        
                                                                                
                                                                      
     };  
   ```



##########
arrow-avro/README.md:
##########
@@ -156,6 +193,11 @@ See the crate docs for runnable SOE and Confluent 
round‑trip examples.
   ```toml
   arrow-avro = { version = "56", default-features = false, features = 
["deflate", "snappy", "zstd"] }
   ```
+* Async reading from object stores (S3, GCS, etc.):
+
+  ```toml
+  arrow-avro = { version = "56", features = ["object_store"] }

Review Comment:
   We should probably bump this one (and the preceding example) upto v58,
   
   ```suggestion
     arrow-avro = { version = "58", features = ["object_store"] }
   ```



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum ReaderState<R: AsyncFileReader> {
+    Idle {
+        reader: R,
+    },
+    FirstFetch {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Limbo,
+    DecodingBlock {
+        data: Bytes,
+        reader: R,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    ReadingFinalBlock {
+        current_data: Bytes,
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState<R>,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional 
parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> 
AsyncAvroFileReaderBuilder<R> {
+        AsyncAvroFileReaderBuilder::new(reader, file_size, batch_size)
+    }
+
+    fn new(
+        range: Range<u64>,
+        file_size: u64,
+        decoder: Decoder,
+        codec: Option<CompressionCodec>,
+        sync_marker: [u8; 16],
+        reader_state: ReaderState<R>,
+    ) -> Self {
+        Self {
+            range,
+            file_size,
+
+            decoder,
+            block_decoder: Default::default(),
+            codec,
+            sync_marker,
+
+            reader_state,
+            finishing_partial_block: false,
+        }
+    }
+
+    fn read_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<RecordBatch, ArrowError>>> {
+        loop {
+            match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+                ReaderState::Idle { mut reader } => {
+                    let range = self.range.clone();
+                    if range.start >= range.end || range.end > self.file_size {
+                        return 
Poll::Ready(Some(Err(ArrowError::AvroError(format!(
+                            "Invalid range specified for Avro file: start {} 
>= end {}, file_size: {}",
+                            range.start, range.end, self.file_size
+                        )))));
+                    }
+
+                    let future = async move {
+                        let data = reader.get_bytes(range).await?;
+                        Ok((reader, data))
+                    }
+                    .boxed();
+
+                    self.reader_state = ReaderState::FirstFetch { future };
+                }
+                ReaderState::FirstFetch { mut future } => {
+                    let (reader, data_chunk) = match future.poll_unpin(cx) {
+                        Poll::Ready(Ok(data)) => data,
+                        Poll::Ready(Err(e)) => {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        Poll::Pending => {
+                            self.reader_state = ReaderState::FirstFetch { 
future };
+                            return Poll::Pending;
+                        }
+                    };
+
+                    let sync_marker_pos = data_chunk
+                        .windows(16)
+                        .position(|slice| slice == self.sync_marker);
+                    let block_start = match sync_marker_pos {
+                        Some(pos) => pos + 16, // Move past the sync marker
+                        None => {
+                            // Sync marker not found, this is actually valid 
if we arbitrarily split the file at its end.
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(None);
+                        }
+                    };
+
+                    // This is the first time we read data, so try and find 
the sync marker.
+                    self.reader_state = ReaderState::DecodingBlock {
+                        reader,
+                        data: data_chunk.slice(block_start..),
+                    };
+                }
+                ReaderState::Limbo => {
+                    unreachable!("ReaderState::Limbo should never be 
observed");
+                }
+                ReaderState::DecodingBlock {
+                    mut reader,
+                    mut data,
+                } => {
+                    // Try to decode another block from the buffered reader.
+                    let consumed = self.block_decoder.decode(&data)?;
+                    if consumed == 0 {

Review Comment:
     I think there maybe an issue with using `consumed == 0` as the signal for 
detecting incomplete blocks here.                                               
                                                                                
                  
                                                                                
                                                                                
                                                                      
     Looking at `BlockDecoder::decode` in block.rs lines 78-129, it returns 0 
only when:                                                                      
                                                                                
     - The input buffer is empty at the start, OR                               
                                                                                
                                                                      
     - The decoder is already in Finished state                                 
                                                                                
                                                                      
                                                                                
                                                                                
                                                                      For a 
truly incomplete block, `decode()` consumes all available bytes (returns 
`data.len()`) and `flush()` returns `None`. The current logic likely never 
triggers when it should.                                                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                         
    You may want to consider changing the detection logic to check `flush()` 
first:                                                                          
                                                                                
                
     
     ```rust                                                                    
                                                                                
                                                               
     ReaderState::DecodingBlock { mut reader, mut data } => {                   
                                                                                
                                                                      
         let consumed = self.block_decoder.decode(&data)?;                      
                                                                                
                                                                      
         data = data.slice(consumed..);                                         
                                                                                
                                                                      
                                                                                
                                                                                
                                                                      
         // Check for complete block FIRST                                      
                                                                                
                                                                      
         if let Some(block) = self.block_decoder.flush() {                      
                                                                                
                                                                      
             let block_data = Bytes::from_owner(if let Some(ref codec) = 
self.codec {                                                                    
                                                                             
                 codec.decompress(&block.data)?                                 
                                                                                
                                                                      
             } else {                                                           
                                                                                
                                                                      
                 block.data                                                     
                                                                                
                                                                      
             });                                                                
                                                                                
                                                                      
             self.reader_state = ReaderState::ReadingBatches {                  
                                                                                
                                                                      
                 reader, data, block_data,                                      
                                                                                
                                                                      
                 remaining_in_block: block.count,                               
                                                                                
                                                                      
             };                                                                 
                                                                                
                                                                      
             continue;                                                          
                                                                                
                                                                      
         }                                                                      
                                                                                
                                                                      
                                                                                
                                                                                
                                                                      
         // No complete block                                                   
                                                                                
                                            
         if data.is_empty() && consumed == 0 {                                  
                                                                                
                                                                      
             // No progress on empty buffer = EOF                               
                                                                                
                                                                      
             let final_batch = self.decoder.flush();                            
                                                                                
                                                                      
             self.reader_state = ReaderState::Finished;                         
                                                                                
                                                                      
             return Poll::Ready(final_batch.transpose());                       
                                                                                
                                                                      
         }                                                                      
                                                                                
                                                                      
                                                                                
                                                                                
                                                                      
         if data.is_empty() {                                                   
                                                                                
                                                                      
             // All data consumed but block incomplete - need more bytes        
                                                                                
                                                                      
             // (incomplete block handling logic here)                          
                                                                                
                                                                      
         } else {                                                               
                                                                                
                                                                      
             // Still have data to process                                      
                                                                                
                                                                      
             self.reader_state = ReaderState::DecodingBlock { reader, data };   
                                                                                
                                                                      
         }                                                                      
                                                                                
                                                                      
     } 
     ```



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum ReaderState<R: AsyncFileReader> {
+    Idle {
+        reader: R,
+    },
+    FirstFetch {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Limbo,
+    DecodingBlock {
+        data: Bytes,
+        reader: R,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    ReadingFinalBlock {
+        current_data: Bytes,
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState<R>,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional 
parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> 
AsyncAvroFileReaderBuilder<R> {
+        AsyncAvroFileReaderBuilder::new(reader, file_size, batch_size)
+    }
+
+    fn new(
+        range: Range<u64>,
+        file_size: u64,
+        decoder: Decoder,
+        codec: Option<CompressionCodec>,
+        sync_marker: [u8; 16],
+        reader_state: ReaderState<R>,
+    ) -> Self {
+        Self {
+            range,
+            file_size,
+
+            decoder,
+            block_decoder: Default::default(),
+            codec,
+            sync_marker,
+
+            reader_state,
+            finishing_partial_block: false,
+        }
+    }
+
+    fn read_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<RecordBatch, ArrowError>>> {
+        loop {
+            match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+                ReaderState::Idle { mut reader } => {
+                    let range = self.range.clone();
+                    if range.start >= range.end || range.end > self.file_size {
+                        return 
Poll::Ready(Some(Err(ArrowError::AvroError(format!(
+                            "Invalid range specified for Avro file: start {} 
>= end {}, file_size: {}",
+                            range.start, range.end, self.file_size
+                        )))));
+                    }
+
+                    let future = async move {
+                        let data = reader.get_bytes(range).await?;

Review Comment:
   I wonder if there's a way around fetching the entire requested range into 
memory to reduce potential memory pressure. Maybe (in a future PR) we could use 
a `BytesMut` buffer or similar to implement chunked reads?



##########
arrow-avro/src/reader/async_reader/store.rs:
##########
@@ -0,0 +1,89 @@
+use crate::reader::async_reader::AsyncFileReader;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, TryFutureExt};
+use object_store::ObjectStore;
+use object_store::path::Path;
+use std::error::Error;
+use std::ops::Range;
+use std::sync::Arc;
+use tokio::runtime::Handle;
+
+/// An implementation of an AsyncFileReader using the [`ObjectStore`] API.
+pub struct AvroObjectReader {
+    store: Arc<dyn ObjectStore>,
+    path: Path,
+    runtime: Option<Handle>,
+}
+
+impl AvroObjectReader {
+    /// Creates a new [`Self`] from a store implementation and file location.
+    pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
+        Self {
+            store,
+            path,
+            runtime: None,
+        }
+    }
+
+    /// Perform IO on the provided tokio runtime
+    ///
+    /// Tokio is a cooperative scheduler, and relies on tasks yielding in a 
timely manner
+    /// to service IO. Therefore, running IO and CPU-bound tasks, such as avro 
decoding,
+    /// on the same tokio runtime can lead to degraded throughput, dropped 
connections and
+    /// other issues. For more information see [here].
+    ///
+    /// [here]: 
https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
+    pub fn with_runtime(self, handle: Handle) -> Self {
+        Self {
+            runtime: Some(handle),
+            ..self
+        }
+    }
+
+    fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O, ArrowError>>
+    where
+        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, 
Result<O, E>>
+            + Send
+            + 'static,
+        O: Send + 'static,
+        E: Error + Send + 'static,
+    {
+        match &self.runtime {
+            Some(handle) => {
+                let path = self.path.clone();
+                let store = Arc::clone(&self.store);
+                handle
+                    .spawn(async move { f(&store, &path).await })
+                    .map_ok_or_else(
+                        |e| match e.try_into_panic() {
+                            Err(e) => 
Err(ArrowError::AvroError(e.to_string())),
+                            Ok(p) => std::panic::resume_unwind(p),
+                        },
+                        |res| res.map_err(|e| 
ArrowError::AvroError(e.to_string())),
+                    )
+                    .boxed()
+            }
+            None => f(&self.store, &self.path)
+                .map_err(|e| ArrowError::AvroError(e.to_string()))
+                .boxed(),
+        }
+    }

Review Comment:
   This is sick!



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum ReaderState<R: AsyncFileReader> {
+    Idle {
+        reader: R,
+    },
+    FirstFetch {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Limbo,
+    DecodingBlock {
+        data: Bytes,
+        reader: R,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    ReadingFinalBlock {
+        current_data: Bytes,
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState<R>,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional 
parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> 
AsyncAvroFileReaderBuilder<R> {
+        AsyncAvroFileReaderBuilder::new(reader, file_size, batch_size)
+    }
+
+    fn new(
+        range: Range<u64>,
+        file_size: u64,
+        decoder: Decoder,
+        codec: Option<CompressionCodec>,
+        sync_marker: [u8; 16],
+        reader_state: ReaderState<R>,
+    ) -> Self {
+        Self {
+            range,
+            file_size,
+
+            decoder,
+            block_decoder: Default::default(),
+            codec,
+            sync_marker,
+
+            reader_state,
+            finishing_partial_block: false,
+        }
+    }
+
+    fn read_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<RecordBatch, ArrowError>>> {
+        loop {
+            match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+                ReaderState::Idle { mut reader } => {
+                    let range = self.range.clone();
+                    if range.start >= range.end || range.end > self.file_size {
+                        return 
Poll::Ready(Some(Err(ArrowError::AvroError(format!(
+                            "Invalid range specified for Avro file: start {} 
>= end {}, file_size: {}",
+                            range.start, range.end, self.file_size
+                        )))));
+                    }
+
+                    let future = async move {
+                        let data = reader.get_bytes(range).await?;
+                        Ok((reader, data))
+                    }
+                    .boxed();
+
+                    self.reader_state = ReaderState::FirstFetch { future };
+                }
+                ReaderState::FirstFetch { mut future } => {
+                    let (reader, data_chunk) = match future.poll_unpin(cx) {
+                        Poll::Ready(Ok(data)) => data,
+                        Poll::Ready(Err(e)) => {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        Poll::Pending => {
+                            self.reader_state = ReaderState::FirstFetch { 
future };
+                            return Poll::Pending;
+                        }
+                    };
+
+                    let sync_marker_pos = data_chunk
+                        .windows(16)
+                        .position(|slice| slice == self.sync_marker);
+                    let block_start = match sync_marker_pos {
+                        Some(pos) => pos + 16, // Move past the sync marker
+                        None => {
+                            // Sync marker not found, this is actually valid 
if we arbitrarily split the file at its end.
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(None);
+                        }
+                    };
+
+                    // This is the first time we read data, so try and find 
the sync marker.
+                    self.reader_state = ReaderState::DecodingBlock {
+                        reader,
+                        data: data_chunk.slice(block_start..),
+                    };
+                }
+                ReaderState::Limbo => {
+                    unreachable!("ReaderState::Limbo should never be 
observed");
+                }
+                ReaderState::DecodingBlock {
+                    mut reader,
+                    mut data,
+                } => {
+                    // Try to decode another block from the buffered reader.
+                    let consumed = self.block_decoder.decode(&data)?;
+                    if consumed == 0 {
+                        // If the last block was exactly at the end of the 
file,
+                        // we're simply done reading.
+                        if data.is_empty() {
+                            let final_batch = self.decoder.flush();
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(final_batch.transpose());
+                        }
+
+                        // If we've tried the following stage before, and 
still can't decode,
+                        // this means the file is truncated or corrupted.
+                        if self.finishing_partial_block {
+                            return Poll::Ready(Some(Err(ArrowError::AvroError(
+                                "Unexpected EOF while reading last Avro 
block".into(),
+                            ))));
+                        }
+
+                        // Avro splitting case: block is incomplete, we need 
to:
+                        // 1. Parse the length so we know how much to read
+                        // 2. Fetch more data from the object store
+                        // 3. Create a new block data from the remaining slice 
and the newly fetched data
+                        // 4. Continue decoding until end of block
+                        self.finishing_partial_block = true;
+
+                        let (size, vlq_header_len) = {
+                            let mut vlq = VLQDecoder::default();
+                            let mut vlq_buf = &data[..];
+                            let original_len = vlq_buf.len();
+
+                            let _ = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
count".into(),
+                                )
+                            })?;
+
+                            let size = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
size".into(),
+                                )
+                            })? as u64;
+
+                            // Calculate how many bytes were consumed by the 
two VLQ integers
+                            let header_len = 
original_len.checked_sub(vlq_buf.len()).unwrap();
+
+                            (size, header_len as u64)
+                        };
+
+                        // Two longs: count and size have already been read, 
but using our vlq,
+                        // meaning they were not consumed.
+                        let total_block_size = size + vlq_header_len;

Review Comment:
   Is there any risks from the calculation omitting the 16-byte sync marker 
here?



##########
arrow-avro/Cargo.toml:
##########
@@ -45,16 +45,26 @@ sha256 = ["dep:sha2"]
 small_decimals = []
 avro_custom_types = ["dep:arrow-select"]
 
+# Enable async APIs
+async = ["futures", "tokio"]
+# Enable object_store integration
+object_store = ["dep:object_store", "async"]
+
 [dependencies]
 arrow-schema = { workspace = true }
 arrow-buffer = { workspace = true }
 arrow-array = { workspace = true }
 arrow-select = { workspace = true, optional = true }
+
+object_store = { version = "0.12.0", default-features = false, optional = true 
}

Review Comment:
   You may just want to consider dropping the patch version here. (something I 
need to not do as well imo)
   
   ```suggestion
   object_store = { version = "0.12", default-features = false, optional = true 
}
   ```



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum ReaderState<R: AsyncFileReader> {
+    Idle {
+        reader: R,
+    },
+    FirstFetch {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Limbo,
+    DecodingBlock {
+        data: Bytes,
+        reader: R,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    ReadingFinalBlock {
+        current_data: Bytes,
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState<R>,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional 
parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> 
AsyncAvroFileReaderBuilder<R> {
+        AsyncAvroFileReaderBuilder::new(reader, file_size, batch_size)
+    }
+
+    fn new(
+        range: Range<u64>,
+        file_size: u64,
+        decoder: Decoder,
+        codec: Option<CompressionCodec>,
+        sync_marker: [u8; 16],
+        reader_state: ReaderState<R>,
+    ) -> Self {
+        Self {
+            range,
+            file_size,
+
+            decoder,
+            block_decoder: Default::default(),
+            codec,
+            sync_marker,
+
+            reader_state,
+            finishing_partial_block: false,
+        }
+    }
+
+    fn read_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<RecordBatch, ArrowError>>> {
+        loop {
+            match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+                ReaderState::Idle { mut reader } => {
+                    let range = self.range.clone();
+                    if range.start >= range.end || range.end > self.file_size {
+                        return 
Poll::Ready(Some(Err(ArrowError::AvroError(format!(
+                            "Invalid range specified for Avro file: start {} 
>= end {}, file_size: {}",
+                            range.start, range.end, self.file_size
+                        )))));
+                    }
+
+                    let future = async move {
+                        let data = reader.get_bytes(range).await?;
+                        Ok((reader, data))
+                    }
+                    .boxed();
+
+                    self.reader_state = ReaderState::FirstFetch { future };
+                }
+                ReaderState::FirstFetch { mut future } => {
+                    let (reader, data_chunk) = match future.poll_unpin(cx) {
+                        Poll::Ready(Ok(data)) => data,
+                        Poll::Ready(Err(e)) => {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        Poll::Pending => {
+                            self.reader_state = ReaderState::FirstFetch { 
future };
+                            return Poll::Pending;
+                        }
+                    };
+
+                    let sync_marker_pos = data_chunk
+                        .windows(16)
+                        .position(|slice| slice == self.sync_marker);
+                    let block_start = match sync_marker_pos {
+                        Some(pos) => pos + 16, // Move past the sync marker
+                        None => {
+                            // Sync marker not found, this is actually valid 
if we arbitrarily split the file at its end.
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(None);
+                        }
+                    };
+
+                    // This is the first time we read data, so try and find 
the sync marker.
+                    self.reader_state = ReaderState::DecodingBlock {
+                        reader,
+                        data: data_chunk.slice(block_start..),
+                    };
+                }
+                ReaderState::Limbo => {
+                    unreachable!("ReaderState::Limbo should never be 
observed");
+                }
+                ReaderState::DecodingBlock {
+                    mut reader,
+                    mut data,
+                } => {
+                    // Try to decode another block from the buffered reader.
+                    let consumed = self.block_decoder.decode(&data)?;
+                    if consumed == 0 {
+                        // If the last block was exactly at the end of the 
file,
+                        // we're simply done reading.
+                        if data.is_empty() {
+                            let final_batch = self.decoder.flush();
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(final_batch.transpose());
+                        }
+
+                        // If we've tried the following stage before, and 
still can't decode,
+                        // this means the file is truncated or corrupted.
+                        if self.finishing_partial_block {
+                            return Poll::Ready(Some(Err(ArrowError::AvroError(
+                                "Unexpected EOF while reading last Avro 
block".into(),
+                            ))));
+                        }
+
+                        // Avro splitting case: block is incomplete, we need 
to:
+                        // 1. Parse the length so we know how much to read
+                        // 2. Fetch more data from the object store
+                        // 3. Create a new block data from the remaining slice 
and the newly fetched data
+                        // 4. Continue decoding until end of block
+                        self.finishing_partial_block = true;
+
+                        let (size, vlq_header_len) = {
+                            let mut vlq = VLQDecoder::default();
+                            let mut vlq_buf = &data[..];
+                            let original_len = vlq_buf.len();
+
+                            let _ = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
count".into(),
+                                )
+                            })?;
+
+                            let size = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
size".into(),
+                                )
+                            })? as u64;
+
+                            // Calculate how many bytes were consumed by the 
two VLQ integers
+                            let header_len = 
original_len.checked_sub(vlq_buf.len()).unwrap();
+
+                            (size, header_len as u64)
+                        };
+
+                        // Two longs: count and size have already been read, 
but using our vlq,
+                        // meaning they were not consumed.
+                        let total_block_size = size + vlq_header_len;
+                        let remaining_to_fetch =
+                            total_block_size.checked_sub(data.len() as 
u64).unwrap();
+
+                        let range_to_fetch = self.range.end..(self.range.end + 
remaining_to_fetch);
+
+                        let future = async move {
+                            let data = reader.get_bytes(range_to_fetch).await?;
+                            Ok((reader, data))
+                        }
+                        .boxed();
+                        self.reader_state = ReaderState::ReadingFinalBlock {
+                            current_data: data,
+                            future,
+                        };
+                        continue;
+                    }
+
+                    // Slice off the consumed data
+                    data = data.slice(consumed..);
+
+                    // Decompress the block if needed, prepare it for decoding.
+                    if let Some(block) = self.block_decoder.flush() {
+                        // Successfully decoded a block.
+                        let block_data = Bytes::from_owner(if let Some(ref 
codec) = self.codec {
+                            codec.decompress(&block.data)?
+                        } else {
+                            block.data
+                        });
+
+                        // Since we have an active block, move to reading 
batches
+                        self.reader_state = ReaderState::ReadingBatches {
+                            reader,
+                            data,
+                            block_data,
+                            remaining_in_block: block.count,
+                        };
+                    } else {
+                        // Block not finished yet, try to decode more in the 
next iteration
+                        self.reader_state = ReaderState::DecodingBlock { 
reader, data };
+                    }
+                }
+                ReaderState::ReadingBatches {
+                    reader,
+                    data,
+                    mut block_data,
+                    mut remaining_in_block,
+                } => {
+                    let (consumed, records_decoded) =
+                        self.decoder.decode_block(&block_data, 
remaining_in_block)?;
+
+                    remaining_in_block -= records_decoded;
+
+                    if remaining_in_block == 0 {
+                        // Finished this block, move to decode next block in 
the next iteration
+                        self.reader_state = ReaderState::DecodingBlock { 
reader, data };
+                    } else {
+                        // Still more records to decode in this block, slice 
the already-read data and stay in this state
+                        block_data = block_data.slice(consumed..);
+                        self.reader_state = ReaderState::ReadingBatches {
+                            reader,
+                            data,
+                            block_data,
+                            remaining_in_block,
+                        };
+                    }
+
+                    // We have a full batch ready, emit it
+                    // (This is not mutually exclusive with the block being 
finished, so the state change is valid)
+                    if self.decoder.batch_is_full() {
+                        let batch_res = self.decoder.flush();
+                        return Poll::Ready(batch_res.transpose());
+                    }
+                }
+                ReaderState::ReadingFinalBlock {
+                    current_data,
+                    mut future,
+                } => {
+                    let (reader, data_chunk) = match future.poll_unpin(cx) {
+                        Poll::Ready(Ok(data)) => data,
+                        Poll::Ready(Err(e)) => {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        Poll::Pending => {
+                            self.reader_state = ReaderState::ReadingFinalBlock 
{
+                                current_data,
+                                future,
+                            };
+                            return Poll::Pending;
+                        }
+                    };
+
+                    // If data already exists, it means we have a partial 
block,
+                    // Append the newly fetched chunk to the existing buffered 
data.
+                    let combined = Bytes::from_owner([current_data, 
data_chunk].concat());

Review Comment:
   I'm thinking we should be able to remove this concatenation since 
`BlockDecoder` maintains internal state across `decode()` calls. By doing so we 
could get rid of the allocation for `combined`, the copying of `current_data` 
and `data_chunk` and the re-feeding of already consumed bytes to `BlockDecoder`.



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum ReaderState<R: AsyncFileReader> {
+    Idle {
+        reader: R,
+    },
+    FirstFetch {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Limbo,
+    DecodingBlock {
+        data: Bytes,
+        reader: R,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    ReadingFinalBlock {
+        current_data: Bytes,
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState<R>,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional 
parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> 
AsyncAvroFileReaderBuilder<R> {
+        AsyncAvroFileReaderBuilder::new(reader, file_size, batch_size)
+    }
+
+    fn new(
+        range: Range<u64>,
+        file_size: u64,
+        decoder: Decoder,
+        codec: Option<CompressionCodec>,
+        sync_marker: [u8; 16],
+        reader_state: ReaderState<R>,
+    ) -> Self {
+        Self {
+            range,
+            file_size,
+
+            decoder,
+            block_decoder: Default::default(),
+            codec,
+            sync_marker,
+
+            reader_state,
+            finishing_partial_block: false,
+        }
+    }
+
+    fn read_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<RecordBatch, ArrowError>>> {
+        loop {
+            match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+                ReaderState::Idle { mut reader } => {
+                    let range = self.range.clone();
+                    if range.start >= range.end || range.end > self.file_size {
+                        return 
Poll::Ready(Some(Err(ArrowError::AvroError(format!(
+                            "Invalid range specified for Avro file: start {} 
>= end {}, file_size: {}",
+                            range.start, range.end, self.file_size
+                        )))));
+                    }
+
+                    let future = async move {
+                        let data = reader.get_bytes(range).await?;
+                        Ok((reader, data))
+                    }
+                    .boxed();
+
+                    self.reader_state = ReaderState::FirstFetch { future };
+                }
+                ReaderState::FirstFetch { mut future } => {
+                    let (reader, data_chunk) = match future.poll_unpin(cx) {
+                        Poll::Ready(Ok(data)) => data,
+                        Poll::Ready(Err(e)) => {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        Poll::Pending => {
+                            self.reader_state = ReaderState::FirstFetch { 
future };
+                            return Poll::Pending;
+                        }
+                    };
+
+                    let sync_marker_pos = data_chunk
+                        .windows(16)
+                        .position(|slice| slice == self.sync_marker);
+                    let block_start = match sync_marker_pos {
+                        Some(pos) => pos + 16, // Move past the sync marker
+                        None => {
+                            // Sync marker not found, this is actually valid 
if we arbitrarily split the file at its end.
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(None);
+                        }
+                    };
+
+                    // This is the first time we read data, so try and find 
the sync marker.
+                    self.reader_state = ReaderState::DecodingBlock {
+                        reader,
+                        data: data_chunk.slice(block_start..),
+                    };
+                }
+                ReaderState::Limbo => {
+                    unreachable!("ReaderState::Limbo should never be 
observed");
+                }
+                ReaderState::DecodingBlock {
+                    mut reader,
+                    mut data,
+                } => {
+                    // Try to decode another block from the buffered reader.
+                    let consumed = self.block_decoder.decode(&data)?;
+                    if consumed == 0 {
+                        // If the last block was exactly at the end of the 
file,
+                        // we're simply done reading.
+                        if data.is_empty() {
+                            let final_batch = self.decoder.flush();
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(final_batch.transpose());
+                        }
+
+                        // If we've tried the following stage before, and 
still can't decode,
+                        // this means the file is truncated or corrupted.
+                        if self.finishing_partial_block {
+                            return Poll::Ready(Some(Err(ArrowError::AvroError(
+                                "Unexpected EOF while reading last Avro 
block".into(),
+                            ))));
+                        }
+
+                        // Avro splitting case: block is incomplete, we need 
to:
+                        // 1. Parse the length so we know how much to read
+                        // 2. Fetch more data from the object store
+                        // 3. Create a new block data from the remaining slice 
and the newly fetched data
+                        // 4. Continue decoding until end of block
+                        self.finishing_partial_block = true;
+
+                        let (size, vlq_header_len) = {
+                            let mut vlq = VLQDecoder::default();
+                            let mut vlq_buf = &data[..];
+                            let original_len = vlq_buf.len();
+
+                            let _ = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
count".into(),
+                                )
+                            })?;
+
+                            let size = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
size".into(),
+                                )
+                            })? as u64;
+
+                            // Calculate how many bytes were consumed by the 
two VLQ integers
+                            let header_len = 
original_len.checked_sub(vlq_buf.len()).unwrap();
+
+                            (size, header_len as u64)
+                        };
+
+                        // Two longs: count and size have already been read, 
but using our vlq,
+                        // meaning they were not consumed.
+                        let total_block_size = size + vlq_header_len;
+                        let remaining_to_fetch =
+                            total_block_size.checked_sub(data.len() as 
u64).unwrap();
+
+                        let range_to_fetch = self.range.end..(self.range.end + 
remaining_to_fetch);

Review Comment:
   Also it maybe worth clamping `range_to_fetch` to `file_size` to prevent out 
of bound requests.
   
   Maybe something like this?
   
   ```rust
     let fetch_end = 
(self.range.end.saturating_add(remaining_to_fetch)).min(self.file_size);        
                                                                                
                                                 
     let range_to_fetch = self.range.end..fetch_end;                            
                                                                                
                                                                      
                                                                                
                                                                                
                                                                      
     if range_to_fetch.is_empty() {                                             
                                                                                
                                                                      
         return Poll::Ready(Some(Err(ArrowError::AvroError(                     
                                                                                
                                                                      
             "Cannot complete block: insufficient data remaining in 
file".into()                                                                    
                                                                                
  
         ))));                                                                  
                                                                                
                                                                      
     }  
   ```



##########
parquet/src/arrow/array_reader/mod.rs:
##########
@@ -51,7 +51,7 @@ mod test_util;
 
 // Note that this crate is public under the `experimental` feature flag.
 use crate::file::metadata::RowGroupMetaData;
-pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
+pub use builder::{ArrayReaderBuilder, CacheOptionsBuilder};

Review Comment:
   Also I'm curious if these changes are relevant to this PR and should be 
included?



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum ReaderState<R: AsyncFileReader> {
+    Idle {
+        reader: R,
+    },
+    FirstFetch {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Limbo,
+    DecodingBlock {
+        data: Bytes,
+        reader: R,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    ReadingFinalBlock {
+        current_data: Bytes,
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState<R>,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional 
parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> 
AsyncAvroFileReaderBuilder<R> {
+        AsyncAvroFileReaderBuilder::new(reader, file_size, batch_size)
+    }
+
+    fn new(
+        range: Range<u64>,
+        file_size: u64,
+        decoder: Decoder,
+        codec: Option<CompressionCodec>,
+        sync_marker: [u8; 16],
+        reader_state: ReaderState<R>,
+    ) -> Self {
+        Self {
+            range,
+            file_size,
+
+            decoder,
+            block_decoder: Default::default(),
+            codec,
+            sync_marker,
+
+            reader_state,
+            finishing_partial_block: false,
+        }
+    }
+
+    fn read_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<RecordBatch, ArrowError>>> {
+        loop {
+            match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+                ReaderState::Idle { mut reader } => {
+                    let range = self.range.clone();
+                    if range.start >= range.end || range.end > self.file_size {
+                        return 
Poll::Ready(Some(Err(ArrowError::AvroError(format!(
+                            "Invalid range specified for Avro file: start {} 
>= end {}, file_size: {}",
+                            range.start, range.end, self.file_size
+                        )))));
+                    }
+
+                    let future = async move {
+                        let data = reader.get_bytes(range).await?;
+                        Ok((reader, data))
+                    }
+                    .boxed();
+
+                    self.reader_state = ReaderState::FirstFetch { future };
+                }
+                ReaderState::FirstFetch { mut future } => {
+                    let (reader, data_chunk) = match future.poll_unpin(cx) {
+                        Poll::Ready(Ok(data)) => data,
+                        Poll::Ready(Err(e)) => {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        Poll::Pending => {
+                            self.reader_state = ReaderState::FirstFetch { 
future };
+                            return Poll::Pending;
+                        }
+                    };
+
+                    let sync_marker_pos = data_chunk
+                        .windows(16)
+                        .position(|slice| slice == self.sync_marker);
+                    let block_start = match sync_marker_pos {
+                        Some(pos) => pos + 16, // Move past the sync marker
+                        None => {
+                            // Sync marker not found, this is actually valid 
if we arbitrarily split the file at its end.
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(None);
+                        }
+                    };
+
+                    // This is the first time we read data, so try and find 
the sync marker.
+                    self.reader_state = ReaderState::DecodingBlock {
+                        reader,
+                        data: data_chunk.slice(block_start..),
+                    };
+                }
+                ReaderState::Limbo => {
+                    unreachable!("ReaderState::Limbo should never be 
observed");
+                }
+                ReaderState::DecodingBlock {
+                    mut reader,
+                    mut data,
+                } => {
+                    // Try to decode another block from the buffered reader.
+                    let consumed = self.block_decoder.decode(&data)?;
+                    if consumed == 0 {
+                        // If the last block was exactly at the end of the 
file,
+                        // we're simply done reading.
+                        if data.is_empty() {
+                            let final_batch = self.decoder.flush();
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(final_batch.transpose());
+                        }
+
+                        // If we've tried the following stage before, and 
still can't decode,
+                        // this means the file is truncated or corrupted.
+                        if self.finishing_partial_block {
+                            return Poll::Ready(Some(Err(ArrowError::AvroError(
+                                "Unexpected EOF while reading last Avro 
block".into(),
+                            ))));
+                        }
+
+                        // Avro splitting case: block is incomplete, we need 
to:
+                        // 1. Parse the length so we know how much to read
+                        // 2. Fetch more data from the object store
+                        // 3. Create a new block data from the remaining slice 
and the newly fetched data
+                        // 4. Continue decoding until end of block
+                        self.finishing_partial_block = true;
+
+                        let (size, vlq_header_len) = {
+                            let mut vlq = VLQDecoder::default();
+                            let mut vlq_buf = &data[..];
+                            let original_len = vlq_buf.len();
+
+                            let _ = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
count".into(),
+                                )
+                            })?;
+
+                            let size = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
size".into(),
+                                )
+                            })? as u64;
+
+                            // Calculate how many bytes were consumed by the 
two VLQ integers
+                            let header_len = 
original_len.checked_sub(vlq_buf.len()).unwrap();
+
+                            (size, header_len as u64)

Review Comment:
   ```suggestion
                               let header_len = 
original_len.checked_sub(vlq_buf.len()).ok_or_else(|| 
ArrowError::AvroError("Invalid VLQ header: consumed more bytes than 
available".into()))? as u64;
                               (size, header_len)
   ```



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum ReaderState<R: AsyncFileReader> {
+    Idle {
+        reader: R,
+    },
+    FirstFetch {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Limbo,
+    DecodingBlock {
+        data: Bytes,
+        reader: R,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    ReadingFinalBlock {
+        current_data: Bytes,
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState<R>,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional 
parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> 
AsyncAvroFileReaderBuilder<R> {
+        AsyncAvroFileReaderBuilder::new(reader, file_size, batch_size)
+    }
+
+    fn new(
+        range: Range<u64>,
+        file_size: u64,
+        decoder: Decoder,
+        codec: Option<CompressionCodec>,
+        sync_marker: [u8; 16],
+        reader_state: ReaderState<R>,
+    ) -> Self {
+        Self {
+            range,
+            file_size,
+
+            decoder,
+            block_decoder: Default::default(),
+            codec,
+            sync_marker,
+
+            reader_state,
+            finishing_partial_block: false,
+        }
+    }
+
+    fn read_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<RecordBatch, ArrowError>>> {
+        loop {
+            match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+                ReaderState::Idle { mut reader } => {
+                    let range = self.range.clone();
+                    if range.start >= range.end || range.end > self.file_size {
+                        return 
Poll::Ready(Some(Err(ArrowError::AvroError(format!(
+                            "Invalid range specified for Avro file: start {} 
>= end {}, file_size: {}",
+                            range.start, range.end, self.file_size
+                        )))));
+                    }
+
+                    let future = async move {
+                        let data = reader.get_bytes(range).await?;
+                        Ok((reader, data))
+                    }
+                    .boxed();
+
+                    self.reader_state = ReaderState::FirstFetch { future };
+                }
+                ReaderState::FirstFetch { mut future } => {
+                    let (reader, data_chunk) = match future.poll_unpin(cx) {
+                        Poll::Ready(Ok(data)) => data,
+                        Poll::Ready(Err(e)) => {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        Poll::Pending => {
+                            self.reader_state = ReaderState::FirstFetch { 
future };
+                            return Poll::Pending;
+                        }
+                    };
+
+                    let sync_marker_pos = data_chunk
+                        .windows(16)
+                        .position(|slice| slice == self.sync_marker);
+                    let block_start = match sync_marker_pos {
+                        Some(pos) => pos + 16, // Move past the sync marker
+                        None => {
+                            // Sync marker not found, this is actually valid 
if we arbitrarily split the file at its end.
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(None);
+                        }
+                    };
+
+                    // This is the first time we read data, so try and find 
the sync marker.
+                    self.reader_state = ReaderState::DecodingBlock {
+                        reader,
+                        data: data_chunk.slice(block_start..),
+                    };
+                }
+                ReaderState::Limbo => {
+                    unreachable!("ReaderState::Limbo should never be 
observed");
+                }

Review Comment:
   Is the `ReaderState::Limbo` variant really necessary? Could we use 
`Finished` and if a bug causes an early return without setting state, the 
stream just ends (which is safer than panicking)?



##########
parquet/src/arrow/schema/extension.rs:
##########
@@ -36,6 +35,7 @@ use arrow_schema::extension::ExtensionType;
 /// Arrow DataType, and instead are represented by an Arrow ExtensionType.
 /// Extension types are attached to Arrow Fields via metadata.
 pub(crate) fn try_add_extension_type(
+    #[cfg_attr(all(not(feature = "variant_experimental"), not(feature = 
"arrow_canonical_extension_types"), not(feature = "geospatial")), 
allow(unused_mut))]

Review Comment:
   Also I'm curious if these changes are relevant to this PR and should be 
included?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to