alamb commented on code in PR #8316:
URL: https://github.com/apache/arrow-rs/pull/8316#discussion_r2341823318


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -17,49 +17,86 @@
 
 //! Avro reader
 //!
-//! This module provides facilities to read Apache Avro-encoded files or streams
-//! into Arrow's `RecordBatch` format. In particular, it introduces:
+//! Facilities to read Apache Avro–encoded data into Arrow's `RecordBatch` format.
 //!
-//! * `ReaderBuilder`: Configures Avro reading, e.g., batch size
-//! * `Reader`: Yields `RecordBatch` values, implementing `Iterator`
-//! * `Decoder`: A low-level push-based decoder for Avro records
+//! This module exposes three layers of the API surface, from highest to lowest-level:
 //!
-//! # Basic Usage
+//! * `ReaderBuilder`: configures how Avro is read (batch size, strict union handling,
+//!   string representation, reader schema, etc.) and produces either:
+//!   * a `Reader` for **Avro Object Container Files (OCF)** read from any `BufRead`, or
+//!   * a low-level `Decoder` for **single‑object encoded** Avro bytes and Confluent
+//!     **Schema Registry** framed messages.
+//! * `Reader`: a convenient, synchronous iterator over `RecordBatch` decoded from an OCF
+//!   input. Implements [`Iterator<Item = Result<RecordBatch, ArrowError>>`] and
+//!   `RecordBatchReader`.
+//! * `Decoder`: a push‑based row decoder that consumes raw Avro bytes and yields ready
+//!   `RecordBatch` values when batches fill. This is suitable for integrating with async
+//!   byte streams, network protocols, or other custom data sources.
 //!
-//! `Reader` can be used directly with synchronous data sources, such as [`std::fs::File`].
+//! ## Encodings and when to use which type
 //!
-//! ## Reading a Single Batch
+//! * **Object Container File (OCF)**: A self‑describing file format with a header containing
+//!   the writer schema, optional compression codec, and a sync marker, followed by one or
+//!   more data blocks. Use `Reader` for this format. See the Avro specification for the
+//!   structure of OCF headers and blocks. <https://avro.apache.org/docs/1.11.1/specification/>
+//! * **Single‑Object Encoding**: A stream‑friendly framing that prefixes each record body with
+//!   the 2‑byte magic `0xC3 0x01` followed by a schema fingerprint. Use `Decoder` with a
+//!   populated `SchemaStore` to resolve fingerprints to full
+//!   schemas. <https://avro.apache.org/docs/1.11.1/specification/>
+//! * **Confluent Schema Registry wire format**: A 1‑byte magic `0x00`, a 4‑byte big‑endian
+//!   schema ID, then the Avro‑encoded body. Use `Decoder` with a
+//!   `SchemaStore` configured for `FingerprintAlgorithm::None`
+//!   and entries keyed by `Fingerprint::Id`. Confluent docs
+//!   describe this framing.
+//!
+//! ## Basic file usage (OCF)
+//!
+//! Use `ReaderBuilder::build` to construct a `Reader` from any `BufRead`, such as a
+//! `BufReader<File>`. The reader yields `RecordBatch` values you can iterate over or collect.
+//!
+//! ```no_run

Review Comment:
   why is this test marked as `no_run`? I think it would be fine to run it
   
   Maybe we can do what we do with Parquet and write a temporary file to memory, something like
   
   
https://github.com/apache/arrow-rs/blob/f87f60e87eaebdb2e2103c12053bf7821ffae448/parquet/src/arrow/mod.rs#L102-L118
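
   The pattern being suggested can be sketched with std-only types (the arrow-avro writer API is not shown here; `Vec<u8>` and `Cursor` are stand-ins for the temporary "file", so the doc example can execute instead of being `no_run`):

   ```rust
   use std::io::{Cursor, Read, Write};

   fn main() {
       // Write the example's input to an in-memory buffer instead of a real
       // file on disk; `Vec<u8>` implements `Write`.
       let mut file: Vec<u8> = Vec::new();
       file.write_all(b"avro bytes would go here").unwrap();

       // `Cursor<Vec<u8>>` implements `BufRead`, matching the `BufRead` bound
       // that `ReaderBuilder::build` is documented to expect.
       let mut reader = Cursor::new(file);
       let mut contents = String::new();
       reader.read_to_string(&mut contents).unwrap();
       assert_eq!(contents, "avro bytes would go here");
       println!("{contents}");
   }
   ```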



##########
arrow-avro/examples/decode_stream.rs:
##########
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Decode Avro **stream-framed** bytes into Arrow [`RecordBatch`]es.

Review Comment:
   One comment is that I think many of these examples would also be easier to find if they were doc comment examples -- otherwise people will only be able to find them if they have the source checked out and think to look there.
   
   So I suggest you move as many of the small examples as makes sense into doc comments



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -138,7 +251,77 @@ fn is_incomplete_data(err: &ArrowError) -> bool {
     )
 }
 
-/// A low-level interface for decoding Avro-encoded bytes into Arrow `RecordBatch`.
+/// A low‑level, push‑based decoder from Avro bytes to Arrow `RecordBatch`.
+///
+/// `Decoder` is designed for **streaming** scenarios:
+///
+/// * You *feed* freshly received bytes using `Self::decode`, potentially multiple times,
+///   until at least one row is complete.
+/// * You then *drain* completed rows with `Self::flush`, which yields a `RecordBatch`
+///   if any rows were finished since the last flush.
+///
+/// Unlike `Reader`, which is specialized for Avro **Object Container Files**, `Decoder`
+/// understands **framed single‑object** inputs and **Confluent Schema Registry** messages,
+/// switching schemas mid‑stream when the framing indicates a new fingerprint.
+///
+/// ### Supported prefixes
+///
+/// On each new row boundary, `Decoder` tries to match one of the following "prefixes":
+///
+/// * **Single‑Object encoding**: magic `0xC3 0x01` + schema fingerprint (length depends on
+///   the configured `FingerprintAlgorithm`); see `SINGLE_OBJECT_MAGIC`.
+/// * **Confluent wire format**: magic `0x00` + 4‑byte big‑endian schema id; see
+///   `CONFLUENT_MAGIC`.
+///
+/// The active fingerprint determines which cached row decoder is used to decode the following
+/// record body bytes.
+///
+/// ### Schema switching semantics
+///
+/// When a new fingerprint is observed:
+///
+/// * If the current batch is empty, the decoder switches immediately;
+/// * Otherwise, the current batch is finalized on the next `flush` and only then
+///   does the decoder switch to the new schema. This guarantees that a single `RecordBatch`
+///   never mixes rows with different schemas.
+///
+/// ### Examples
+///
+/// Build a `Decoder` for single‑object encoding using a `SchemaStore` with Rabin fingerprints:
+///
+/// ```no_run
+/// use arrow_avro::schema::{AvroSchema, SchemaStore};
+/// use arrow_avro::reader::ReaderBuilder;
+///
+/// let mut store = SchemaStore::new(); // Rabin by default
+/// let avro = AvroSchema::new(r#""string""#.to_string());
+/// let _fp = store.register(avro).unwrap();
+///
+/// let mut decoder = ReaderBuilder::new()
+///     .with_writer_schema_store(store)
+///     .with_batch_size(512)
+///     .build_decoder()
+///     .unwrap();
+///
+/// // Feed bytes (framed as 0xC3 0x01 + fingerprint and body)

Review Comment:
   since you have working examples of this in the examples directory, I recommend moving the entire example here.
   
   You can hide the setup code by prefixing it with `#`
   
   The benefits are:
   1. We ensure the doc comments continue to work even if the code is changed (as they are compile-checked as part of the tests)
   2. It would be easier to find the entire working example right from the docs / code directly (rather than having to hunt for the relevant example files)
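
   The `#`-prefix convention referred to here looks roughly like the sketch below (the `add` function is hypothetical, used only to illustrate rustdoc's hidden-line syntax):

   ```rust
   /// Adds two numbers.
   ///
   /// Doc-example lines prefixed with `#` are still compiled and executed by
   /// `cargo test`, but are hidden from the rendered documentation:
   ///
   /// ```
   /// # // hidden setup: runs in the doctest, invisible in rustdoc output
   /// # let (a, b) = (1, 2);
   /// let sum = a + b;
   /// assert_eq!(sum, 3);
   /// ```
   fn add(a: i32, b: i32) -> i32 {
       a + b
   }

   fn main() {
       // Doctests only run via `cargo test`; call the function directly here.
       assert_eq!(add(1, 2), 3);
       println!("sum = {}", add(1, 2));
   }
   ```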



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -70,25 +107,101 @@
 //!             if buffered.is_empty() {
 //!                 buffered = match ready!(input.poll_next_unpin(cx)) {
 //!                     Some(b) => b,
-//!                     None => break,
+//!                     None => break, // EOF
 //!                 };
 //!             }
+//!             // Feed as much as possible
 //!             let decoded = match decoder.decode(buffered.as_ref()) {
-//!                 Ok(decoded) => decoded,
+//!                 Ok(n) => n,
 //!                 Err(e) => return Poll::Ready(Some(Err(e))),
 //!             };
 //!             let read = buffered.len();
 //!             buffered.advance(decoded);
 //!             if decoded != read {
+//!                 // decoder made partial progress; request more bytes
 //!                 break
 //!             }
 //!         }
-//!         // Convert any fully-decoded rows to a RecordBatch, if available
+//!         // Return a batch if one or more rows are complete
 //!         Poll::Ready(decoder.flush().transpose())
 //!     })
 //! }
 //! ```
 //!
+//! ### Building a `Decoder` for **single‑object encoding** (Rabin fingerprints)

Review Comment:
   I was confused about the Rabin fingerprint reference until I saw it is part of the spec: https://avro.apache.org/docs/1.12.0/specification/
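   
   For context, the Avro spec defines the 64-bit Rabin fingerprint ("CRC-64-AVRO") over the schema's parsing canonical form. A sketch following the spec's pseudo-code (unverified against arrow-avro's implementation, and the canonicalization step is omitted; the `"string"` schema is already canonical):

   ```rust
   // "Empty" value from the Avro spec; also serves as the CRC polynomial mask.
   const EMPTY: u64 = 0xc15d213aa4d7a795;

   // Build the 256-entry lookup table the spec's pseudo-code describes.
   fn fp_table() -> [u64; 256] {
       let mut table = [0u64; 256];
       for (i, entry) in table.iter_mut().enumerate() {
           let mut fp = i as u64;
           for _ in 0..8 {
               // XOR in the polynomial when the low bit is set.
               fp = (fp >> 1) ^ (EMPTY & (fp & 1).wrapping_neg());
           }
           *entry = fp;
       }
       table
   }

   fn rabin_fingerprint(buf: &[u8]) -> u64 {
       let table = fp_table();
       let mut fp = EMPTY;
       for &b in buf {
           fp = (fp >> 8) ^ table[((fp ^ b as u64) & 0xff) as usize];
       }
       fp
   }

   fn main() {
       // Fingerprint of the canonical form of the schema `"string"`.
       let fp = rabin_fingerprint(br#""string""#);
       println!("{fp:016x}");
       // Deterministic, and distinct schemas get distinct fingerprints.
       assert_eq!(fp, rabin_fingerprint(br#""string""#));
       assert_ne!(fp, rabin_fingerprint(br#""int""#));
   }
   ```

   This is the fingerprint that follows the `0xC3 0x01` magic in single-object encoding, which is why the `SchemaStore` defaults to Rabin.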



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to