This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 3027dbc595 Follow-up on arrow-avro Documentation (#8402)
3027dbc595 is described below

commit 3027dbc595819763dc2bff74b024ce943c82ca06
Author: Connor Sanders <[email protected]>
AuthorDate: Tue Sep 23 13:38:03 2025 -0500

    Follow-up on arrow-avro Documentation (#8402)
    
    # Which issue does this PR close?
    
    - **Related to**: #4886 (“Add Avro Support”)
    - **Follow-up** on https://github.com/apache/arrow-rs/pull/8316
    
    # Rationale for this change
    
    @alamb had some recommendations for improving the `arrow-avro`
    documentation in #8316. This is a follow-up to address those
    suggestions.
    
    # What changes are included in this PR?
    
    1. `lib.rs` documentation
    2. `reader/mod.rs` improved documentation and inlined examples
    3. `writer/mod.rs`  improved documentation and inlined examples
    
    **NOTE:** Some doc tests are temporarily ignored until
    https://github.com/apache/arrow-rs/pull/8371 is merged in.
    
    # Are these changes tested?
    
    Yes, doc tests have been included which all run (with the exception of 3
    ignored ones that will work soon)
    
    <img width="1861" height="1027" alt="Screenshot 2025-09-22 at 3 36 02 AM" src="https://github.com/user-attachments/assets/9dbec0bd-aae0-4655-ab9d-89f9b2fc4e9a" />
    <img width="1878" height="1022" alt="Screenshot 2025-09-22 at 3 36 19 AM" src="https://github.com/user-attachments/assets/44f59af8-bcdb-4526-a97d-8a3478ec356b" />
    <img width="1900" height="1021" alt="Screenshot 2025-09-22 at 3 36 34 AM" src="https://github.com/user-attachments/assets/ebda9db2-103f-4026-be78-66c6115b557c" />
    
    
    # Are there any user-facing changes?
    
    N/A
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-avro/examples/decode_stream.rs | 104 --------
 arrow-avro/examples/read_avro_ocf.rs |  71 ------
 arrow-avro/src/lib.rs                | 195 ++++++++++++++-
 arrow-avro/src/reader/mod.rs         | 473 +++++++++++++++++++++++++++++------
 arrow-avro/src/schema.rs             |  33 ++-
 arrow-avro/src/writer/mod.rs         | 187 ++++++++++++--
 6 files changed, 771 insertions(+), 292 deletions(-)

diff --git a/arrow-avro/examples/decode_stream.rs 
b/arrow-avro/examples/decode_stream.rs
deleted file mode 100644
index fe13382d29..0000000000
--- a/arrow-avro/examples/decode_stream.rs
+++ /dev/null
@@ -1,104 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Decode Avro **stream-framed** bytes into Arrow [`RecordBatch`]es.
-//!
-//! This example demonstrates how to:
-//! * Build a streaming `Decoder` via `ReaderBuilder::build_decoder`
-//! * Register a writer schema keyed by a **Single‑Object** Rabin fingerprint
-//! * Generate a few **Single‑Object** frames in‑memory and decode them
-
-use arrow_avro::reader::ReaderBuilder;
-use arrow_avro::schema::{AvroSchema, Fingerprint, SchemaStore, 
SINGLE_OBJECT_MAGIC};
-
-fn encode_long(value: i64, out: &mut Vec<u8>) {
-    let mut n = ((value << 1) ^ (value >> 63)) as u64;
-    while (n & !0x7F) != 0 {
-        out.push(((n as u8) & 0x7F) | 0x80);
-        n >>= 7;
-    }
-    out.push(n as u8);
-}
-
-fn encode_len(len: usize, out: &mut Vec<u8>) {
-    encode_long(len as i64, out)
-}
-
-fn encode_string(s: &str, out: &mut Vec<u8>) {
-    encode_len(s.len(), out);
-    out.extend_from_slice(s.as_bytes());
-}
-
-fn encode_user_body(id: i64, name: &str) -> Vec<u8> {
-    let mut v = Vec::with_capacity(16 + name.len());
-    encode_long(id, &mut v);
-    encode_string(name, &mut v);
-    v
-}
-
-// Frame a body as Avro Single‑Object: magic + 8-byte little‑endian 
fingerprint + body
-fn frame_single_object(fp_rabin: u64, body: &[u8]) -> Vec<u8> {
-    let mut out = Vec::with_capacity(2 + 8 + body.len());
-    out.extend_from_slice(&SINGLE_OBJECT_MAGIC);
-    out.extend_from_slice(&fp_rabin.to_le_bytes());
-    out.extend_from_slice(body);
-    out
-}
-
-fn main() -> Result<(), Box<dyn std::error::Error>> {
-    // A tiny Avro writer schema used to generate a few messages
-    let avro = AvroSchema::new(
-        r#"{"type":"record","name":"User","fields":[
-            {"name":"id","type":"long"},
-            {"name":"name","type":"string"}]}"#
-            .to_string(),
-    );
-
-    // Register the writer schema in a store (keyed by Rabin fingerprint).
-    // Keep the fingerprint to seed the decoder and to frame generated 
messages.
-    let mut store = SchemaStore::new();
-    let fp = store.register(avro.clone())?;
-    let rabin = match fp {
-        Fingerprint::Rabin(v) => v,
-        _ => unreachable!("Single‑Object framing uses Rabin fingerprints"),
-    };
-
-    // Build a streaming decoder configured for Single‑Object framing.
-    let mut decoder = ReaderBuilder::new()
-        .with_writer_schema_store(store)
-        .with_active_fingerprint(fp)
-        .build_decoder()?;
-
-    // Generate 5 Single‑Object frames for the "User" schema.
-    let mut bytes = Vec::new();
-    for i in 0..5 {
-        let body = encode_user_body(i as i64, &format!("user-{i}"));
-        bytes.extend_from_slice(&frame_single_object(rabin, &body));
-    }
-
-    // Feed all bytes at once, then flush completed batches.
-    let _consumed = decoder.decode(&bytes)?;
-    while let Some(batch) = decoder.flush()? {
-        println!(
-            "Batch: rows = {:>3}, cols = {}",
-            batch.num_rows(),
-            batch.num_columns()
-        );
-    }
-
-    Ok(())
-}
diff --git a/arrow-avro/examples/read_avro_ocf.rs 
b/arrow-avro/examples/read_avro_ocf.rs
deleted file mode 100644
index bf17ed572b..0000000000
--- a/arrow-avro/examples/read_avro_ocf.rs
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Read an Avro **Object Container File (OCF)** into Arrow [`RecordBatch`] 
values.
-//!
-//! This example demonstrates how to:
-//! * Construct a [`Reader`] using [`ReaderBuilder::build`]
-//! * Iterate `RecordBatch`es and print a brief summary
-
-use std::fs::File;
-use std::io::BufReader;
-use std::path::PathBuf;
-
-use arrow_array::RecordBatch;
-use arrow_avro::reader::ReaderBuilder;
-
-fn main() -> Result<(), Box<dyn std::error::Error>> {
-    let ocf_path: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
-        .join("test")
-        .join("data")
-        .join("skippable_types.avro");
-
-    let reader = BufReader::new(File::open(&ocf_path)?);
-    // Build a high-level OCF Reader with default settings
-    let avro_reader = ReaderBuilder::new().build(reader)?;
-    let schema = avro_reader.schema();
-    println!(
-        "Discovered Arrow schema with {} fields",
-        schema.fields().len()
-    );
-
-    let mut total_batches = 0usize;
-    let mut total_rows = 0usize;
-    let mut total_columns = schema.fields().len();
-
-    for result in avro_reader {
-        let batch: RecordBatch = result?;
-        total_batches += 1;
-        total_rows += batch.num_rows();
-        total_columns = batch.num_columns();
-
-        println!(
-            "Batch {:>3}: rows = {:>6}, cols = {:>3}",
-            total_batches,
-            batch.num_rows(),
-            batch.num_columns()
-        );
-    }
-
-    println!();
-    println!("Done.");
-    println!("  Batches : {total_batches}");
-    println!("  Rows    : {total_rows}");
-    println!("  Columns : {total_columns}");
-
-    Ok(())
-}
diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs
index 9367bc8efc..be8408a36d 100644
--- a/arrow-avro/src/lib.rs
+++ b/arrow-avro/src/lib.rs
@@ -15,9 +15,200 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Convert data to / from the [Apache Arrow] memory format and [Apache Avro]
+//! Convert data to / from the [Apache Arrow] memory format and [Apache Avro].
 //!
-//! [Apache Arrow]: https://arrow.apache.org
+//! This crate provides:
+//! - a [`reader`] that decodes Avro (Object Container Files, Avro 
Single‑Object encoding,
+//!   and Confluent Schema Registry wire format) into Arrow `RecordBatch`es,
+//! - and a [`writer`] that encodes Arrow `RecordBatch`es into Avro (OCF or 
raw Avro binary).
+//!
+//! If you’re new to Arrow or Avro, see:
+//! - Arrow project site: <https://arrow.apache.org/>
+//! - Avro 1.11.1 specification: 
<https://avro.apache.org/docs/1.11.1/specification/>
+//!
+//! ## Example: OCF (Object Container File) round‑trip
+//!
+//! The example below creates an Arrow table, writes an **Avro OCF** fully in 
memory,
+//! and then reads it back. OCF is a self‑describing file format that embeds 
the Avro
+//! schema in a header with optional compression and block sync markers.
+//! Spec: 
<https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
+//!
+//! ```
+//! use std::io::Cursor;
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::AvroWriter;
+//! use arrow_avro::reader::ReaderBuilder;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Build a tiny Arrow batch
+//! let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
+//! let batch = RecordBatch::try_new(
+//!     Arc::new(schema.clone()),
+//!     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+//! )?;
+//!
+//! // Write an Avro **Object Container File** (OCF) to a Vec<u8>
+//! let sink: Vec<u8> = Vec::new();
+//! let mut w = AvroWriter::new(sink, schema.clone())?;
+//! w.write(&batch)?;
+//! w.finish()?;
+//! let bytes = w.into_inner();
+//! assert!(!bytes.is_empty());
+//!
+//! // Read it back
+//! let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
+//! let out = r.next().unwrap()?;
+//! assert_eq!(out.num_rows(), 3);
+//! # Ok(()) }
+//! ```
+//!
+//! ## Quickstart: Confluent wire‑format round‑trip *(runnable)*
+//!
+//! The **Confluent Schema Registry wire format** prefixes each Avro message 
with a
+//! 1‑byte magic `0x00` and a **4‑byte big‑endian** schema ID, followed by the 
Avro body.
+//! See: 
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
+//!
+//! In this round‑trip, we:
+//! 1) Use `AvroStreamWriter` to create a **raw Avro body** for a single‑row 
batch,
+//! 2) Wrap it with the Confluent prefix (magic and schema ID),
+//! 3) Decode it back to Arrow using a `Decoder` configured with a 
`SchemaStore` that
+//!    maps the schema ID to the Avro schema used by the writer.
+//!
+//! ```
+//! use std::collections::HashMap;
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::{AvroStreamWriter, WriterBuilder};
+//! use arrow_avro::reader::ReaderBuilder;
+//! use arrow_avro::schema::{
+//!     AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm,
+//!     FingerprintStrategy, SCHEMA_METADATA_KEY
+//! };
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Writer schema registered under Schema Registry ID 1
+//! let avro_json = r#"{
+//!   "type":"record","name":"User",
+//!   "fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]
+//! }"#;
+//!
+//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+//! let id: u32 = 1;
+//! store.set(Fingerprint::Id(id), AvroSchema::new(avro_json.to_string()))?;
+//!
+//! // Build an Arrow schema that references the same Avro JSON
+//! let mut md = HashMap::new();
+//! md.insert(SCHEMA_METADATA_KEY.to_string(), avro_json.to_string());
+//! let schema = Schema::new_with_metadata(
+//!     vec![
+//!         Field::new("id", DataType::Int64, false),
+//!         Field::new("name", DataType::Utf8, false),
+//!     ],
+//!     md,
+//! );
+//!
+//! // One‑row batch: { id: 42, name: "alice" }
+//! let batch = RecordBatch::try_new(
+//!     Arc::new(schema.clone()),
+//!     vec![
+//!         Arc::new(Int64Array::from(vec![42])) as ArrayRef,
+//!         Arc::new(StringArray::from(vec!["alice"])) as ArrayRef,
+//!     ],
+//! )?;
+//!
+//! // Stream‑write a single record, letting the writer add the **Confluent** 
prefix.
+//! let sink: Vec<u8> = Vec::new();
+//! let mut w: AvroStreamWriter<Vec<u8>> = WriterBuilder::new(schema.clone())
+//!     .with_fingerprint_strategy(FingerprintStrategy::Id(id))
+//!     .build(sink)?;
+//! w.write(&batch)?;
+//! w.finish()?;
+//! let frame = w.into_inner(); // already: 0x00 + 4B BE ID + Avro body
+//! assert!(frame.len() > 5);
+//!
+//! // Decode
+//! let mut dec = ReaderBuilder::new()
+//!   .with_writer_schema_store(store)
+//!   .build_decoder()?;
+//! dec.decode(&frame)?;
+//! let out = dec.flush()?.expect("one row");
+//! assert_eq!(out.num_rows(), 1);
+//! # Ok(()) }
+//! ```
+//!
+//! ## Quickstart: Avro Single‑Object Encoding round‑trip *(runnable)*
+//!
+//! Avro **Single‑Object Encoding (SOE)** wraps an Avro body with a 2‑byte 
marker
+//! `0xC3 0x01` and an **8‑byte little‑endian CRC‑64‑AVRO Rabin fingerprint** 
of the
+//! writer schema, then the Avro body. Spec:
+//! <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+//!
+//! This example registers the writer schema (computing a Rabin fingerprint), 
writes a
+//! single‑row Avro body (using `AvroStreamWriter`), constructs the SOE frame, 
and decodes it back to Arrow.
+//!
+//! ```
+//! use std::collections::HashMap;
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::{AvroStreamWriter, WriterBuilder};
+//! use arrow_avro::reader::ReaderBuilder;
+//! use arrow_avro::schema::{AvroSchema, SchemaStore, FingerprintStrategy, 
SCHEMA_METADATA_KEY};
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Writer schema: { 
"type":"record","name":"User","fields":[{"name":"x","type":"long"}] }
+//! let writer_json = 
r#"{"type":"record","name":"User","fields":[{"name":"x","type":"long"}]}"#;
+//! let mut store = SchemaStore::new(); // Rabin CRC‑64‑AVRO by default
+//! let _fp = store.register(AvroSchema::new(writer_json.to_string()))?;
+//!
+//! // Build an Arrow schema that references the same Avro JSON
+//! let mut md = HashMap::new();
+//! md.insert(SCHEMA_METADATA_KEY.to_string(), writer_json.to_string());
+//! let schema = Schema::new_with_metadata(
+//!     vec![Field::new("x", DataType::Int64, false)],
+//!     md,
+//! );
+//!
+//! // One‑row batch: { x: 7 }
+//! let batch = RecordBatch::try_new(
+//!     Arc::new(schema.clone()),
+//!     vec![Arc::new(Int64Array::from(vec![7])) as ArrayRef],
+//! )?;
+//!
+//! // Stream‑write a single record; the writer adds **SOE** (C3 01 + Rabin) 
automatically.
+//! let sink: Vec<u8> = Vec::new();
+//! let mut w: AvroStreamWriter<Vec<u8>> = WriterBuilder::new(schema.clone())
+//!     .with_fingerprint_strategy(FingerprintStrategy::Rabin)
+//!     .build(sink)?;
+//! w.write(&batch)?;
+//! w.finish()?;
+//! let frame = w.into_inner(); // already: C3 01 + 8B LE Rabin + Avro body
+//! assert!(frame.len() > 10);
+//!
+//! // Decode
+//! let mut dec = ReaderBuilder::new()
+//!   .with_writer_schema_store(store)
+//!   .build_decoder()?;
+//! dec.decode(&frame)?;
+//! let out = dec.flush()?.expect("one row");
+//! assert_eq!(out.num_rows(), 1);
+//! # Ok(()) }
+//! ```
+//!
+//! ---
+//!
+//! ### Modules
+//!
+//! - [`reader`]: read Avro (OCF, SOE, Confluent) into Arrow `RecordBatch`es.
+//! - [`writer`]: write Arrow `RecordBatch`es as Avro (OCF, SOE, Confluent).
+//! - [`schema`]: Avro schema parsing / fingerprints / registries.
+//! - [`compression`]: codecs used for OCF blocks (i.e., Deflate, Snappy, 
Zstandard).
+//! - [`codec`]: internal Avro↔Arrow type conversion and row decode/encode 
plans.
+//!
+//! [Apache Arrow]: https://arrow.apache.org/
 //! [Apache Avro]: https://avro.apache.org/
 
 #![doc(
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index bf72fc92c6..56a7bef17e 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -21,15 +21,15 @@
 //!
 //! This module exposes three layers of the API surface, from highest to 
lowest-level:
 //!
-//! * `ReaderBuilder`: configures how Avro is read (batch size, strict union 
handling,
+//! * [`ReaderBuilder`](crate::reader::ReaderBuilder): configures how Avro is 
read (batch size, strict union handling,
 //!   string representation, reader schema, etc.) and produces either:
 //!   * a `Reader` for **Avro Object Container Files (OCF)** read from any 
`BufRead`, or
 //!   * a low-level `Decoder` for **single‑object encoded** Avro bytes and 
Confluent
 //!     **Schema Registry** framed messages.
-//! * `Reader`: a convenient, synchronous iterator over `RecordBatch` decoded 
from an OCF
+//! * [`Reader`](crate::reader::Reader): a convenient, synchronous iterator 
over `RecordBatch` decoded from an OCF
 //!   input. Implements [`Iterator<Item = Result<RecordBatch, ArrowError>>`] 
and
 //!   `RecordBatchReader`.
-//! * `Decoder`: a push‑based row decoder that consumes raw Avro bytes and 
yields ready
+//! * [`Decoder`](crate::reader::Decoder): a push‑based row decoder that 
consumes raw Avro bytes and yields ready
 //!   `RecordBatch` values when batches fill. This is suitable for integrating 
with async
 //!   byte streams, network protocols, or other custom data sources.
 //!
@@ -37,45 +37,53 @@
 //!
 //! * **Object Container File (OCF)**: A self‑describing file format with a 
header containing
 //!   the writer schema, optional compression codec, and a sync marker, 
followed by one or
-//!   more data blocks. Use `Reader` for this format. See the Avro 
specification for the
-//!   structure of OCF headers and blocks. 
<https://avro.apache.org/docs/1.11.1/specification/>
+//!   more data blocks. Use `Reader` for this format. See the Avro 1.11.1 
specification
+//!   (“Object Container Files”). 
<https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
 //! * **Single‑Object Encoding**: A stream‑friendly framing that prefixes each 
record body with
-//!   the 2‑byte magic `0xC3 0x01` followed by a schema fingerprint. Use 
`Decoder` with a
-//!   populated `SchemaStore` to resolve fingerprints to full
-//!   schemas. <https://avro.apache.org/docs/1.11.1/specification/>
-//! * **Confluent Schema Registry wire format**: A 1‑byte magic `0x00`, a 
4‑byte big‑endian
-//!   schema ID, then the Avro‑encoded body. Use `Decoder` with a
-//!   `SchemaStore` configured for `FingerprintAlgorithm::None`
-//!   and entries keyed by `Fingerprint::Id`. Confluent docs
-//!   describe this framing.
+//!   the 2‑byte marker `0xC3 0x01` followed by the **8‑byte little‑endian 
CRC‑64‑AVRO Rabin
+//!   fingerprint** of the writer schema, then the Avro binary body. Use 
`Decoder` with a
+//!   populated `SchemaStore` to resolve fingerprints to full schemas.
+//!   See “Single object encoding” in the Avro 1.11.1 spec.
+//!   
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+//! * **Confluent Schema Registry wire format**: A 1‑byte magic `0x00`, a 
**4‑byte big‑endian**
+//!   schema ID, then the Avro‑encoded body. Use `Decoder` with a 
`SchemaStore` configured
+//!   for `FingerprintAlgorithm::None` and entries keyed by `Fingerprint::Id`. 
See
+//!   Confluent’s “Wire format” documentation.
+//!   
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
 //!
 //! ## Basic file usage (OCF)
 //!
-//! Use `ReaderBuilder::build` to construct a `Reader` from any `BufRead`, 
such as a
-//! `BufReader<File>`. The reader yields `RecordBatch` values you can iterate 
over or collect.
+//! Use `ReaderBuilder::build` to construct a `Reader` from any `BufRead`. The 
doctest below
+//! creates a tiny OCF in memory using `AvroWriter` and then reads it back.
 //!
-//! ```no_run
-//! use std::fs::File;
-//! use std::io::BufReader;
-//! use arrow_array::RecordBatch;
+//! ```
+//! use std::io::Cursor;
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::AvroWriter;
 //! use arrow_avro::reader::ReaderBuilder;
 //!
-//! // Locate a test file (mirrors Arrow's test data layout)
-//! let path = "avro/alltypes_plain.avro";
-//! let path = std::env::var("ARROW_TEST_DATA")
-//!     .map(|dir| format!("{dir}/{path}"))
-//!     .unwrap_or_else(|_| format!("../testing/data/{path}"));
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Build a minimal Arrow schema and batch
+//! let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
+//! let batch = RecordBatch::try_new(
+//!     Arc::new(schema.clone()),
+//!     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+//! )?;
 //!
-//! let file = File::open(path).unwrap();
-//! let mut reader = ReaderBuilder::new().build(BufReader::new(file)).unwrap();
+//! // Write an Avro OCF to memory
+//! let buffer: Vec<u8> = Vec::new();
+//! let mut writer = AvroWriter::new(buffer, schema.clone())?;
+//! writer.write(&batch)?;
+//! writer.finish()?;
+//! let bytes = writer.into_inner();
 //!
-//! // Iterate batches
-//! let mut num_rows = 0usize;
-//! while let Some(batch) = reader.next() {
-//!     let batch: RecordBatch = batch.unwrap();
-//!     num_rows += batch.num_rows();
-//! }
-//! println!("decoded {num_rows} rows");
+//! // Read it back with ReaderBuilder
+//! let mut reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+//! let out = reader.next().unwrap()?;
+//! assert_eq!(out.num_rows(), 3);
+//! # Ok(()) }
 //! ```
 //!
 //! ## Streaming usage (single‑object / Confluent)
@@ -88,7 +96,7 @@
 //! `futures` utilities. Note: this is illustrative and keeps a single 
in‑memory `Bytes`
 //! buffer for simplicity—real applications typically maintain a rolling 
buffer.
 //!
-//! ```no_run
+//! ```
 //! use bytes::{Buf, Bytes};
 //! use futures::{Stream, StreamExt};
 //! use std::task::{Poll, ready};
@@ -128,47 +136,298 @@
 //! }
 //! ```
 //!
-//! ### Building a `Decoder` for **single‑object encoding** (Rabin 
fingerprints)
+//! ### Building and using a `Decoder` for **single‑object encoding** (Rabin 
fingerprints)
 //!
-//! ```no_run
-//! use arrow_avro::schema::{AvroSchema, SchemaStore};
+//! The doctest below **writes** a single‑object framed record using the Avro 
writer
+//! (no manual varints) for the writer schema
+//! (`{"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}`)
+//! and then decodes it into a `RecordBatch`.
+//!
+//! ```
+//! use std::sync::Arc;
+//! use std::collections::HashMap;
+//! use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::schema::{AvroSchema, SchemaStore, SCHEMA_METADATA_KEY, 
FingerprintStrategy};
+//! use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
 //! use arrow_avro::reader::ReaderBuilder;
 //!
-//! // Build a SchemaStore and register known writer schemas
-//! let mut store = SchemaStore::new(); // Rabin by default
-//! let user_schema = 
AvroSchema::new(r#"{"type":"record","name":"User","fields":[
-//!     
{"name":"id","type":"long"},{"name":"name","type":"string"}]}"#.to_string());
-//! let _fp = store.register(user_schema).unwrap(); // computes Rabin 
CRC-64-AVRO
-//!
-//! // Build a Decoder that expects single-object encoding (0xC3 0x01 + 
fingerprint and body)
-//! let decoder = ReaderBuilder::new()
-//!     .with_writer_schema_store(store)
-//!     .with_batch_size(1024)
-//!     .build_decoder()
-//!     .unwrap();
-//! // Feed decoder with framed bytes (not shown; see `decode_stream` above).
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Register the writer schema (Rabin fingerprint by default).
+//! let mut store = SchemaStore::new();
+//! let avro_schema = 
AvroSchema::new(r#"{"type":"record","name":"User","fields":[
+//!   {"name":"id","type":"long"}]}"#.to_string());
+//! let _fp = store.register(avro_schema.clone())?;
+//!
+//! // Create a single-object framed record { id: 42 } with the Avro writer.
+//! let mut md = HashMap::new();
+//! md.insert(SCHEMA_METADATA_KEY.to_string(), 
avro_schema.json_string.clone());
+//! let arrow = Schema::new_with_metadata(vec![Field::new("id", 
DataType::Int64, false)], md);
+//! let batch = RecordBatch::try_new(
+//!     Arc::new(arrow.clone()),
+//!     vec![Arc::new(Int64Array::from(vec![42])) as ArrayRef],
+//! )?;
+//! let mut w = WriterBuilder::new(arrow)
+//!     .with_fingerprint_strategy(FingerprintStrategy::Rabin) // SOE prefix
+//!     .build::<_, AvroBinaryFormat>(Vec::new())?;
+//! w.write(&batch)?;
+//! w.finish()?;
+//! let frame = w.into_inner(); // C3 01 + fp + Avro body
+//!
+//! // Decode with a `Decoder`
+//! let mut dec = ReaderBuilder::new()
+//!   .with_writer_schema_store(store)
+//!   .with_batch_size(1024)
+//!   .build_decoder()?;
+//!
+//! dec.decode(&frame)?;
+//! let out = dec.flush()?.expect("one batch");
+//! assert_eq!(out.num_rows(), 1);
+//! # Ok(()) }
 //! ```
 //!
-//! ### Building a `Decoder` for **Confluent Schema Registry** framed messages
+//! See Avro 1.11.1 “Single object encoding” for details of the 2‑byte marker
+//! and little‑endian CRC‑64‑AVRO fingerprint:
+//! <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+//!
+//! ### Building and using a `Decoder` for **Confluent Schema Registry** 
framing
+//!
+//! The Confluent wire format is: 1‑byte magic `0x00`, then a **4‑byte 
big‑endian** schema ID,
+//! then the Avro body. The doctest below crafts two messages for the same 
schema ID and
+//! decodes them into a single `RecordBatch` with two rows.
 //!
-//! ```no_run
-//! use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, 
FingerprintAlgorithm};
+//! ```
+//! use std::sync::Arc;
+//! use std::collections::HashMap;
+//! use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, 
FingerprintAlgorithm, SCHEMA_METADATA_KEY, FingerprintStrategy};
+//! use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
 //! use arrow_avro::reader::ReaderBuilder;
 //!
-//! // Confluent wire format uses a magic 0x00 byte + 4-byte schema id 
(big-endian).
-//! // Create a store keyed by `Fingerprint::Id` and pre-populate with known 
schemas.
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Set up a store keyed by numeric IDs (Confluent).
 //! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+//! let schema_id = 7u32;
+//! let avro_schema = 
AvroSchema::new(r#"{"type":"record","name":"User","fields":[
+//!   {"name":"id","type":"long"}, 
{"name":"name","type":"string"}]}"#.to_string());
+//! store.set(Fingerprint::Id(schema_id), avro_schema.clone())?;
+//!
+//! // Write two Confluent-framed messages {id:1,name:"a"} and {id:2,name:"b"}.
+//! fn msg(id: i64, name: &str, schema: &AvroSchema, schema_id: u32) -> 
Result<Vec<u8>, Box<dyn std::error::Error>> {
+//!     let mut md = HashMap::new();
+//!     md.insert(SCHEMA_METADATA_KEY.to_string(), schema.json_string.clone());
+//!     let arrow = Schema::new_with_metadata(
+//!         vec![Field::new("id", DataType::Int64, false), Field::new("name", 
DataType::Utf8, false)],
+//!         md,
+//!     );
+//!     let batch = RecordBatch::try_new(
+//!         Arc::new(arrow.clone()),
+//!         vec![
+//!           Arc::new(Int64Array::from(vec![id])) as ArrayRef,
+//!           Arc::new(StringArray::from(vec![name])) as ArrayRef,
+//!         ],
+//!     )?;
+//!     let mut w = WriterBuilder::new(arrow)
+//!         .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id)) // 
0x00 + ID + body
+//!         .build::<_, AvroBinaryFormat>(Vec::new())?;
+//!     w.write(&batch)?; w.finish()?;
+//!     Ok(w.into_inner())
+//! }
+//! let m1 = msg(1, "a", &avro_schema, schema_id)?;
+//! let m2 = msg(2, "b", &avro_schema, schema_id)?;
+//!
+//! // Decode both into a single batch.
+//! let mut dec = ReaderBuilder::new()
+//!   .with_writer_schema_store(store)
+//!   .with_batch_size(1024)
+//!   .build_decoder()?;
+//! dec.decode(&m1)?;
+//! dec.decode(&m2)?;
+//! let batch = dec.flush()?.expect("batch");
+//! assert_eq!(batch.num_rows(), 2);
+//! # Ok(()) }
+//! ```
+//!
+//! See Confluent’s “Wire format” notes: magic byte `0x00`, 4‑byte 
**big‑endian** schema ID,
+//! then the Avro‑encoded payload.
+//! 
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
+//!
+//! ## Schema resolution (reader vs. writer schemas)
+//!
+//! Avro supports resolving data written with one schema (“writer”) into 
another (“reader”)
+//! using rules like **field aliases**, **default values**, and **numeric 
promotions**.
+//! In practice this lets you evolve schemas over time while remaining 
compatible with old data.
+//!
+//! *Spec background:* See Avro’s **Schema Resolution** (aliases, defaults) 
and the Confluent
+//! **Wire format** (magic `0x00` + big‑endian schema id + Avro body).
+//! <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
+//! 
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
+//!
+//! ### OCF example: rename a field and add a default via a reader schema
+//!
+//! Below we write an OCF with a *writer schema* having fields `id: long`, 
`name: string`.
+//! We then read it with a *reader schema* that:
+//! - **renames** `name` to `full_name` via `aliases`, and
+//! - **adds** `is_active: boolean` with a **default** value `true`.
+//!
+//! ```
+//! use std::io::Cursor;
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::AvroWriter;
+//! use arrow_avro::reader::ReaderBuilder;
+//! use arrow_avro::schema::AvroSchema;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Writer (past version): { id: long, name: string }
+//! let writer_arrow = Schema::new(vec![
+//!     Field::new("id", DataType::Int64, false),
+//!     Field::new("name", DataType::Utf8, false),
+//! ]);
+//! let batch = RecordBatch::try_new(
+//!     Arc::new(writer_arrow.clone()),
+//!     vec![
+//!         Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
+//!         Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+//!     ],
+//! )?;
+//!
+//! // Write an OCF entirely in memory
+//! let mut w = AvroWriter::new(Vec::<u8>::new(), writer_arrow)?;
+//! w.write(&batch)?;
+//! w.finish()?;
+//! let bytes = w.into_inner();
+//!
+//! // Reader (current version):
+//! //  - record name "topLevelRecord" matches the crate's default for OCF
+//! //  - rename `name` -> `full_name` using aliases (optional)
+//! let reader_json = r#"
+//! {
+//!   "type": "record",
+//!   "name": "topLevelRecord",
+//!   "fields": [
+//!     { "name": "id", "type": "long" },
+//!     { "name": "full_name", "type": ["null","string"], "aliases": ["name"], 
"default": null },
+//!     { "name": "is_active", "type": "boolean", "default": true }
+//!   ]
+//! }"#;
 //!
-//! // Suppose registry ID 42 corresponds to this Avro schema:
-//! let avro = AvroSchema::new(r#"{"type":"string"}"#.to_string());
-//! store.set(Fingerprint::Id(42), avro).unwrap();
+//! let mut reader = ReaderBuilder::new()
+//!   .with_reader_schema(AvroSchema::new(reader_json.to_string()))
+//!   .build(Cursor::new(bytes))?;
 //!
-//! // Build a Decoder that understands Confluent framing
-//! let decoder = ReaderBuilder::new()
-//!     .with_writer_schema_store(store)
-//!     .build_decoder()
-//!     .unwrap();
-//! // Feed decoder with 0x00 + [id:4] + Avro body frames.
+//! let out = reader.next().unwrap()?;
+//! assert_eq!(out.num_rows(), 2);
+//! # Ok(()) }
+//! ```
+//!
+//! ### Confluent single‑object example: resolve *past* writer versions to the 
topic’s **current** reader schema
+//!
+//! In this scenario, the **reader schema** is the topic’s *current* schema, 
while the two
+//! **writer schemas** registered under Confluent IDs **0** and **1** 
represent *past versions*.
+//! The decoder uses the reader schema to resolve both versions.
+//!
+//! ```
+//! use std::sync::Arc;
+//! use std::collections::HashMap;
+//! use arrow_avro::reader::ReaderBuilder;
+//! use arrow_avro::schema::{
+//!     AvroSchema, Fingerprint, FingerprintAlgorithm, SchemaStore,
+//!     SCHEMA_METADATA_KEY, FingerprintStrategy,
+//! };
+//! use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray, 
RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//!
+//! fn main() -> Result<(), Box<dyn std::error::Error>> {
+//!     // Reader: current topic schema (no reader-added fields)
+//!     //   {"type":"record","name":"User","fields":[
+//!     //     {"name":"id","type":"long"},
+//!     //     {"name":"name","type":"string"}]}
+//!     let reader_schema = AvroSchema::new(
+//!         r#"{"type":"record","name":"User",
+//!             
"fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]}"#
+//!             .to_string(),
+//!     );
+//!
+//!     // Register two *writer* schemas under Confluent IDs 0 and 1
+//!     let writer_v0 = AvroSchema::new(
+//!         r#"{"type":"record","name":"User",
+//!             
"fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}"#
+//!             .to_string(),
+//!     );
+//!     let writer_v1 = AvroSchema::new(
+//!         r#"{"type":"record","name":"User",
+//!             
"fields":[{"name":"id","type":"long"},{"name":"name","type":"string"},
+//!                       
{"name":"email","type":["null","string"],"default":null}]}"#
+//!             .to_string(),
+//!     );
+//!
+//!     let id_v0: u32 = 0;
+//!     let id_v1: u32 = 1;
+//!
+//!     let mut store = 
SchemaStore::new_with_type(FingerprintAlgorithm::None); // integer IDs
+//!     store.set(Fingerprint::Id(id_v0), writer_v0.clone())?;
+//!     store.set(Fingerprint::Id(id_v1), writer_v1.clone())?;
+//!
+//!     // Write two Confluent-framed messages using each writer version
+//!     // frame0: writer v0 body {id:1001_i32, name:"v0-alice"}
+//!     let mut md0 = HashMap::new();
+//!     md0.insert(SCHEMA_METADATA_KEY.to_string(), 
writer_v0.json_string.clone());
+//!     let arrow0 = Schema::new_with_metadata(
+//!         vec![Field::new("id", DataType::Int32, false),
+//!              Field::new("name", DataType::Utf8, false)], md0);
+//!     let batch0 = RecordBatch::try_new(
+//!         Arc::new(arrow0.clone()),
+//!         vec![Arc::new(Int32Array::from(vec![1001])) as ArrayRef,
+//!              Arc::new(StringArray::from(vec!["v0-alice"])) as ArrayRef])?;
+//!     let mut w0 = arrow_avro::writer::WriterBuilder::new(arrow0)
+//!         .with_fingerprint_strategy(FingerprintStrategy::Id(id_v0))
+//!         .build::<_, 
arrow_avro::writer::format::AvroBinaryFormat>(Vec::new())?;
+//!     w0.write(&batch0)?; w0.finish()?;
+//!     let frame0 = w0.into_inner(); // 0x00 + id_v0 + body
+//!
+//!     // frame1: writer v1 body {id:2002_i64, name:"v1-bob", email: 
Some("[email protected]")}
+//!     let mut md1 = HashMap::new();
+//!     md1.insert(SCHEMA_METADATA_KEY.to_string(), 
writer_v1.json_string.clone());
+//!     let arrow1 = Schema::new_with_metadata(
+//!         vec![Field::new("id", DataType::Int64, false),
+//!              Field::new("name", DataType::Utf8, false),
+//!              Field::new("email", DataType::Utf8, true)], md1);
+//!     let batch1 = RecordBatch::try_new(
+//!         Arc::new(arrow1.clone()),
+//!         vec![Arc::new(Int64Array::from(vec![2002])) as ArrayRef,
+//!              Arc::new(StringArray::from(vec!["v1-bob"])) as ArrayRef,
+//!              Arc::new(StringArray::from(vec![Some("[email protected]")])) 
as ArrayRef])?;
+//!     let mut w1 = arrow_avro::writer::WriterBuilder::new(arrow1)
+//!         .with_fingerprint_strategy(FingerprintStrategy::Id(id_v1))
+//!         .build::<_, 
arrow_avro::writer::format::AvroBinaryFormat>(Vec::new())?;
+//!     w1.write(&batch1)?; w1.finish()?;
+//!     let frame1 = w1.into_inner(); // 0x00 + id_v1 + body
+//!
+//!     // Build a streaming Decoder that understands Confluent framing
+//!     let mut decoder = ReaderBuilder::new()
+//!         .with_reader_schema(reader_schema)
+//!         .with_writer_schema_store(store)
+//!         .with_batch_size(8) // small demo batches
+//!         .build_decoder()?;
+//!
+//!     // Decode each whole frame, then drain completed rows with flush()
+//!     let mut total_rows = 0usize;
+//!
+//!     let consumed0 = decoder.decode(&frame0)?;
+//!     assert_eq!(consumed0, frame0.len(), "decoder must consume the whole 
frame");
+//!     while let Some(batch) = decoder.flush()? { total_rows += 
batch.num_rows(); }
+//!
+//!     let consumed1 = decoder.decode(&frame1)?;
+//!     assert_eq!(consumed1, frame1.len(), "decoder must consume the whole 
frame");
+//!     while let Some(batch) = decoder.flush()? { total_rows += 
batch.num_rows(); }
+//!
+//!     // We sent 2 records so we should get 2 rows (possibly one per flush)
+//!     assert_eq!(total_rows, 2);
+//!     Ok(())
+//! }
 //! ```
 //!
 //! ## Schema evolution and batch boundaries
@@ -191,7 +450,7 @@
 //!   amortize per‑batch overhead; smaller batches reduce peak memory usage 
and latency.
 //! * When `utf8_view` is enabled, string columns use Arrow’s 
`StringViewArray`, which can
 //!   reduce allocations for short strings.
-//! * For OCF, blocks may be compressed `Reader` will decompress using the 
codec specified
+//! * For OCF, blocks may be compressed; `Reader` will decompress using the 
codec specified
 //!   in the file header and feed uncompressed bytes to the row `Decoder`.
 //!
 //! ## Error handling
@@ -242,7 +501,6 @@ fn read_header<R: BufRead>(mut reader: R) -> Result<Header, 
ArrowError> {
     })
 }
 
-// NOTE: The Current ` is_incomplete_data ` below is temporary and will be 
improved prior to public release
 fn is_incomplete_data(err: &ArrowError) -> bool {
     matches!(
         err,
@@ -287,40 +545,91 @@ fn is_incomplete_data(err: &ArrowError) -> bool {
 ///
 /// ### Examples
 ///
-/// Build a `Decoder` for single‑object encoding using a `SchemaStore` with 
Rabin fingerprints:
+/// Build and use a `Decoder` for single‑object encoding:
 ///
-/// ```no_run
+/// ```
 /// use arrow_avro::schema::{AvroSchema, SchemaStore};
 /// use arrow_avro::reader::ReaderBuilder;
 ///
-/// let mut store = SchemaStore::new(); // Rabin by default
-/// let avro = AvroSchema::new(r#""string""#.to_string());
-/// let _fp = store.register(avro).unwrap();
+/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+/// // Use a record schema at the top level so we can build an Arrow 
RecordBatch
+/// let mut store = SchemaStore::new(); // Rabin fingerprinting by default
+/// let avro = AvroSchema::new(
+///     
r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()
+/// );
+/// let fp = store.register(avro)?;
+///
+/// // --- Hidden: write a single-object framed row {x:7} ---
+/// # use std::sync::Arc;
+/// # use std::collections::HashMap;
+/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+/// # use arrow_schema::{DataType, Field, Schema};
+/// # use arrow_avro::schema::{SCHEMA_METADATA_KEY, FingerprintStrategy};
+/// # use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
+/// # let mut md = HashMap::new();
+/// # md.insert(SCHEMA_METADATA_KEY.to_string(),
+/// #     
r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string());
+/// # let arrow = Schema::new_with_metadata(vec![Field::new("x", 
DataType::Int64, false)], md);
+/// # let batch = RecordBatch::try_new(Arc::new(arrow.clone()), 
vec![Arc::new(Int64Array::from(vec![7])) as ArrayRef])?;
+/// # let mut w = WriterBuilder::new(arrow)
+/// #     .with_fingerprint_strategy(fp.into())
+/// #     .build::<_, AvroBinaryFormat>(Vec::new())?;
+/// # w.write(&batch)?; w.finish()?; let frame = w.into_inner();
 ///
 /// let mut decoder = ReaderBuilder::new()
 ///     .with_writer_schema_store(store)
-///     .with_batch_size(512)
-///     .build_decoder()
-///     .unwrap();
+///     .with_batch_size(16)
+///     .build_decoder()?;
 ///
-/// // Feed bytes (framed as 0xC3 0x01 + fingerprint and body)
-/// // decoder.decode(&bytes)?;
-/// // if let Some(batch) = decoder.flush()? { /* process */ }
+/// # decoder.decode(&frame)?;
+/// let batch = decoder.flush()?.expect("one row");
+/// assert_eq!(batch.num_rows(), 1);
+/// # Ok(()) }
 /// ```
 ///
-/// Build a `Decoder` for Confluent Registry messages (magic 0x00 + 4‑byte id):
+/// *Background:* Avro's single‑object encoding is defined as `0xC3 0x01` + 
8‑byte
+/// little‑endian CRC‑64‑AVRO fingerprint of the **writer schema** + Avro 
binary body.
+/// See the Avro 1.11.1 spec for details. 
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
 ///
-/// ```no_run
+/// Build and use a `Decoder` for Confluent Registry messages:
+///
+/// ```
 /// use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, 
FingerprintAlgorithm};
 /// use arrow_avro::reader::ReaderBuilder;
 ///
+/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
 /// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
-/// store.set(Fingerprint::Id(7), 
AvroSchema::new(r#""long""#.to_string())).unwrap();
+/// store.set(Fingerprint::Id(1234), 
AvroSchema::new(r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()))?;
+///
+/// // --- Hidden: encode two Confluent-framed messages {x:1} and {x:2} ---
+/// # use std::sync::Arc;
+/// # use std::collections::HashMap;
+/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+/// # use arrow_schema::{DataType, Field, Schema};
+/// # use arrow_avro::schema::{SCHEMA_METADATA_KEY, FingerprintStrategy};
+/// # use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
+/// # fn msg(x: i64) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
+/// #   let mut md = HashMap::new();
+/// #   md.insert(SCHEMA_METADATA_KEY.to_string(),
+/// #     
r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string());
+/// #   let arrow = Schema::new_with_metadata(vec![Field::new("x", 
DataType::Int64, false)], md);
+/// #   let batch = RecordBatch::try_new(Arc::new(arrow.clone()), 
vec![Arc::new(Int64Array::from(vec![x])) as ArrayRef])?;
+/// #   let mut w = WriterBuilder::new(arrow)
+/// #       .with_fingerprint_strategy(FingerprintStrategy::Id(1234))
+/// #       .build::<_, AvroBinaryFormat>(Vec::new())?;
+/// #   w.write(&batch)?; w.finish()?; Ok(w.into_inner())
+/// # }
+/// # let m1 = msg(1)?;
+/// # let m2 = msg(2)?;
 ///
 /// let mut decoder = ReaderBuilder::new()
 ///     .with_writer_schema_store(store)
-///     .build_decoder()
-///     .unwrap();
+///     .build_decoder()?;
+/// # decoder.decode(&m1)?;
+/// # decoder.decode(&m2)?;
+/// let batch = decoder.flush()?.expect("two rows");
+/// assert_eq!(batch.num_rows(), 2);
+/// # Ok(()) }
 /// ```
 #[derive(Debug)]
 pub struct Decoder {
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index 42c6d8a6c3..3fdfbda1da 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -345,27 +345,19 @@ impl AvroSchema {
         Self { json_string }
     }
 
-    /// Deserializes and returns the `AvroSchema`.
-    ///
-    /// The returned schema borrows from `self`.
-    pub fn schema(&self) -> Result<Schema<'_>, ArrowError> {
+    pub(crate) fn schema(&self) -> Result<Schema<'_>, ArrowError> {
         serde_json::from_str(self.json_string.as_str())
             .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema 
JSON: {e}")))
     }
 
-    /// Returns the fingerprint of the schema.
-    pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> 
Result<Fingerprint, ArrowError> {
-        Self::generate_fingerprint(&self.schema()?, hash_type)
-    }
-
-    /// Generates a fingerprint for the given `Schema` using the specified 
[`FingerprintAlgorithm`].
+    /// Returns the fingerprint of the schema, computed using the specified 
[`FingerprintAlgorithm`].
     ///
     /// The fingerprint is computed over the schema's Parsed Canonical Form
     /// as defined by the Avro specification. Depending on `hash_type`, this
     /// will return one of the supported [`Fingerprint`] variants:
     /// - [`Fingerprint::Rabin`] for [`FingerprintAlgorithm::Rabin`]
-    /// - [`Fingerprint::MD5`] for [`FingerprintAlgorithm::MD5`]
-    /// - [`Fingerprint::SHA256`] for [`FingerprintAlgorithm::SHA256`]
+    /// - `Fingerprint::MD5` for `FingerprintAlgorithm::MD5`
+    /// - `Fingerprint::SHA256` for `FingerprintAlgorithm::SHA256`
     ///
     /// Note: [`FingerprintAlgorithm::None`] cannot be used to generate a 
fingerprint
     /// and will result in an error. If you intend to use a Schema Registry 
ID-based
@@ -375,18 +367,21 @@ impl AvroSchema {
     /// See also: 
<https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
     ///
     /// # Errors
-    /// Returns an error if generating the canonical form of the schema fails,
-    /// or if `hash_type` is [`FingerprintAlgorithm::None`].
+    /// Returns an error if deserializing the schema fails, if generating the
+    /// canonical form of the schema fails, or if `hash_type` is 
[`FingerprintAlgorithm::None`].
     ///
     /// # Examples
-    /// ```no_run
+    /// ```
     /// use arrow_avro::schema::{AvroSchema, FingerprintAlgorithm};
     ///
     /// let avro = AvroSchema::new("\"string\"".to_string());
-    /// let schema = avro.schema().unwrap();
-    /// let fp = AvroSchema::generate_fingerprint(&schema, 
FingerprintAlgorithm::Rabin).unwrap();
+    /// let fp = avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
     /// ```
-    pub fn generate_fingerprint(
+    pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> 
Result<Fingerprint, ArrowError> {
+        Self::generate_fingerprint(&self.schema()?, hash_type)
+    }
+
+    pub(crate) fn generate_fingerprint(
         schema: &Schema,
         hash_type: FingerprintAlgorithm,
     ) -> Result<Fingerprint, ArrowError> {
@@ -432,7 +427,7 @@ impl AvroSchema {
     /// Avro specification.
     ///
     /// 
<https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
-    pub fn generate_canonical_form(schema: &Schema) -> Result<String, 
ArrowError> {
+    pub(crate) fn generate_canonical_form(schema: &Schema) -> Result<String, 
ArrowError> {
         build_canonical(schema, None)
     }
 
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index 7a7b0d2837..ad104f93b8 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -19,19 +19,44 @@
 //!
 //! # Overview
 //!
-//! *   Use **`AvroWriter`** (Object Container File) when you want a
-//!     self‑contained Avro file with header, schema JSON, optional 
compression,
-//!     blocks, and sync markers.
-//! *   Use **`AvroStreamWriter`** (raw binary stream) when you already know 
the
-//!     schema out‑of‑band (i.e., via a schema registry) and need a stream
-//!     of Avro‑encoded records with minimal framing.
+//! Use this module to serialize Arrow `RecordBatch` values into Avro. Two 
output
+//! formats are supported:
 //!
-
-/// Encodes `RecordBatch` into the Avro binary format.
-pub mod encoder;
-/// Logic for different Avro container file formats.
-pub mod format;
-
+//! * **[`AvroWriter`](crate::writer::AvroWriter)** — writes an **Object 
Container File (OCF)**: a self‑describing
+//!   file with header (schema JSON + metadata), optional compression, data 
blocks, and
+//!   sync markers. See Avro 1.11.1 “Object Container Files.”
+//!   
<https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
+//! * **[`AvroStreamWriter`](crate::writer::AvroStreamWriter)** — writes a 
**raw Avro binary stream** (“datum” bytes) without
+//!   any container framing. This is useful when the schema is known 
out‑of‑band (e.g.,
+//!   via a registry) and you want minimal overhead.
+//!
+//! ## Which format should I use?
+//!
+//! * Use **OCF** when you need a portable, self‑contained file. The schema 
travels with
+//!   the data, making it easy to read elsewhere.
+//! * Use the **raw stream** when your surrounding protocol supplies schema 
information
+//!   (e.g., a schema registry). If you need **single‑object encoding (SOE)** 
or Confluent
+//!   **Schema Registry** framing, you must add the appropriate prefix 
*outside* this writer:
+//!   - **SOE**: `0xC3 0x01` + 8‑byte little‑endian CRC‑64‑AVRO fingerprint + 
Avro body
+//!     (see Avro 1.11.1 “Single object encoding”).
+//!     
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+//!   - **Confluent wire format**: magic `0x00` + **big‑endian** 4‑byte schema 
ID and Avro body.
+//!     
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
+//!
+//! ## Choosing the Avro schema
+//!
+//! By default, the writer converts your Arrow schema to Avro (including a 
top‑level record
+//! name) and stores the resulting JSON under the `avro::schema` metadata key. 
If you already
+//! have an Avro schema JSON that you want to use verbatim, put it into the Arrow 
schema metadata
+//! under the same key before constructing the writer. The builder will pick 
it up.
+//!
+//! ## Compression
+//!
+//! For OCF, you may enable a compression codec via 
`WriterBuilder::with_compression`. The
+//! chosen codec is written into the file header and used for subsequent 
blocks. Raw stream
+//! writing doesn’t apply container‑level compression.
+//!
+//! ---
 use crate::codec::AvroFieldBuilder;
 use crate::compression::CompressionCodec;
 use crate::schema::{
@@ -44,6 +69,11 @@ use arrow_schema::{ArrowError, Schema};
 use std::io::Write;
 use std::sync::Arc;
 
+/// Encodes `RecordBatch` into the Avro binary format.
+pub mod encoder;
+/// Logic for different Avro container file formats.
+pub mod format;
+
 /// Builder to configure and create a `Writer`.
 #[derive(Debug, Clone)]
 pub struct WriterBuilder {
@@ -55,6 +85,11 @@ pub struct WriterBuilder {
 
 impl WriterBuilder {
     /// Create a new builder with default settings.
+    ///
+    /// The Avro schema used for writing is determined as follows:
+    /// 1) If the Arrow schema metadata contains `avro::schema` (see 
`SCHEMA_METADATA_KEY`),
+    ///    that JSON is used verbatim.
+    /// 2) Otherwise, the Arrow schema is converted to an Avro record schema.
     pub fn new(schema: Schema) -> Self {
         Self {
             schema,
@@ -95,7 +130,6 @@ impl WriterBuilder {
             Some(json) => AvroSchema::new(json.clone()),
             None => AvroSchema::try_from(&self.schema)?,
         };
-
         let maybe_fingerprint = if F::NEEDS_PREFIX {
             match self.fingerprint_strategy {
                 Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(id)),
@@ -110,7 +144,6 @@ impl WriterBuilder {
         } else {
             None
         };
-
         let mut md = self.schema.metadata().clone();
         md.insert(
             SCHEMA_METADATA_KEY.to_string(),
@@ -134,6 +167,12 @@ impl WriterBuilder {
 }
 
 /// Generic Avro writer.
+///
+/// This type is generic over the output Write sink (`W`) and the Avro format 
(`F`).
+/// You’ll usually use the concrete aliases:
+///
+/// * **[`AvroWriter`]** for **OCF** (self‑describing container file)
+/// * **[`AvroStreamWriter`]** for **raw** Avro binary streams
 #[derive(Debug)]
 pub struct Writer<W: Write, F: AvroFormat> {
     writer: W,
@@ -145,12 +184,105 @@ pub struct Writer<W: Write, F: AvroFormat> {
 }
 
 /// Alias for an Avro **Object Container File** writer.
+///
+/// ### Quickstart (runnable)
+///
+/// ```
+/// use std::io::Cursor;
+/// use std::sync::Arc;
+/// use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
+/// use arrow_schema::{DataType, Field, Schema};
+/// use arrow_avro::writer::AvroWriter;
+/// use arrow_avro::reader::ReaderBuilder;
+///
+/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+/// // Writer schema: { id: long, name: string }
+/// let writer_schema = Schema::new(vec![
+///     Field::new("id", DataType::Int64, false),
+///     Field::new("name", DataType::Utf8, false),
+/// ]);
+///
+/// // Build a RecordBatch with two rows
+/// let batch = RecordBatch::try_new(
+///     Arc::new(writer_schema.clone()),
+///     vec![
+///         Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
+///         Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+///     ],
+/// )?;
+///
+/// // Write an Avro **Object Container File** (OCF) to memory
+/// let mut w = AvroWriter::new(Vec::<u8>::new(), writer_schema.clone())?;
+/// w.write(&batch)?;
+/// w.finish()?;
+/// let bytes = w.into_inner();
+///
+/// // Build a Reader and decode the batch back
+/// let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
+/// let out = r.next().unwrap()?;
+/// assert_eq!(out.num_rows(), 2);
+/// # Ok(()) }
+/// ```
 pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;
+
 /// Alias for a raw Avro **binary stream** writer.
+///
+/// ### Example
+///
+/// This writes only the **Avro body** bytes — no OCF header/sync and no
+/// single‑object or Confluent framing. If you need those frames, add them 
externally.
+///
+/// ```
+/// use std::sync::Arc;
+/// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+/// use arrow_schema::{DataType, Field, Schema};
+/// use arrow_avro::writer::AvroStreamWriter;
+///
+/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+/// // One‑column Arrow batch
+/// let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
+/// let batch = RecordBatch::try_new(
+///     Arc::new(schema.clone()),
+///     vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
+/// )?;
+///
+/// // Write a raw Avro stream to a Vec<u8>
+/// let sink: Vec<u8> = Vec::new();
+/// let mut w = AvroStreamWriter::new(sink, schema)?;
+/// w.write(&batch)?;
+/// w.finish()?;
+/// let bytes = w.into_inner();
+/// assert!(!bytes.is_empty());
+/// # Ok(()) }
+/// ```
 pub type AvroStreamWriter<W> = Writer<W, AvroBinaryFormat>;
 
 impl<W: Write> Writer<W, AvroOcfFormat> {
     /// Convenience constructor – same as [`WriterBuilder::build`] with 
`AvroOcfFormat`.
+    ///
+    /// ### Example
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::AvroWriter;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, 
false)]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+    /// )?;
+    ///
+    /// let buf: Vec<u8> = Vec::new();
+    /// let mut w = AvroWriter::new(buf, schema)?;
+    /// w.write(&batch)?;
+    /// w.finish()?;
+    /// let bytes = w.into_inner();
+    /// assert!(!bytes.is_empty());
+    /// # Ok(()) }
+    /// ```
     pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
         WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)
     }
@@ -163,6 +295,33 @@ impl<W: Write> Writer<W, AvroOcfFormat> {
 
 impl<W: Write> Writer<W, AvroBinaryFormat> {
     /// Convenience constructor to create a new [`AvroStreamWriter`].
+    ///
+    /// The resulting stream contains just **Avro binary** bodies (no OCF 
header/sync and no
+    /// single‑object or Confluent framing). If you need those frames, add 
them externally.
+    ///
+    /// ### Example
+    ///
+    /// ```
+    /// use std::sync::Arc;
+    /// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+    /// use arrow_schema::{DataType, Field, Schema};
+    /// use arrow_avro::writer::AvroStreamWriter;
+    ///
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// let schema = Schema::new(vec![Field::new("x", DataType::Int64, 
false)]);
+    /// let batch = RecordBatch::try_new(
+    ///     Arc::new(schema.clone()),
+    ///     vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
+    /// )?;
+    ///
+    /// let sink: Vec<u8> = Vec::new();
+    /// let mut w = AvroStreamWriter::new(sink, schema)?;
+    /// w.write(&batch)?;
+    /// w.finish()?;
+    /// let bytes = w.into_inner();
+    /// assert!(!bytes.is_empty());
+    /// # Ok(()) }
+    /// ```
     pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
         WriterBuilder::new(schema).build::<W, AvroBinaryFormat>(writer)
     }

Reply via email to