jecsand838 commented on code in PR #8930:
URL: https://github.com/apache/arrow-rs/pull/8930#discussion_r2697681278
##########
arrow-avro/src/lib.rs:
##########
@@ -123,16 +123,56 @@
//! # Ok(()) }
//! ```
//!
+//! ## `async` Reading (`async` feature)
+//!
+//! The [`reader`] module provides async APIs for reading Avro files when the `async`
+//! feature is enabled.
+//!
+//! [`AsyncAvroFileReader`] implements `Stream<Item = Result<RecordBatch, ArrowError>>`,
+//! allowing efficient async streaming of record batches. When the `object_store` feature
+//! is enabled, [`AvroObjectReader`] provides integration with object storage services
+//! such as S3 via the [object_store] crate.
+//!
+//! ```ignore
Review Comment:
We should drop the `ignore` imo.
```suggestion
//! ```
```
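For reference, a compiling (non-`ignore`) example could look roughly like this. This is a sketch based only on the APIs added in this PR (`AsyncAvroFileReader::builder`, `try_build`, and the `Stream` impl); the helper function and the batch size of 1024 are illustrative, and the real doc example may differ:
```rust
use arrow_avro::reader::{AsyncAvroFileReader, AsyncFileReader};
use arrow_schema::ArrowError;
use futures::StreamExt;

// Illustrative helper (not from this PR): drive the stream to completion.
async fn count_rows<R: AsyncFileReader + Unpin + 'static>(
    reader: R,
    file_size: u64,
) -> Result<usize, ArrowError> {
    // 1024 rows per RecordBatch is an arbitrary batch size for the example.
    let mut stream = AsyncAvroFileReader::builder(reader, file_size, 1024)
        .try_build()
        .await?;
    let mut rows = 0;
    while let Some(batch) = stream.next().await {
        rows += batch?.num_rows();
    }
    Ok(rows)
}
```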
##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum ReaderState<R: AsyncFileReader> {
+ Idle {
+ reader: R,
+ },
+ FirstFetch {
+ future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+ },
+ Limbo,
+ DecodingBlock {
+ data: Bytes,
+ reader: R,
+ },
+ ReadingBatches {
+ data: Bytes,
+ block_data: Bytes,
+ remaining_in_block: usize,
+ reader: R,
+ },
+ ReadingFinalBlock {
+ current_data: Bytes,
+ future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+ },
+ Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting with the following block.
+///    (If `range.start` is less than the header length, we start at the header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+ // Members required to fetch data
+ range: Range<u64>,
+ file_size: u64,
+
+ // Members required to actually decode and read data
+ decoder: Decoder,
+ block_decoder: BlockDecoder,
+ codec: Option<CompressionCodec>,
+ sync_marker: [u8; 16],
+
+ // Members keeping the current state of the reader
+ reader_state: ReaderState<R>,
+ finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> AsyncAvroFileReaderBuilder<R> {
+ AsyncAvroFileReaderBuilder::new(reader, file_size, batch_size)
+ }
+
+ fn new(
+ range: Range<u64>,
+ file_size: u64,
+ decoder: Decoder,
+ codec: Option<CompressionCodec>,
+ sync_marker: [u8; 16],
+ reader_state: ReaderState<R>,
+ ) -> Self {
+ Self {
+ range,
+ file_size,
+
+ decoder,
+ block_decoder: Default::default(),
+ codec,
+ sync_marker,
+
+ reader_state,
+ finishing_partial_block: false,
+ }
+ }
+
+    fn read_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch, ArrowError>>> {
+ loop {
+ match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+ ReaderState::Idle { mut reader } => {
+ let range = self.range.clone();
+ if range.start >= range.end || range.end > self.file_size {
+                        return Poll::Ready(Some(Err(ArrowError::AvroError(format!(
+                            "Invalid range specified for Avro file: start {} >= end {}, file_size: {}",
+                            range.start, range.end, self.file_size
+ )))));
+ }
+
+ let future = async move {
+ let data = reader.get_bytes(range).await?;
+ Ok((reader, data))
+ }
+ .boxed();
+
+ self.reader_state = ReaderState::FirstFetch { future };
+ }
+ ReaderState::FirstFetch { mut future } => {
+ let (reader, data_chunk) = match future.poll_unpin(cx) {
+ Poll::Ready(Ok(data)) => data,
+ Poll::Ready(Err(e)) => {
+ return Poll::Ready(Some(Err(e)));
+ }
+ Poll::Pending => {
+                            self.reader_state = ReaderState::FirstFetch { future };
+ return Poll::Pending;
+ }
+ };
+
+ let sync_marker_pos = data_chunk
+ .windows(16)
+ .position(|slice| slice == self.sync_marker);
+ let block_start = match sync_marker_pos {
+ Some(pos) => pos + 16, // Move past the sync marker
+ None => {
+                            // Sync marker not found, this is actually valid if we arbitrarily split the file at its end.
+ self.reader_state = ReaderState::Finished;
+ return Poll::Ready(None);
+ }
+ };
+
+                    // This is the first time we read data, so try and find the sync marker.
+ self.reader_state = ReaderState::DecodingBlock {
+ reader,
+ data: data_chunk.slice(block_start..),
+ };
+ }
+ ReaderState::Limbo => {
+                    unreachable!("ReaderState::Limbo should never be observed");
+ }
+ ReaderState::DecodingBlock {
+ mut reader,
+ mut data,
+ } => {
+ // Try to decode another block from the buffered reader.
+ let consumed = self.block_decoder.decode(&data)?;
+ if consumed == 0 {
+                        // If the last block was exactly at the end of the file,
+ // we're simply done reading.
+ if data.is_empty() {
+ let final_batch = self.decoder.flush();
+ self.reader_state = ReaderState::Finished;
+ return Poll::Ready(final_batch.transpose());
+ }
+
+                    // If we've tried the following stage before, and still can't decode,
+                    // this means the file is truncated or corrupted.
+                    if self.finishing_partial_block {
+                        return Poll::Ready(Some(Err(ArrowError::AvroError(
+                            "Unexpected EOF while reading last Avro block".into(),
+ ))));
+ }
+
+ // Avro splitting case: block is incomplete, we need
to:
+ // 1. Parse the length so we know how much to read
+ // 2. Fetch more data from the object store
+ // 3. Create a new block data from the remaining slice
and the newly fetched data
+ // 4. Continue decoding until end of block
+ self.finishing_partial_block = true;
+
+ let (size, vlq_header_len) = {
+ let mut vlq = VLQDecoder::default();
+ let mut vlq_buf = &data[..];
+ let original_len = vlq_buf.len();
+
+                        let _ = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                            ArrowError::AvroError(
+                                "Unexpected EOF while reading Avro block count".into(),
+                            )
+                        })?;
+
+                        let size = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                            ArrowError::AvroError(
+                                "Unexpected EOF while reading Avro block size".into(),
+                            )
+                        })? as u64;
+
+                        // Calculate how many bytes were consumed by the two VLQ integers
+                        let header_len = original_len.checked_sub(vlq_buf.len()).unwrap();
+
+ (size, header_len as u64)
+ };
+
+                    // Two longs: count and size have already been read, but using our vlq,
+                    // meaning they were not consumed.
+                    let total_block_size = size + vlq_header_len;
+                    let remaining_to_fetch =
+                        total_block_size.checked_sub(data.len() as u64).unwrap();
Review Comment:
```suggestion
                    let remaining_to_fetch = total_block_size
                        .checked_sub(data.len() as u64)
                        .ok_or_else(|| {
                            ArrowError::AvroError(
                                "Invalid block size: data exceeds expected block size".into(),
                            )
                        })?;
```
##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+ ReaderState::DecodingBlock {
+ mut reader,
+ mut data,
+ } => {
Review Comment:
I was thinking about it and you may want to consider a decode loop similar to the sync `Reader::read` method's, specifically this logic:
```rust
let consumed = self.block_decoder.decode(buf)?;
self.reader.consume(consumed);
if let Some(block) = self.block_decoder.flush() {
// Block complete - use it
} else if consumed == 0 {
// Stuck on non-empty buffer - error
return Err(ArrowError::ParseError(...));
}
// Otherwise: made progress, loop for more data
```
From an architectural perspective the advantages would be:
1. Always calls `flush()` after `decode()` to check for complete blocks
2. Only errors when stuck (`consumed == 0` on non-empty buffer AND `flush() == None`)
3. Trusts `BlockDecoder` to handle partial data incrementally

Maybe it could resemble something like this pseudo-code?
```rust
ReaderState::DecodingBlock { mut reader, mut data } => {
    let consumed = self.block_decoder.decode(&data)?;
    data = data.slice(consumed..); // Equivalent to reader.consume()
    if let Some(block) = self.block_decoder.flush() {
        // Block complete - proceed to ReadingBatches
        let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
codec.decompress(&block.data)?
} else {
block.data
});
self.reader_state = ReaderState::ReadingBatches {
reader, data, block_data,
remaining_in_block: block.count,
};
continue;
}
// No complete block yet
if consumed == 0 && !data.is_empty() {
// Stuck - no progress on non-empty buffer = corrupted data
return Poll::Ready(Some(Err(ArrowError::ParseError(
"Could not decode next Avro block from partial data".into()
))));
}
if data.is_empty() {
// Buffer exhausted, block incomplete
if self.finishing_partial_block {
return Poll::Ready(Some(Err(ArrowError::AvroError(
"Unexpected EOF while reading last Avro block".into()
))));
}
// Fetch more data (range end case) or finish
// ... simplified fetch logic here ...
} else {
// Made progress but not complete - continue decoding
self.reader_state = ReaderState::DecodingBlock { reader, data };
}
}
```
##########
arrow-avro/src/reader/async_reader/builder.rs:
##########
@@ -0,0 +1,163 @@
+use crate::codec::AvroFieldBuilder;
+use crate::reader::async_reader::ReaderState;
+use crate::reader::header::{Header, HeaderDecoder};
+use crate::reader::record::RecordDecoder;
+use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
+use crate::schema::{AvroSchema, FingerprintAlgorithm};
+use arrow_schema::ArrowError;
+use indexmap::IndexMap;
+use std::ops::Range;
+
+const DEFAULT_HEADER_SIZE_HINT: u64 = 64 * 1024; // 64 KB
+
+/// Builder for an asynchronous Avro file reader.
+pub struct AsyncAvroFileReaderBuilder<R: AsyncFileReader> {
+ reader: R,
+ file_size: u64,
+ batch_size: usize,
+ range: Option<Range<u64>>,
+ reader_schema: Option<AvroSchema>,
+ header_size_hint: Option<u64>,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReaderBuilder<R> {
+ pub(super) fn new(reader: R, file_size: u64, batch_size: usize) -> Self {
+ Self {
+ reader,
+ file_size,
+ batch_size,
+ range: None,
+ reader_schema: None,
+ header_size_hint: None,
+ }
+ }
+
+ /// Specify a byte range to read from the Avro file.
+    /// If this is provided, the reader will read all the blocks within the specified range.
+    /// If the range ends mid-block, it will attempt to fetch the remaining bytes to complete the block,
+    /// but no further blocks will be read.
+ /// If this is omitted, the full file will be read.
+ pub fn with_range(self, range: Range<u64>) -> Self {
+ Self {
+ range: Some(range),
+ ..self
+ }
+ }
+
+ /// Specify a reader schema to use when reading the Avro file.
+    /// This can be useful to project specific columns or handle schema evolution.
+    /// If this is not provided, the writer schema from the file header will be used.
+ pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
+ Self {
+ reader_schema: Some(reader_schema),
+ ..self
+ }
+ }
+
+ /// Provide a hint for the expected size of the Avro header in bytes.
+    /// This can help optimize the initial read operation when fetching the header.
+ pub fn with_header_size_hint(self, hint: u64) -> Self {
+ Self {
+ header_size_hint: Some(hint),
+ ..self
+ }
+ }
+
+ async fn read_header(&mut self) -> Result<(Header, u64), ArrowError> {
+ let mut decoder = HeaderDecoder::default();
+ let mut position = 0;
+ loop {
+ let range_to_fetch = position
+                ..(position + self.header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT))
+                    .min(self.file_size);
+            let current_data = self.reader.get_bytes(range_to_fetch).await.map_err(|err| {
+ ArrowError::AvroError(format!(
+ "Error fetching Avro header from object store: {err}"
+ ))
+ })?;
+ if current_data.is_empty() {
+ break;
+ }
+ let read = current_data.len();
+            let decoded = decoder.decode(&current_data)?;
+ if decoded != read {
+ position += decoded as u64;
+ break;
+ }
+ position += read as u64;
+ }
+
+ decoder
+ .flush()
+ .map(|header| (header, position))
+            .ok_or_else(|| ArrowError::AvroError("Unexpected EOF while reading Avro header".into()))
+ }
+
+ /// Build the asynchronous Avro reader with the provided parameters.
+ /// This reads the header first to initialize the reader state.
+    pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, ArrowError> {
+ if self.file_size == 0 {
+ return Err(ArrowError::AvroError("File size cannot be 0".into()));
+ }
+
+ // Start by reading the header from the beginning of the avro file
+ let (header, header_len) = self.read_header().await?;
+ let writer_schema = header
+ .schema()
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+ .ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file header".into())
+ })?;
+
+ let root = {
+ let field_builder = AvroFieldBuilder::new(&writer_schema);
+ if let Some(provided_schema) = self.reader_schema.as_ref() {
+ let reader_schema = provided_schema.schema()?;
+ field_builder.with_reader_schema(&reader_schema).build()
+ } else {
+ field_builder.build()
+ }
+ }?;
+
+        let record_decoder = RecordDecoder::try_new_with_options(root.data_type())?;
+
+ let decoder = Decoder::from_parts(
+ self.batch_size,
+ record_decoder,
+ None,
+ IndexMap::new(),
+ FingerprintAlgorithm::Rabin,
+ );
+ let range = match self.range {
+ Some(r) => {
+                // If this PartitionedFile's range starts at 0, we need to skip the header bytes.
+                // But then we need to seek back 16 bytes to include the sync marker for the first block,
+                // as the logic in this reader searches the data for the first sync marker (after which a block starts),
+                // then reads blocks from the count, size etc.
+                let start = r.start.max(header_len.checked_sub(16).unwrap());
+                let end = r.end.max(start).min(self.file_size); // Ensure end is not less than start, worst case range is empty
+ start..end
+ }
+ None => 0..self.file_size,
Review Comment:
The builder reads the header in `read_header()` (lines 66-94), potentially fetching multiple chunks. Then `AsyncAvroFileReader` fetches range `0..file_size` again in `FirstFetch`, re-reading the header bytes.
What do you think about passing the leftover bytes from header parsing to avoid redundant I/O?
```rust
async fn read_header(&mut self) -> Result<(Header, u64, Option<Bytes>), ArrowError> {
// ...
// Return any leftover bytes after header
let leftover = if decoded < current_data.len() {
Some(current_data.slice(decoded..))
} else {
None
};
decoder.flush()
.map(|header| (header, position, leftover))
.ok_or_else(|| ...)
}
```
Then you could add an `InitialData` state variant to `ReaderState` and initialize with leftover bytes when available:
```rust
let reader_state = match leftover_bytes {
    Some(bytes) if !bytes.is_empty() => ReaderState::InitialData {
        reader: self.reader,
        initial_bytes: bytes,
    },
_ if range.start == range.end => ReaderState::Finished,
_ => ReaderState::Idle { reader: self.reader },
};
```
##########
arrow-avro/README.md:
##########
@@ -156,6 +193,11 @@ See the crate docs for runnable SOE and Confluent round‑trip examples.
```toml
arrow-avro = { version = "56", default-features = false, features = ["deflate", "snappy", "zstd"] }
```
+* Async reading from object stores (S3, GCS, etc.):
+
+ ```toml
+ arrow-avro = { version = "56", features = ["object_store"] }
Review Comment:
We should probably bump this one (and the preceding example) up to v58.
```suggestion
arrow-avro = { version = "58", features = ["object_store"] }
```
##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+ ReaderState::DecodingBlock {
+ mut reader,
+ mut data,
+ } => {
+ // Try to decode another block from the buffered reader.
+ let consumed = self.block_decoder.decode(&data)?;
+ if consumed == 0 {
Review Comment:
I think there may be an issue with using `consumed == 0` as the signal for detecting incomplete blocks here.
Looking at `BlockDecoder::decode` in block.rs lines 78-129, it returns 0 only when:
- The input buffer is empty at the start, OR
- The decoder is already in Finished state

For a truly incomplete block, `decode()` consumes all available bytes (returns `data.len()`) and `flush()` returns `None`. The current logic likely never triggers when it should.
You may want to consider changing the detection logic to check `flush()` first:
```rust
ReaderState::DecodingBlock { mut reader, mut data } => {
let consumed = self.block_decoder.decode(&data)?;
data = data.slice(consumed..);
// Check for complete block FIRST
if let Some(block) = self.block_decoder.flush() {
        let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
codec.decompress(&block.data)?
} else {
block.data
});
self.reader_state = ReaderState::ReadingBatches {
reader, data, block_data,
remaining_in_block: block.count,
};
continue;
}
// No complete block
if data.is_empty() && consumed == 0 {
// No progress on empty buffer = EOF
let final_batch = self.decoder.flush();
self.reader_state = ReaderState::Finished;
return Poll::Ready(final_batch.transpose());
}
if data.is_empty() {
// All data consumed but block incomplete - need more bytes
// (incomplete block handling logic here)
} else {
// Still have data to process
self.reader_state = ReaderState::DecodingBlock { reader, data };
}
}
```
##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+
+ let future = async move {
+ let data = reader.get_bytes(range).await?;
Review Comment:
I wonder if there's a way around fetching the entire requested range into memory to reduce potential memory pressure. Maybe (in a future PR) we could use a `BytesMut` buffer or similar to implement chunked reads?
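For illustration, a chunked variant might look roughly like this. This is a hypothetical sketch only: the `chunk_size` knob and the free function are not part of this PR, and it assumes the crate-internal `BlockDecoder` API (`decode`/`flush`) used elsewhere in this file:
```rust
use bytes::BytesMut;

// Hypothetical sketch, not part of this PR: fetch `chunk_size` bytes at a time
// and keep only the undecoded tail buffered, instead of one big get_bytes call.
async fn decode_range_chunked<R: AsyncFileReader>(
    reader: &mut R,
    block_decoder: &mut BlockDecoder,
    range: Range<u64>,
    chunk_size: u64,
) -> Result<(), ArrowError> {
    let mut buf = BytesMut::new();
    let mut pos = range.start;
    while pos < range.end {
        let end = (pos + chunk_size).min(range.end);
        buf.extend_from_slice(&reader.get_bytes(pos..end).await?);
        pos = end;
        // Drain as many complete blocks as the buffered bytes allow.
        loop {
            let consumed = block_decoder.decode(&buf)?;
            let _ = buf.split_to(consumed); // drop decoded bytes, keep the tail
            match block_decoder.flush() {
                Some(_block) => { /* decompress and feed `_block.data` to the Decoder */ }
                None => break, // partial block: fetch the next chunk
            }
        }
    }
    Ok(())
}
```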
##########
arrow-avro/src/reader/async_reader/store.rs:
##########
@@ -0,0 +1,89 @@
+use crate::reader::async_reader::AsyncFileReader;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, TryFutureExt};
+use object_store::ObjectStore;
+use object_store::path::Path;
+use std::error::Error;
+use std::ops::Range;
+use std::sync::Arc;
+use tokio::runtime::Handle;
+
+/// An implementation of an AsyncFileReader using the [`ObjectStore`] API.
+pub struct AvroObjectReader {
+ store: Arc<dyn ObjectStore>,
+ path: Path,
+ runtime: Option<Handle>,
+}
+
+impl AvroObjectReader {
+ /// Creates a new [`Self`] from a store implementation and file location.
+ pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
+ Self {
+ store,
+ path,
+ runtime: None,
+ }
+ }
+
+ /// Perform IO on the provided tokio runtime
+ ///
+    /// Tokio is a cooperative scheduler, and relies on tasks yielding in a timely manner
+    /// to service IO. Therefore, running IO and CPU-bound tasks, such as avro decoding,
+    /// on the same tokio runtime can lead to degraded throughput, dropped connections and
+    /// other issues. For more information see [here].
+    ///
+    /// [here]: https://www.influxdata.com/blog/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/
+ pub fn with_runtime(self, handle: Handle) -> Self {
+ Self {
+ runtime: Some(handle),
+ ..self
+ }
+ }
+
+ fn spawn<F, O, E>(&self, f: F) -> BoxFuture<'_, Result<O, ArrowError>>
+ where
+        F: for<'a> FnOnce(&'a Arc<dyn ObjectStore>, &'a Path) -> BoxFuture<'a, Result<O, E>>
+ + Send
+ + 'static,
+ O: Send + 'static,
+ E: Error + Send + 'static,
+ {
+ match &self.runtime {
+ Some(handle) => {
+ let path = self.path.clone();
+ let store = Arc::clone(&self.store);
+ handle
+ .spawn(async move { f(&store, &path).await })
+ .map_ok_or_else(
+ |e| match e.try_into_panic() {
+                            Err(e) => Err(ArrowError::AvroError(e.to_string())),
+ Ok(p) => std::panic::resume_unwind(p),
+ },
+                        |res| res.map_err(|e| ArrowError::AvroError(e.to_string())),
+ )
+ .boxed()
+ }
+ None => f(&self.store, &self.path)
+ .map_err(|e| ArrowError::AvroError(e.to_string()))
+ .boxed(),
+ }
+ }
Review Comment:
This is sick!
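As a usage note, the pattern the `with_runtime` docs describe might look roughly like this. The in-memory store and the path are placeholders, not from this PR:
```rust
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use std::sync::Arc;
use tokio::runtime::Handle;

// `io_handle` should come from a dedicated IO runtime that outlives the reader,
// keeping object-store IO off the runtime doing CPU-bound Avro decoding.
fn make_reader(io_handle: Handle) -> AvroObjectReader {
    // Placeholder store and path; a real deployment would point at S3/GCS.
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    AvroObjectReader::new(store, Path::from("data/file.avro")).with_runtime(io_handle)
}
```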
##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+                    // Two longs: count and size have already been read, but using our vlq,
+ // meaning they were not consumed.
+ let total_block_size = size + vlq_header_len;
Review Comment:
Is there any risk from the calculation omitting the 16-byte sync marker here?
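If it does need to be covered, the arithmetic might become something like this (a sketch; whether `BlockDecoder` expects the trailing marker in this buffer is exactly the open question):
```rust
// Sketch of the arithmetic in question; whether the trailing 16-byte sync
// marker must be fetched here depends on what BlockDecoder expects to consume.
const SYNC_MARKER_LEN: u64 = 16;
let total_block_size = vlq_header_len + size + SYNC_MARKER_LEN;
```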
##########
arrow-avro/Cargo.toml:
##########
@@ -45,16 +45,26 @@ sha256 = ["dep:sha2"]
small_decimals = []
avro_custom_types = ["dep:arrow-select"]
+# Enable async APIs
+async = ["futures", "tokio"]
+# Enable object_store integration
+object_store = ["dep:object_store", "async"]
+
[dependencies]
arrow-schema = { workspace = true }
arrow-buffer = { workspace = true }
arrow-array = { workspace = true }
arrow-select = { workspace = true, optional = true }
+
object_store = { version = "0.12.0", default-features = false, optional = true }
Review Comment:
You may just want to consider dropping the patch version here (something I need to stop doing as well, imo).
```suggestion
object_store = { version = "0.12", default-features = false, optional = true }
```
##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+ ReaderState::ReadingFinalBlock {
+ current_data,
+ mut future,
+ } => {
+ let (reader, data_chunk) = match future.poll_unpin(cx) {
+ Poll::Ready(Ok(data)) => data,
+ Poll::Ready(Err(e)) => {
+ return Poll::Ready(Some(Err(e)));
+ }
+ Poll::Pending => {
+                            self.reader_state = ReaderState::ReadingFinalBlock {
+ current_data,
+ future,
+ };
+ return Poll::Pending;
+ }
+ };
+
+                    // If data already exists, it means we have a partial block,
+                    // Append the newly fetched chunk to the existing buffered data.
+                    let combined = Bytes::from_owner([current_data, data_chunk].concat());
Review Comment:
I'm thinking we should be able to remove this concatenation since `BlockDecoder` maintains internal state across `decode()` calls. By doing so we could get rid of the allocation for `combined`, the copying of `current_data` and `data_chunk`, and the re-feeding of already consumed bytes to `BlockDecoder`.
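A rough sketch of that, in the same fragment style as the suggestions above, assuming every byte of `current_data` was already fed to `decode()` before the extra fetch was issued:
```rust
// Sketch: lean on BlockDecoder's internal state instead of concatenating.
// Assumes every byte of `current_data` was already passed to decode() before
// this extra fetch was issued, so only the new chunk still needs feeding.
let consumed = self.block_decoder.decode(&data_chunk)?;
let data = data_chunk.slice(consumed..);
if let Some(_block) = self.block_decoder.flush() {
    // Block complete: decompress and transition to ReadingBatches as usual.
} else {
    // Still incomplete after the targeted fetch: treat the file as truncated.
}
```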
##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+ let remaining_to_fetch =
+ total_block_size.checked_sub(data.len() as
u64).unwrap();
+
+ let range_to_fetch = self.range.end..(self.range.end +
remaining_to_fetch);
Review Comment:
Also, it may be worth clamping `range_to_fetch` to `file_size` to prevent out-of-bounds requests.
Maybe something like this?
```rust
let fetch_end = (self.range.end.saturating_add(remaining_to_fetch)).min(self.file_size);
let range_to_fetch = self.range.end..fetch_end;
if range_to_fetch.is_empty() {
    return Poll::Ready(Some(Err(ArrowError::AvroError(
        "Cannot complete block: insufficient data remaining in file".into(),
    ))));
}
```
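For what it's worth, a quick standalone check of how that clamping behaves at the file boundary (hypothetical numbers, not this reader's state):
```rust
fn main() {
    let file_size: u64 = 100;
    let range_end: u64 = 95;
    let remaining_to_fetch: u64 = 20; // partial block claims more bytes than the file has left

    // Clamp the fetch to the end of the file.
    let fetch_end = range_end.saturating_add(remaining_to_fetch).min(file_size);
    assert_eq!(range_end..fetch_end, 95..100); // clamped, so the request stays in bounds

    // At EOF the clamped range is empty, which triggers the suggested error path.
    assert!((file_size..file_size).is_empty());
}
```
Even with the clamp, the fetch can come up short of `total_block_size`; the existing `finishing_partial_block` check would then report the truncation rather than the store returning an out-of-range error.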
##########
parquet/src/arrow/array_reader/mod.rs:
##########
@@ -51,7 +51,7 @@ mod test_util;
// Note that this crate is public under the `experimental` feature flag.
use crate::file::metadata::RowGroupMetaData;
-pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
+pub use builder::{ArrayReaderBuilder, CacheOptionsBuilder};
Review Comment:
Also, I'm curious whether these changes are relevant to this PR and should be included.
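For context, this module is public under the `experimental` feature flag (per the comment above), so dropping the `CacheOptions` re-export could break downstream code that imported it, e.g. this hypothetical use:
```rust
// Hypothetical downstream import that relied on the removed re-export:
use parquet::arrow::array_reader::CacheOptions;
```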
##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+                            // Calculate how many bytes were consumed by the two VLQ integers
+                            let header_len = original_len.checked_sub(vlq_buf.len()).unwrap();
+
+                            (size, header_len as u64)
Review Comment:
```suggestion
                            let header_len = original_len.checked_sub(vlq_buf.len()).ok_or_else(|| {
                                ArrowError::AvroError(
                                    "Invalid VLQ header: consumed more bytes than available".into(),
                                )
                            })? as u64;
                            (size, header_len)
```
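As a side note, here's a self-contained sketch of the length accounting this guards, using plain zig-zag varints rather than the crate-internal `VLQDecoder` (byte values are made-up examples):
```rust
// Decode one Avro long (zig-zag varint) from the front of `buf`, advancing
// the slice; returns None on EOF, mirroring the decoder's Option contract.
fn read_long(buf: &mut &[u8]) -> Option<i64> {
    let mut value: u64 = 0;
    let mut shift = 0;
    loop {
        let (&byte, rest) = buf.split_first()?;
        *buf = rest;
        value |= ((byte & 0x7f) as u64) << shift;
        if byte & 0x80 == 0 {
            // Undo the zig-zag encoding.
            return Some(((value >> 1) as i64) ^ -((value & 1) as i64));
        }
        shift += 7;
    }
}

fn main() {
    // Hypothetical block prefix: count = 2 (0x04), size = 300 (0xd8 0x04).
    let data: &[u8] = &[0x04, 0xd8, 0x04, 0xff /* block bytes follow */];
    let mut rest = data;
    let count = read_long(&mut rest).unwrap();
    let size = read_long(&mut rest).unwrap();
    // `rest` only ever shrinks, so the subtraction cannot underflow here;
    // the `ok_or_else` in the suggestion is a belt-and-braces guard.
    let header_len = data.len().checked_sub(rest.len()).unwrap();
    assert_eq!((count, size, header_len), (2, 300, 3));
}
```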
##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+                ReaderState::Limbo => {
+                    unreachable!("ReaderState::Limbo should never be observed");
+                }
Review Comment:
Is the `ReaderState::Limbo` variant really necessary? Could we use `Finished` instead, so that if a bug causes an early return without resetting the state, the stream just ends (which is safer than panicking)?
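A minimal standalone sketch of that alternative (hypothetical `State` enum, not this reader's types): using the terminal state as the `mem::replace` placeholder means a buggy arm that returns early without writing the state back just ends the stream on the next poll, instead of hitting `unreachable!`.
```rust
use std::mem;

enum State {
    Working(u32),
    Finished,
}

fn poll_step(state: &mut State) -> Option<u32> {
    // Take the state out, leaving `Finished` as the placeholder.
    match mem::replace(state, State::Finished) {
        State::Working(n) => {
            *state = State::Working(n + 1); // happy path: restore the state
            Some(n)
        }
        // Reached either at a genuine end of stream or after a missed
        // state reassignment; both just end the stream gracefully.
        State::Finished => None,
    }
}

fn main() {
    let mut state = State::Working(0);
    assert_eq!(poll_step(&mut state), Some(0));
    assert_eq!(poll_step(&mut state), Some(1));
    assert_eq!(poll_step(&mut State::Finished), None); // graceful end, no panic
}
```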
##########
parquet/src/arrow/schema/extension.rs:
##########
@@ -36,6 +35,7 @@ use arrow_schema::extension::ExtensionType;
/// Arrow DataType, and instead are represented by an Arrow ExtensionType.
/// Extension types are attached to Arrow Fields via metadata.
pub(crate) fn try_add_extension_type(
+    #[cfg_attr(all(not(feature = "variant_experimental"), not(feature = "arrow_canonical_extension_types"), not(feature = "geospatial")), allow(unused_mut))]
Review Comment:
Also, I'm curious whether these changes are relevant to this PR and should be included.
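For reference, a minimal sketch (with a hypothetical feature name) of what that attribute does: it allows the `unused_mut` lint only in configurations where none of the listed features are enabled, because the parameter is only mutated inside feature-gated blocks.
```rust
// Hypothetical crate feature `extras`: when it's disabled, `value` is never
// mutated, so the lint is silenced for that configuration only.
#[cfg_attr(not(feature = "extras"), allow(unused_mut))]
fn decorate(mut value: String) -> String {
    #[cfg(feature = "extras")]
    {
        value.push_str(" (extra)");
    }
    value
}

fn main() {
    println!("{}", decorate("base".to_string()));
}
```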
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]