This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new eb10a423a7 Add arrow-avro examples and Reader documentation (#8316)
eb10a423a7 is described below

commit eb10a423a7cd8218cabe902dda9640ffacc0b592
Author: Connor Sanders <[email protected]>
AuthorDate: Sat Sep 13 07:56:49 2025 -0500

    Add arrow-avro examples and Reader documentation (#8316)
    
    # Which issue does this PR close?
    
    - **Related to**: #4886 (“Add Avro Support”)
    
    # Rationale for this change
    
    Working, end‑to‑end examples and clearer documentation make it much
    easier for users to adopt `arrow-avro` for common Avro ingestion paths
    (OCF files, Single‑Object framing, Confluent Schema Registry). This PR
    adds runnable examples that demonstrate typical patterns: projection via
    a reader schema, schema evolution, and streaming decode. It also expands
    module and type docs to explain trade‑offs and performance
    considerations.
    
    It also centralizes a default record‑name string as a constant to reduce
    duplication and potential drift in the codebase
    
    # What changes are included in this PR?
    
    ## New examples under `arrow-avro/examples/`
    
    * `read_avro_ocf.rs`: Read Avro OCF into Arrow RecordBatches with
    ReaderBuilder, including knobs for batch size, UTF‑8 handling, and
    strict mode; shows projection via a JSON reader schema.
    * `read_ocf_with_resolution.rs`: Demonstrates resolving older writer
    schemas to a current reader schema (schema evolution/projection).
    * `write_avro_ocf.rs`: Minimal example for writing Arrow data to Avro
    OCF.
    * `decode_stream.rs`: Build a streaming Decoder
    (ReaderBuilder::build_decoder), register writer schemas keyed by
    Single‑Object Rabin fingerprints, and decode generated frames.
    * `decode_kafka_stream.rs`: Decode Confluent Schema Registry–framed
    messages (0x00 magic, 4‑byte big‑endian schema ID, Avro body) while
    resolving older writer schemas against a current reader schema.
    
    ## Documentation improvements
    
    * Expanded `arrow-avro` module‑level docs and Decoder docs with usage
    examples for OCF, Single‑Object, and Confluent wire formats; added notes
    on schema evolution, streaming, and performance considerations.
    
    ## Maintenance tweak
    
    * Added `AVRO_ROOT_RECORD_DEFAULT_NAME` in schema.rs to centralize the
    default root record name. (Reduces literal duplication; no behavior
    change intended.)
    
    # Are these changes tested?
    
    * A unit test was added to `arrow-avro/src/codec.rs` to cover the
    addition of `AVRO_ROOT_RECORD_DEFAULT_NAME`.
    * No other tests were added in this PR because the work is primarily
    documentation and runnable examples. The examples themselves are
    intended to be compiled and executed by users as living documentation.
    
    # Are there any user-facing changes?
    
    N/A
---
 arrow-avro/examples/decode_kafka_stream.rs      | 233 +++++++++++++
 arrow-avro/examples/decode_stream.rs            | 104 ++++++
 arrow-avro/examples/read_avro_ocf.rs            |  71 ++++
 arrow-avro/examples/read_ocf_with_resolution.rs |  96 ++++++
 arrow-avro/examples/write_avro_ocf.rs           | 113 ++++++
 arrow-avro/src/codec.rs                         |  15 +-
 arrow-avro/src/reader/mod.rs                    | 439 +++++++++++++++++++-----
 arrow-avro/src/schema.rs                        |  11 +-
 8 files changed, 999 insertions(+), 83 deletions(-)

diff --git a/arrow-avro/examples/decode_kafka_stream.rs 
b/arrow-avro/examples/decode_kafka_stream.rs
new file mode 100644
index 0000000000..f5b0f2e657
--- /dev/null
+++ b/arrow-avro/examples/decode_kafka_stream.rs
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Decode **Confluent Schema Registry - framed** Avro messages into Arrow 
[`RecordBatch`]es,
+//! resolving **older writer schemas** against a **current reader schema** 
without adding
+//! any new reader‑only fields.
+//!
+//! What this example shows:
+//! * A **reader schema** for the current topic version with fields: `{ id: 
long, name: string }`.
+//! * Two older **writer schemas** (Confluent IDs **0** and **1**):
+//!   - v0: `{ id: int, name: string }` (older type for `id`)
+//!   - v1: `{ id: long, name: string, email: ["null","string"] }` (extra 
writer field `email`)
+//! * Streaming decode with `ReaderBuilder::with_reader_schema(...)` so that:
+//!   - v0's `id:int` is **promoted** to `long` for the reader
+//!   - v1's extra `email` field is **ignored** by the reader (projection)
+//!
+//! Wire format reminder (message value bytes):
+//! `0x00` magic byte + 4‑byte **big‑endian** schema ID + Avro **binary** body.
+//!
+
+use arrow_array::{Int64Array, RecordBatch, StringArray};
+use arrow_avro::reader::ReaderBuilder;
+use arrow_avro::schema::{
+    AvroSchema, Fingerprint, FingerprintAlgorithm, SchemaStore, 
CONFLUENT_MAGIC,
+};
+use arrow_schema::ArrowError;
+
+fn encode_long(value: i64, out: &mut Vec<u8>) {
+    let mut n = ((value << 1) ^ (value >> 63)) as u64;
+    while (n & !0x7F) != 0 {
+        out.push(((n as u8) & 0x7F) | 0x80);
+        n >>= 7;
+    }
+    out.push(n as u8);
+}
+
+fn encode_len(len: usize, out: &mut Vec<u8>) {
+    encode_long(len as i64, out)
+}
+
+fn encode_string(s: &str, out: &mut Vec<u8>) {
+    encode_len(s.len(), out);
+    out.extend_from_slice(s.as_bytes());
+}
+
+fn encode_union_index(index: i64, out: &mut Vec<u8>) {
+    encode_long(index, out);
+}
+
+// Writer v0 (ID=0):
+//   {"type":"record","name":"User","fields":[
+//     {"name":"id","type":"int"},
+//     {"name":"name","type":"string"}]}
+fn encode_user_v0_body(id: i32, name: &str) -> Vec<u8> {
+    let mut v = Vec::with_capacity(16 + name.len());
+    encode_long(id as i64, &mut v);
+    encode_string(name, &mut v);
+    v
+}
+
+// Writer v1 (ID=1):
+//   {"type":"record","name":"User","fields":[
+//     {"name":"id","type":"long"},
+//     {"name":"name","type":"string"},
+//     {"name":"email","type":["null","string"],"default":null}]}
+fn encode_user_v1_body(id: i64, name: &str, email: Option<&str>) -> Vec<u8> {
+    let mut v = Vec::with_capacity(24 + name.len() + email.map(|s| 
s.len()).unwrap_or(0));
+    encode_long(id, &mut v); // id: long
+    encode_string(name, &mut v); // name: string
+    match email {
+        None => {
+            // union index 0 => null
+            encode_union_index(0, &mut v);
+            // no value bytes follow for null
+        }
+        Some(s) => {
+            // union index 1 => string, followed by the string payload
+            encode_union_index(1, &mut v);
+            encode_string(s, &mut v);
+        }
+    }
+    v
+}
+
+fn frame_confluent(id_be: u32, body: &[u8]) -> Vec<u8> {
+    let mut out = Vec::with_capacity(5 + body.len());
+    out.extend_from_slice(&CONFLUENT_MAGIC); // 0x00
+    out.extend_from_slice(&id_be.to_be_bytes());
+    out.extend_from_slice(body);
+    out
+}
+
+fn print_arrow_schema(schema: &arrow_schema::Schema) {
+    println!("Resolved Arrow schema (via reader schema):");
+    for (i, f) in schema.fields().iter().enumerate() {
+        println!(
+            "  {i:>2}: {}: {:?} (nullable: {})",
+            f.name(),
+            f.data_type(),
+            f.is_nullable()
+        );
+    }
+    if !schema.metadata.is_empty() {
+        println!("  metadata: {:?}", schema.metadata());
+    }
+}
+
+fn print_rows(batch: &RecordBatch) -> Result<(), ArrowError> {
+    let ids = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .ok_or_else(|| ArrowError::ComputeError("col 0 not Int64".into()))?;
+    let names = batch
+        .column(1)
+        .as_any()
+        .downcast_ref::<StringArray>()
+        .ok_or_else(|| ArrowError::ComputeError("col 1 not Utf8".into()))?;
+    for row in 0..batch.num_rows() {
+        let id = ids.value(row);
+        let name = names.value(row);
+        println!("    row {row}: id={id}, name={name}");
+    }
+    Ok(())
+}
+
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // The current topic schema as a READER schema
+    let reader_schema = AvroSchema::new(
+        r#"{
+            "type":"record","name":"User","fields":[
+                {"name":"id","type":"long"},
+                {"name":"name","type":"string"}
+            ]}"#
+        .to_string(),
+    );
+
+    // Two prior WRITER schemas versions under Confluent IDs 0 and 1
+    let writer_v0 = AvroSchema::new(
+        r#"{
+            "type":"record","name":"User","fields":[
+                {"name":"id","type":"int"},
+                {"name":"name","type":"string"}
+            ]}"#
+        .to_string(),
+    );
+    let writer_v1 = AvroSchema::new(
+        r#"{
+            "type":"record","name":"User","fields":[
+                {"name":"id","type":"long"},
+                {"name":"name","type":"string"},
+                {"name":"email","type":["null","string"],"default":null}
+            ]}"#
+        .to_string(),
+    );
+
+    let id_v0: u32 = 0;
+    let id_v1: u32 = 1;
+
+    // Confluent SchemaStore keyed by integer IDs (FingerprintAlgorithm::None)
+    let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+    store.set(Fingerprint::Id(id_v0), writer_v0.clone())?;
+    store.set(Fingerprint::Id(id_v1), writer_v1.clone())?;
+
+    // Build a streaming Decoder with the READER schema
+    let mut decoder = ReaderBuilder::new()
+        .with_reader_schema(reader_schema)
+        .with_writer_schema_store(store)
+        .with_batch_size(8) // small batches for demo output
+        .build_decoder()?;
+
+    // Print the resolved Arrow schema (derived from reader and writer)
+    let resolved = decoder.schema();
+    print_arrow_schema(resolved.as_ref());
+    println!();
+
+    // Simulate an interleaved Kafka stream (IDs 0 and 1)
+    //    - v0: {id:int, name:string} --> reader: id promoted to long
+    //    - v1: {id:long, name:string, email: ...} --> reader ignores extra 
field
+    let mut frames: Vec<(u32, Vec<u8>)> = Vec::new();
+
+    // Some v0 messages
+    for (i, name) in ["v0-alice", "v0-bob", "v0-carol"].iter().enumerate() {
+        let body = encode_user_v0_body(1000 + i as i32, name);
+        frames.push((id_v0, frame_confluent(id_v0, &body)));
+    }
+
+    // Some v1 messages (may include optional email on the writer side)
+    let v1_rows = [
+        (2001_i64, "v1-dave", Some("[email protected]")),
+        (2002_i64, "v1-erin", None),
+        (2003_i64, "v1-frank", Some("[email protected]")),
+    ];
+    for (id, name, email) in v1_rows {
+        let body = encode_user_v1_body(id, name, email);
+        frames.push((id_v1, frame_confluent(id_v1, &body)));
+    }
+
+    // Interleave to show mid-stream schema ID changes (0,1,0,1, ...)
+    frames.swap(1, 3); // crude interleave for demo
+
+    // Decode frames as if they were Kafka record values
+    for (schema_id, frame) in frames {
+        println!("Decoding record framed with Confluent schema id = 
{schema_id}");
+        let _consumed = decoder.decode(&frame)?;
+        while let Some(batch) = decoder.flush()? {
+            println!(
+                "  -> Emitted batch: rows = {}, cols = {}",
+                batch.num_rows(),
+                batch.num_columns()
+            );
+            print_rows(&batch)?;
+        }
+        println!();
+    }
+
+    println!("Done decoding Kafka-style stream with schema resolution (no 
reader-added fields).");
+    Ok(())
+}
diff --git a/arrow-avro/examples/decode_stream.rs 
b/arrow-avro/examples/decode_stream.rs
new file mode 100644
index 0000000000..fe13382d29
--- /dev/null
+++ b/arrow-avro/examples/decode_stream.rs
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Decode Avro **stream-framed** bytes into Arrow [`RecordBatch`]es.
+//!
+//! This example demonstrates how to:
+//! * Build a streaming `Decoder` via `ReaderBuilder::build_decoder`
+//! * Register a writer schema keyed by a **Single‑Object** Rabin fingerprint
+//! * Generate a few **Single‑Object** frames in‑memory and decode them
+
+use arrow_avro::reader::ReaderBuilder;
+use arrow_avro::schema::{AvroSchema, Fingerprint, SchemaStore, 
SINGLE_OBJECT_MAGIC};
+
+fn encode_long(value: i64, out: &mut Vec<u8>) {
+    let mut n = ((value << 1) ^ (value >> 63)) as u64;
+    while (n & !0x7F) != 0 {
+        out.push(((n as u8) & 0x7F) | 0x80);
+        n >>= 7;
+    }
+    out.push(n as u8);
+}
+
+fn encode_len(len: usize, out: &mut Vec<u8>) {
+    encode_long(len as i64, out)
+}
+
+fn encode_string(s: &str, out: &mut Vec<u8>) {
+    encode_len(s.len(), out);
+    out.extend_from_slice(s.as_bytes());
+}
+
+fn encode_user_body(id: i64, name: &str) -> Vec<u8> {
+    let mut v = Vec::with_capacity(16 + name.len());
+    encode_long(id, &mut v);
+    encode_string(name, &mut v);
+    v
+}
+
+// Frame a body as Avro Single‑Object: magic + 8-byte little‑endian 
fingerprint + body
+fn frame_single_object(fp_rabin: u64, body: &[u8]) -> Vec<u8> {
+    let mut out = Vec::with_capacity(2 + 8 + body.len());
+    out.extend_from_slice(&SINGLE_OBJECT_MAGIC);
+    out.extend_from_slice(&fp_rabin.to_le_bytes());
+    out.extend_from_slice(body);
+    out
+}
+
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // A tiny Avro writer schema used to generate a few messages
+    let avro = AvroSchema::new(
+        r#"{"type":"record","name":"User","fields":[
+            {"name":"id","type":"long"},
+            {"name":"name","type":"string"}]}"#
+            .to_string(),
+    );
+
+    // Register the writer schema in a store (keyed by Rabin fingerprint).
+    // Keep the fingerprint to seed the decoder and to frame generated 
messages.
+    let mut store = SchemaStore::new();
+    let fp = store.register(avro.clone())?;
+    let rabin = match fp {
+        Fingerprint::Rabin(v) => v,
+        _ => unreachable!("Single‑Object framing uses Rabin fingerprints"),
+    };
+
+    // Build a streaming decoder configured for Single‑Object framing.
+    let mut decoder = ReaderBuilder::new()
+        .with_writer_schema_store(store)
+        .with_active_fingerprint(fp)
+        .build_decoder()?;
+
+    // Generate 5 Single‑Object frames for the "User" schema.
+    let mut bytes = Vec::new();
+    for i in 0..5 {
+        let body = encode_user_body(i as i64, &format!("user-{i}"));
+        bytes.extend_from_slice(&frame_single_object(rabin, &body));
+    }
+
+    // Feed all bytes at once, then flush completed batches.
+    let _consumed = decoder.decode(&bytes)?;
+    while let Some(batch) = decoder.flush()? {
+        println!(
+            "Batch: rows = {:>3}, cols = {}",
+            batch.num_rows(),
+            batch.num_columns()
+        );
+    }
+
+    Ok(())
+}
diff --git a/arrow-avro/examples/read_avro_ocf.rs 
b/arrow-avro/examples/read_avro_ocf.rs
new file mode 100644
index 0000000000..bf17ed572b
--- /dev/null
+++ b/arrow-avro/examples/read_avro_ocf.rs
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Read an Avro **Object Container File (OCF)** into Arrow [`RecordBatch`] 
values.
+//!
+//! This example demonstrates how to:
+//! * Construct a [`Reader`] using [`ReaderBuilder::build`]
+//! * Iterate `RecordBatch`es and print a brief summary
+
+use std::fs::File;
+use std::io::BufReader;
+use std::path::PathBuf;
+
+use arrow_array::RecordBatch;
+use arrow_avro::reader::ReaderBuilder;
+
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let ocf_path: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+        .join("test")
+        .join("data")
+        .join("skippable_types.avro");
+
+    let reader = BufReader::new(File::open(&ocf_path)?);
+    // Build a high-level OCF Reader with default settings
+    let avro_reader = ReaderBuilder::new().build(reader)?;
+    let schema = avro_reader.schema();
+    println!(
+        "Discovered Arrow schema with {} fields",
+        schema.fields().len()
+    );
+
+    let mut total_batches = 0usize;
+    let mut total_rows = 0usize;
+    let mut total_columns = schema.fields().len();
+
+    for result in avro_reader {
+        let batch: RecordBatch = result?;
+        total_batches += 1;
+        total_rows += batch.num_rows();
+        total_columns = batch.num_columns();
+
+        println!(
+            "Batch {:>3}: rows = {:>6}, cols = {:>3}",
+            total_batches,
+            batch.num_rows(),
+            batch.num_columns()
+        );
+    }
+
+    println!();
+    println!("Done.");
+    println!("  Batches : {total_batches}");
+    println!("  Rows    : {total_rows}");
+    println!("  Columns : {total_columns}");
+
+    Ok(())
+}
diff --git a/arrow-avro/examples/read_ocf_with_resolution.rs 
b/arrow-avro/examples/read_ocf_with_resolution.rs
new file mode 100644
index 0000000000..7367ba3cd5
--- /dev/null
+++ b/arrow-avro/examples/read_ocf_with_resolution.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Read an Avro **Object Container File (OCF)** using an inline **reader 
schema**
+//! that differs from the writer schema, demonstrating Avro **schema 
resolution**
+//! (field projection and legal type promotion) without ever fetching the 
writer
+//! schema from the file.
+//!
+//! What this example does:
+//! 1. Locates `<crate>/test/data/skippable_types.avro` (portable path).
+//! 2. Defines an inline **reader schema** JSON:
+//!    * Projects a subset of fields from the writer schema, and
+//!    * Promotes `"int"` to `"long"` where applicable.
+//! 3. Builds a `Reader` with `ReaderBuilder::with_reader_schema(...)` and 
prints batches.
+
+use std::fs::File;
+use std::io::BufReader;
+use std::path::PathBuf;
+
+use arrow_array::RecordBatch;
+use arrow_avro::reader::ReaderBuilder;
+use arrow_avro::schema::AvroSchema;
+
+fn default_ocf_path() -> PathBuf {
+    PathBuf::from(env!("CARGO_MANIFEST_DIR"))
+        .join("test")
+        .join("data")
+        .join("skippable_types.avro")
+}
+
+// A minimal reader schema compatible with the provided writer schema
+const READER_SCHEMA_JSON: &str = r#"
+{
+  "type": "record",
+  "name": "SkippableTypesRecord",
+  "fields": [
+    { "name": "boolean_field", "type": "boolean" },
+    { "name": "int_field", "type": "long" },
+    { "name": "long_field", "type": "long" },
+    { "name": "string_field", "type": "string" },
+    { "name": "nullable_nullfirst_field", "type": ["null", "long"] }
+  ]
+}
+"#;
+
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let ocf_path = default_ocf_path();
+    let file = File::open(&ocf_path)?;
+    let reader_schema = AvroSchema::new(READER_SCHEMA_JSON.to_string());
+
+    let reader = ReaderBuilder::new()
+        .with_reader_schema(reader_schema)
+        .build(BufReader::new(file))?;
+
+    let resolved_schema = reader.schema();
+    println!(
+        "Reader-based decode: resolved Arrow schema with {} fields",
+        resolved_schema.fields().len()
+    );
+
+    // Iterate batches and print a brief summary
+    let mut total_batches = 0usize;
+    let mut total_rows = 0usize;
+    for next in reader {
+        let batch: RecordBatch = next?;
+        total_batches += 1;
+        total_rows += batch.num_rows();
+        println!(
+            "  Batch {:>3}: rows = {:>6}, cols = {:>2}",
+            total_batches,
+            batch.num_rows(),
+            batch.num_columns()
+        );
+    }
+
+    println!();
+    println!("Done (with reader/writer schema resolution).");
+    println!("  Batches : {total_batches}");
+    println!("  Rows    : {total_rows}");
+
+    Ok(())
+}
diff --git a/arrow-avro/examples/write_avro_ocf.rs 
b/arrow-avro/examples/write_avro_ocf.rs
new file mode 100644
index 0000000000..5bdca0de7a
--- /dev/null
+++ b/arrow-avro/examples/write_avro_ocf.rs
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # Write an Avro Object Container File (OCF) from an Arrow `RecordBatch`
+//!
+//! This example builds a small Arrow `RecordBatch` and persists it to an
+//! **Avro Object Container File (OCF)** using
+//! `arrow_avro::writer::{Writer, WriterBuilder}`.
+//!
+//! ## What this example does
+//! - Define an Arrow schema with supported types (`Int64`, `Utf8`, `Boolean`,
+//!   `Float64`, `Binary`, and `Timestamp (Microsecond, "UTC")`).
+//! - Constructs arrays and a `RecordBatch`, ensuring each column’s data type
+//!   **exactly** matches the schema (timestamps include the `"UTC"` timezone).
+//! - Writes a single batch to `target/write_avro_ocf_example.avro` as an OCF,
+//!   using Snappy block compression (you can disable or change the codec).
+//! - Prints the file’s 16‑byte sync marker (used by OCF to delimit blocks).
+
+use std::fs::File;
+use std::io::BufWriter;
+use std::sync::Arc;
+
+use arrow_array::{
+    ArrayRef, BinaryArray, BooleanArray, Float64Array, Int64Array, 
RecordBatch, StringArray,
+    TimestampMicrosecondArray,
+};
+use arrow_avro::compression::CompressionCodec;
+use arrow_avro::writer::format::AvroOcfFormat;
+use arrow_avro::writer::{Writer, WriterBuilder};
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
+
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Arrow schema
+    // id:         Int64 (non-null)
+    // name:       Utf8  (nullable)
+    // active:     Boolean (non-null)
+    // score:      Float64 (nullable)
+    // payload:    Binary (nullable)
+    // created_at: Timestamp(Microsecond, Some("UTC")) (non-null)
+    let schema = Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("name", DataType::Utf8, true),
+        Field::new("active", DataType::Boolean, false),
+        Field::new("score", DataType::Float64, true),
+        Field::new("payload", DataType::Binary, true),
+        Field::new(
+            "created_at",
+            DataType::Timestamp(TimeUnit::Microsecond, 
Some(Arc::from("UTC".to_string()))),
+            false,
+        ),
+    ]);
+
+    let schema_ref = Arc::new(schema.clone());
+    let ids = Int64Array::from(vec![1_i64, 2, 3]);
+    let names = StringArray::from(vec![Some("alpha"), None, Some("gamma")]);
+    let active = BooleanArray::from(vec![true, false, true]);
+    let scores = Float64Array::from(vec![Some(1.5_f64), None, Some(3.0)]);
+
+    // BinaryArray: include a null
+    let payload = BinaryArray::from_opt_vec(vec![Some(&b"abc"[..]), None, 
Some(&[0u8, 1, 2][..])]);
+
+    // Timestamp in microseconds since UNIX epoch
+    let created_at = TimestampMicrosecondArray::from(vec![
+        Some(1_722_000_000_000_000_i64),
+        Some(1_722_000_123_456_000_i64),
+        Some(1_722_000_999_999_000_i64),
+    ])
+    .with_timezone("UTC".to_string());
+
+    let columns: Vec<ArrayRef> = vec![
+        Arc::new(ids),
+        Arc::new(names),
+        Arc::new(active),
+        Arc::new(scores),
+        Arc::new(payload),
+        Arc::new(created_at),
+    ];
+
+    let batch = RecordBatch::try_new(schema_ref, columns)?;
+
+    // Build an OCF writer with optional compression
+    let out_path = "target/write_avro_ocf_example.avro";
+    let file = File::create(out_path)?;
+    let mut writer: Writer<_, AvroOcfFormat> = WriterBuilder::new(schema)
+        .with_compression(Some(CompressionCodec::Snappy))
+        .build(BufWriter::new(file))?;
+
+    // Write a single batch (use `write_batches` for multiple)
+    writer.write(&batch)?;
+    writer.finish()?; // flush and finalize
+
+    if let Some(sync) = writer.sync_marker() {
+        println!("Wrote OCF to {out_path} (sync marker: {:02x?})", &sync[..]);
+    } else {
+        println!("Wrote OCF to {out_path}");
+    }
+
+    Ok(())
+}
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 3f94391c25..cf0276f0a2 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -18,7 +18,7 @@
 use crate::schema::{
     Array, Attributes, AvroSchema, ComplexType, Enum, Fixed, Map, Nullability, 
PrimitiveType,
     Record, Schema, Type, TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
-    AVRO_FIELD_DEFAULT_METADATA_KEY,
+    AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_ROOT_RECORD_DEFAULT_NAME,
 };
 use arrow_schema::{
     ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, 
DECIMAL128_MAX_PRECISION,
@@ -476,7 +476,7 @@ impl AvroField {
     ) -> Result<Self, ArrowError> {
         let top_name = match reader_schema {
             Schema::Complex(ComplexType::Record(r)) => r.name.to_string(),
-            _ => "root".to_string(),
+            _ => AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
         };
         let mut resolver = Maker::new(use_utf8view, strict_mode);
         let data_type = resolver.make_data_type(writer_schema, 
Some(reader_schema), None)?;
@@ -2034,6 +2034,17 @@ mod tests {
         }
     }
 
+    #[test]
+    fn 
test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
+        let writer_schema = 
Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
+        let reader_schema = 
Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
+        let field =
+            AvroField::resolve_from_writer_and_reader(&writer_schema, 
&reader_schema, false, false)
+                .expect("resolution should succeed");
+        assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
+        assert!(matches!(field.data_type().codec(), Codec::Utf8));
+    }
+
     fn json_string(s: &str) -> Value {
         Value::String(s.to_string())
     }
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 13e0f07b45..217366b633 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -17,49 +17,86 @@
 
 //! Avro reader
 //!
-//! This module provides facilities to read Apache Avro-encoded files or 
streams
-//! into Arrow's `RecordBatch` format. In particular, it introduces:
+//! Facilities to read Apache Avro–encoded data into Arrow's `RecordBatch` 
format.
 //!
-//! * `ReaderBuilder`: Configures Avro reading, e.g., batch size
-//! * `Reader`: Yields `RecordBatch` values, implementing `Iterator`
-//! * `Decoder`: A low-level push-based decoder for Avro records
+//! This module exposes three layers of the API surface, from highest to 
lowest-level:
 //!
-//! # Basic Usage
+//! * `ReaderBuilder`: configures how Avro is read (batch size, strict union 
handling,
+//!   string representation, reader schema, etc.) and produces either:
+//!   * a `Reader` for **Avro Object Container Files (OCF)** read from any 
`BufRead`, or
+//!   * a low-level `Decoder` for **single‑object encoded** Avro bytes and 
Confluent
+//!     **Schema Registry** framed messages.
+//! * `Reader`: a convenient, synchronous iterator over `RecordBatch` decoded 
from an OCF
+//!   input. Implements [`Iterator<Item = Result<RecordBatch, ArrowError>>`] 
and
+//!   `RecordBatchReader`.
+//! * `Decoder`: a push‑based row decoder that consumes raw Avro bytes and 
yields ready
+//!   `RecordBatch` values when batches fill. This is suitable for integrating 
with async
+//!   byte streams, network protocols, or other custom data sources.
 //!
-//! `Reader` can be used directly with synchronous data sources, such as 
[`std::fs::File`].
+//! ## Encodings and when to use which type
 //!
-//! ## Reading a Single Batch
+//! * **Object Container File (OCF)**: A self‑describing file format with a 
header containing
+//!   the writer schema, optional compression codec, and a sync marker, 
followed by one or
+//!   more data blocks. Use `Reader` for this format. See the Avro 
specification for the
+//!   structure of OCF headers and blocks. 
<https://avro.apache.org/docs/1.11.1/specification/>
+//! * **Single‑Object Encoding**: A stream‑friendly framing that prefixes each 
record body with
+//!   the 2‑byte magic `0xC3 0x01` followed by a schema fingerprint. Use 
`Decoder` with a
+//!   populated `SchemaStore` to resolve fingerprints to full
+//!   schemas. <https://avro.apache.org/docs/1.11.1/specification/>
+//! * **Confluent Schema Registry wire format**: A 1‑byte magic `0x00`, a 
4‑byte big‑endian
+//!   schema ID, then the Avro‑encoded body. Use `Decoder` with a
+//!   `SchemaStore` configured for `FingerprintAlgorithm::None`
+//!   and entries keyed by `Fingerprint::Id`. Confluent docs
+//!   describe this framing.
+//!
+//! ## Basic file usage (OCF)
+//!
+//! Use `ReaderBuilder::build` to construct a `Reader` from any `BufRead`, 
such as a
+//! `BufReader<File>`. The reader yields `RecordBatch` values you can iterate 
over or collect.
+//!
+//! ```no_run
+//! use std::fs::File;
+//! use std::io::BufReader;
+//! use arrow_array::RecordBatch;
+//! use arrow_avro::reader::ReaderBuilder;
+//!
+//! // Locate a test file (mirrors Arrow's test data layout)
+//! let path = "avro/alltypes_plain.avro";
+//! let path = std::env::var("ARROW_TEST_DATA")
+//!     .map(|dir| format!("{dir}/{path}"))
+//!     .unwrap_or_else(|_| format!("../testing/data/{path}"));
 //!
-//! ```
-//! # use std::fs::File;
-//! # use std::io::BufReader;
-//! # use arrow_avro::reader::ReaderBuilder;
-//! # let path = "avro/alltypes_plain.avro";
-//! # let path = match std::env::var("ARROW_TEST_DATA") {
-//! #   Ok(dir) => format!("{dir}/{path}"),
-//! #   Err(_) => format!("../testing/data/{path}")
-//! # };
 //! let file = File::open(path).unwrap();
-//! let mut avro = ReaderBuilder::new().build(BufReader::new(file)).unwrap();
-//! let batch = avro.next().unwrap();
+//! let mut reader = ReaderBuilder::new().build(BufReader::new(file)).unwrap();
+//!
+//! // Iterate batches
+//! let mut num_rows = 0usize;
+//! while let Some(batch) = reader.next() {
+//!     let batch: RecordBatch = batch.unwrap();
+//!     num_rows += batch.num_rows();
+//! }
+//! println!("decoded {num_rows} rows");
 //! ```
 //!
-//! # Async Usage
+//! ## Streaming usage (single‑object / Confluent)
 //!
-//! The lower-level `Decoder` can be integrated with various forms of async 
data streams,
-//! and is designed to be agnostic to different async IO primitives within
-//! the Rust ecosystem. It works by incrementally decoding Avro data from byte 
slices.
+//! The `Decoder` lets you integrate Avro decoding with **any** source of 
bytes by
+//! periodically calling `Decoder::decode` with new data and calling 
`Decoder::flush`
+//! to get a `RecordBatch` once at least one row is complete.
 //!
-//! For example, see below for how it could be used with an arbitrary `Stream` 
of `Bytes`:
+//! The example below shows how to decode from an arbitrary stream of 
`bytes::Bytes` using
+//! `futures` utilities. Note: this is illustrative and keeps a single 
in‑memory `Bytes`
+//! buffer for simplicity—real applications typically maintain a rolling 
buffer.
 //!
-//! ```
-//! # use std::task::{Poll, ready};
-//! # use bytes::{Buf, Bytes};
-//! # use arrow_schema::ArrowError;
-//! # use futures::stream::{Stream, StreamExt};
-//! # use arrow_array::RecordBatch;
-//! # use arrow_avro::reader::Decoder;
+//! ```no_run
+//! use bytes::{Buf, Bytes};
+//! use futures::{Stream, StreamExt};
+//! use std::task::{Poll, ready};
+//! use arrow_array::RecordBatch;
+//! use arrow_schema::ArrowError;
+//! use arrow_avro::reader::Decoder;
 //!
+//! /// Decode a stream of Avro-framed bytes into RecordBatch values.
 //! fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
 //!     mut decoder: Decoder,
 //!     mut input: S,
@@ -70,25 +107,101 @@
 //!             if buffered.is_empty() {
 //!                 buffered = match ready!(input.poll_next_unpin(cx)) {
 //!                     Some(b) => b,
-//!                     None => break,
+//!                     None => break, // EOF
 //!                 };
 //!             }
+//!             // Feed as much as possible
 //!             let decoded = match decoder.decode(buffered.as_ref()) {
-//!                 Ok(decoded) => decoded,
+//!                 Ok(n) => n,
 //!                 Err(e) => return Poll::Ready(Some(Err(e))),
 //!             };
 //!             let read = buffered.len();
 //!             buffered.advance(decoded);
 //!             if decoded != read {
+//!                 // decoder made partial progress; request more bytes
 //!                 break
 //!             }
 //!         }
-//!         // Convert any fully-decoded rows to a RecordBatch, if available
+//!         // Return a batch if one or more rows are complete
 //!         Poll::Ready(decoder.flush().transpose())
 //!     })
 //! }
 //! ```
 //!
+//! ### Building a `Decoder` for **single‑object encoding** (Rabin 
fingerprints)
+//!
+//! ```no_run
+//! use arrow_avro::schema::{AvroSchema, SchemaStore};
+//! use arrow_avro::reader::ReaderBuilder;
+//!
+//! // Build a SchemaStore and register known writer schemas
+//! let mut store = SchemaStore::new(); // Rabin by default
+//! let user_schema = 
AvroSchema::new(r#"{"type":"record","name":"User","fields":[
+//!     
{"name":"id","type":"long"},{"name":"name","type":"string"}]}"#.to_string());
+//! let _fp = store.register(user_schema).unwrap(); // computes Rabin 
CRC-64-AVRO
+//!
+//! // Build a Decoder that expects single-object encoding (0xC3 0x01 + 
fingerprint and body)
+//! let decoder = ReaderBuilder::new()
+//!     .with_writer_schema_store(store)
+//!     .with_batch_size(1024)
+//!     .build_decoder()
+//!     .unwrap();
+//! // Feed decoder with framed bytes (not shown; see `decode_stream` above).
+//! ```
+//!
+//! ### Building a `Decoder` for **Confluent Schema Registry** framed messages
+//!
+//! ```no_run
+//! use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, 
FingerprintAlgorithm};
+//! use arrow_avro::reader::ReaderBuilder;
+//!
+//! // Confluent wire format uses a magic 0x00 byte + 4-byte schema id 
(big-endian).
+//! // Create a store keyed by `Fingerprint::Id` and pre-populate with known 
schemas.
+//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+//!
+//! // Suppose registry ID 42 corresponds to this Avro schema:
+//! let avro = AvroSchema::new(r#"{"type":"string"}"#.to_string());
+//! store.set(Fingerprint::Id(42), avro).unwrap();
+//!
+//! // Build a Decoder that understands Confluent framing
+//! let decoder = ReaderBuilder::new()
+//!     .with_writer_schema_store(store)
+//!     .build_decoder()
+//!     .unwrap();
+//! // Feed decoder with 0x00 + [id:4] + Avro body frames.
+//! ```
+//!
+//! ## Schema evolution and batch boundaries
+//!
+//! `Decoder` supports mid‑stream schema changes when the input framing 
carries a schema
+//! fingerprint (single‑object or Confluent). When a new fingerprint is 
observed:
+//!
+//! * If the current `RecordBatch` is **empty**, the decoder switches to the 
new schema
+//!   immediately.
+//! * If not, the decoder finishes the current batch first and only then 
switches.
+//!
+//! Consequently, the schema of batches produced by `Decoder::flush` may 
change over time,
+//! and `Decoder` intentionally does **not** implement `RecordBatchReader`. In 
contrast,
+//! `Reader` (OCF) has a single writer schema for the entire file and 
therefore implements
+//! `RecordBatchReader`.
+//!
+//! ## Performance & memory
+//!
+//! * `batch_size` controls the maximum number of rows per `RecordBatch`. 
Larger batches
+//!   amortize per‑batch overhead; smaller batches reduce peak memory usage 
and latency.
+//! * When `utf8_view` is enabled, string columns use Arrow’s 
`StringViewArray`, which can
+//!   reduce allocations for short strings.
+//! * For OCF, blocks may be compressed `Reader` will decompress using the 
codec specified
+//!   in the file header and feed uncompressed bytes to the row `Decoder`.
+//!
+//! ## Error handling
+//!
+//! * Incomplete inputs return parse errors with "Unexpected EOF"; callers 
typically provide
+//!   more bytes and try again.
+//! * If a fingerprint is unknown to the provided `SchemaStore`, decoding 
fails with a
+//!   descriptive error. Populate the store up front to avoid this.
+//!
+//! ---
 use crate::codec::{AvroField, AvroFieldBuilder};
 use crate::schema::{
     compare_schemas, AvroSchema, Fingerprint, FingerprintAlgorithm, Schema, 
SchemaStore,
@@ -138,7 +251,77 @@ fn is_incomplete_data(err: &ArrowError) -> bool {
     )
 }
 
-/// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
+/// A low‑level, push‑based decoder from Avro bytes to Arrow `RecordBatch`.
+///
+/// `Decoder` is designed for **streaming** scenarios:
+///
+/// * You *feed* freshly received bytes using `Self::decode`, potentially 
multiple times,
+///   until at least one row is complete.
+/// * You then *drain* completed rows with `Self::flush`, which yields a 
`RecordBatch`
+///   if any rows were finished since the last flush.
+///
+/// Unlike `Reader`, which is specialized for Avro **Object Container Files**, 
`Decoder`
+/// understands **framed single‑object** inputs and **Confluent Schema 
Registry** messages,
+/// switching schemas mid‑stream when the framing indicates a new fingerprint.
+///
+/// ### Supported prefixes
+///
+/// On each new row boundary, `Decoder` tries to match one of the following 
"prefixes":
+///
+/// * **Single‑Object encoding**: magic `0xC3 0x01` + schema fingerprint 
(length depends on
+///   the configured `FingerprintAlgorithm`); see `SINGLE_OBJECT_MAGIC`.
+/// * **Confluent wire format**: magic `0x00` + 4‑byte big‑endian schema id; 
see
+///   `CONFLUENT_MAGIC`.
+///
+/// The active fingerprint determines which cached row decoder is used to 
decode the following
+/// record body bytes.
+///
+/// ### Schema switching semantics
+///
+/// When a new fingerprint is observed:
+///
+/// * If the current batch is empty, the decoder switches immediately;
+/// * Otherwise, the current batch is finalized on the next `flush` and only 
then
+///   does the decoder switch to the new schema. This guarantees that a single 
`RecordBatch`
+///   never mixes rows with different schemas.
+///
+/// ### Examples
+///
+/// Build a `Decoder` for single‑object encoding using a `SchemaStore` with 
Rabin fingerprints:
+///
+/// ```no_run
+/// use arrow_avro::schema::{AvroSchema, SchemaStore};
+/// use arrow_avro::reader::ReaderBuilder;
+///
+/// let mut store = SchemaStore::new(); // Rabin by default
+/// let avro = AvroSchema::new(r#""string""#.to_string());
+/// let _fp = store.register(avro).unwrap();
+///
+/// let mut decoder = ReaderBuilder::new()
+///     .with_writer_schema_store(store)
+///     .with_batch_size(512)
+///     .build_decoder()
+///     .unwrap();
+///
+/// // Feed bytes (framed as 0xC3 0x01 + fingerprint and body)
+/// // decoder.decode(&bytes)?;
+/// // if let Some(batch) = decoder.flush()? { /* process */ }
+/// ```
+///
+/// Build a `Decoder` for Confluent Registry messages (magic 0x00 + 4‑byte id):
+///
+/// ```no_run
+/// use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, 
FingerprintAlgorithm};
+/// use arrow_avro::reader::ReaderBuilder;
+///
+/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+/// store.set(Fingerprint::Id(7), 
AvroSchema::new(r#""long""#.to_string())).unwrap();
+///
+/// let mut decoder = ReaderBuilder::new()
+///     .with_writer_schema_store(store)
+///     .build_decoder()
+///     .unwrap();
+/// ```
 #[derive(Debug)]
 pub struct Decoder {
     active_decoder: RecordDecoder,
@@ -154,21 +337,39 @@ pub struct Decoder {
 }
 
 impl Decoder {
-    /// Return the Arrow schema for the rows decoded by this decoder
+    /// Returns the Arrow schema for the rows decoded by this decoder.
+    ///
+    /// **Note:** With single‑object or Confluent framing, the schema may 
change
+    /// at a row boundary when the input indicates a new fingerprint.
     pub fn schema(&self) -> SchemaRef {
         self.active_decoder.schema().clone()
     }
 
-    /// Return the configured maximum number of rows per batch
+    /// Returns the configured maximum number of rows per batch.
     pub fn batch_size(&self) -> usize {
         self.batch_size
     }
 
-    /// Feed `data` into the decoder row by row until we either:
-    /// - consume all bytes in `data`, or
-    /// - reach `batch_size` decoded rows.
+    /// Feed a chunk of bytes into the decoder.
     ///
-    /// Returns the number of bytes consumed.
+    /// This will:
+    ///
+    /// * Decode at most `Self::batch_size` rows;
+    /// * Return the number of input bytes **consumed** from `data` (which may 
be 0 if more
+    ///   bytes are required, or less than `data.len()` if a prefix/body 
straddles the
+    ///   chunk boundary);
+    /// * Defer producing a `RecordBatch` until you call `Self::flush`.
+    ///
+    /// # Returns
+    /// The number of bytes consumed from `data`.
+    ///
+    /// # Errors
+    /// Returns an error if:
+    ///
+    /// * The input indicates an unknown fingerprint (not present in the 
provided
+    ///   `SchemaStore`;
+    /// * The Avro body is malformed;
+    /// * A strict‑mode union rule is violated (see 
`ReaderBuilder::with_strict_mode`).
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
         let mut total_consumed = 0usize;
         while total_consumed < data.len() && self.remaining_capacity > 0 {
@@ -234,7 +435,7 @@ impl Decoder {
 
     /// This method checks for the provided `magic` bytes at the start of 
`buf` and, if present,
     /// attempts to read the following fingerprint of `N` bytes, converting it 
to a
-    /// [`Fingerprint`] using `fingerprint_from`.
+    /// `Fingerprint` using `fingerprint_from`.
     fn handle_prefix_common<const MAGIC_LEN: usize, const N: usize>(
         &mut self,
         buf: &[u8],
@@ -318,6 +519,10 @@ impl Decoder {
 
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
+    ///
+    /// If a schema change was detected while decoding rows for the current 
batch, the
+    /// schema switch is applied **after** flushing this batch, so the 
**next** batch
+    /// (if any) may have a different schema.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
         // We must flush the active decoder before switching to the pending 
one.
         let batch = self.flush_and_reset();
@@ -335,7 +540,7 @@ impl Decoder {
         self.remaining_capacity == 0
     }
 
-    /// Returns true if the decoder has not decoded any batches yet.
+    /// Returns true if the decoder has not decoded any batches yet (i.e., the 
current batch is empty).
     pub fn batch_is_empty(&self) -> bool {
         self.remaining_capacity == self.batch_size
     }
@@ -361,8 +566,57 @@ impl Decoder {
     }
 }
 
-/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
-/// into Arrow `RecordBatch`.
+/// A builder that configures and constructs Avro readers and decoders.
+///
+/// `ReaderBuilder` is the primary entry point for this module. It supports:
+///
+/// * OCF reading via `Self::build`, returning a `Reader` over any `BufRead`;
+/// * streaming decoding via `Self::build_decoder`, returning a `Decoder`.
+///
+/// ### Options
+///
+/// * **`batch_size`**: Max rows per `RecordBatch` (default: `1024`). See 
`Self::with_batch_size`.
+/// * **`utf8_view`**: Use Arrow `StringViewArray` for string columns 
(default: `false`).
+///   See `Self::with_utf8_view`.
+/// * **`strict_mode`**: Opt‑in to stricter union handling (default: `false`).
+///   See `Self::with_strict_mode`.
+/// * **`reader_schema`**: Optional reader schema (projection / evolution) 
used when decoding
+///   values (default: `None`). See `Self::with_reader_schema`.
+/// * **`writer_schema_store`**: Required for building a `Decoder` for 
single‑object or
+///   Confluent framing. Maps fingerprints to Avro schemas. See 
`Self::with_writer_schema_store`.
+/// * **`active_fingerprint`**: Optional starting fingerprint for streaming 
decode when the
+///   first frame omits one (rare). See `Self::with_active_fingerprint`.
+///
+/// ### Examples
+///
+/// Read an OCF file in batches of 4096 rows:
+///
+/// ```no_run
+/// use std::fs::File;
+/// use std::io::BufReader;
+/// use arrow_avro::reader::ReaderBuilder;
+///
+/// let file = File::open("data.avro")?;
+/// let mut reader = ReaderBuilder::new()
+///     .with_batch_size(4096)
+///     .build(BufReader::new(file))?;
+/// # Ok::<(), Box<dyn std::error::Error>>(())
+/// ```
+///
+/// Build a `Decoder` for Confluent messages:
+///
+/// ```no_run
+/// use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, 
FingerprintAlgorithm};
+/// use arrow_avro::reader::ReaderBuilder;
+///
+/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+/// store.set(Fingerprint::Id(1234), 
AvroSchema::new(r#"{"type":"record","name":"E","fields":[]}"#.to_string()))?;
+///
+/// let decoder = ReaderBuilder::new()
+///     .with_writer_schema_store(store)
+///     .build_decoder()?;
+/// # Ok::<(), Box<dyn std::error::Error>>(())
+/// ```
 #[derive(Debug)]
 pub struct ReaderBuilder {
     batch_size: usize,
@@ -387,13 +641,14 @@ impl Default for ReaderBuilder {
 }
 
 impl ReaderBuilder {
-    /// Creates a new [`ReaderBuilder`] with default settings:
-    /// - `batch_size` = 1024
-    /// - `strict_mode` = false
-    /// - `utf8_view` = false
-    /// - `reader_schema` = None
-    /// - `writer_schema_store` = None
-    /// - `active_fingerprint` = None
+    /// Creates a new `ReaderBuilder` with defaults:
+    ///
+    /// * `batch_size = 1024`
+    /// * `strict_mode = false`
+    /// * `utf8_view = false`
+    /// * `reader_schema = None`
+    /// * `writer_schema_store = None`
+    /// * `active_fingerprint = None`
     pub fn new() -> Self {
         Self::default()
     }
@@ -513,45 +768,56 @@ impl ReaderBuilder {
         ))
     }
 
-    /// Sets the row-based batch size
+    /// Sets the **row‑based batch size**.
+    ///
+    /// Each call to `Decoder::flush` or each iteration of `Reader` yields a 
batch with
+    /// *up to* this many rows. Larger batches can reduce overhead; smaller 
batches can
+    /// reduce peak memory usage and latency.
     pub fn with_batch_size(mut self, batch_size: usize) -> Self {
         self.batch_size = batch_size;
         self
     }
 
-    /// Set whether to use StringViewArray for string data
+    /// Choose Arrow's `StringViewArray` for UTF‑8 string data.
     ///
-    /// When enabled, string data from Avro files will be loaded into
-    /// Arrow's StringViewArray instead of the standard StringArray.
+    /// When enabled, textual Avro fields are loaded into Arrow’s 
**StringViewArray**
+    /// instead of the standard `StringArray`. This can improve performance 
for workloads
+    /// with many short strings by reducing allocations.
     pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
         self.utf8_view = utf8_view;
         self
     }
 
-    /// Get whether StringViewArray is enabled for string data
+    /// Returns whether `StringViewArray` is enabled for string data.
     pub fn use_utf8view(&self) -> bool {
         self.utf8_view
     }
 
-    /// Controls whether certain Avro unions of the form `[T, "null"]` should 
produce an error.
+    /// Enable stricter behavior for certain Avro unions (e.g., `[T, "null"]`).
+    ///
+    /// When `true`, ambiguous or lossy unions that would otherwise be coerced 
may instead
+    /// produce a descriptive error. Use this to catch schema issues early 
during ingestion.
     pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
         self.strict_mode = strict_mode;
         self
     }
 
-    /// Sets the Avro reader schema.
+    /// Sets the **reader schema** used during decoding.
     ///
-    /// If a schema is not provided, the schema will be read from the Avro 
file header.
+    /// If not provided, the writer schema from the OCF header (for `Reader`) 
or the
+    /// schema looked up from the fingerprint (for `Decoder`) is used directly.
+    ///
+    /// A reader schema can be used for **schema evolution** or **projection**.
     pub fn with_reader_schema(mut self, schema: AvroSchema) -> Self {
         self.reader_schema = Some(schema);
         self
     }
 
-    /// Sets the `SchemaStore` used for resolving writer schemas.
+    /// Sets the `SchemaStore` used to resolve writer schemas by fingerprint.
     ///
-    /// This is necessary when decoding single-object encoded data that 
identifies
-    /// schemas by a fingerprint. The store allows the decoder to look up the
-    /// full writer schema from a fingerprint embedded in the data.
+    /// This is required when building a `Decoder` for **single‑object 
encoding** or the
+    /// **Confluent** wire format. The store maps a fingerprint (Rabin / MD5 / 
SHA‑256 /
+    /// ID) to a full Avro schema.
     ///
     /// Defaults to `None`.
     pub fn with_writer_schema_store(mut self, store: SchemaStore) -> Self {
@@ -559,19 +825,20 @@ impl ReaderBuilder {
         self
     }
 
-    /// Sets the initial schema fingerprint for decoding single-object encoded 
data.
-    ///
-    /// This is useful when the data stream does not begin with a schema 
definition
-    /// or fingerprint, allowing the decoder to start with a known schema from 
the
-    /// `SchemaStore`.
+    /// Sets the initial schema fingerprint for stream decoding.
     ///
-    /// Defaults to `None`.
+    /// This can be useful for streams that **do not include** a fingerprint 
before the first
+    /// record body (uncommon). If not set, the first observed fingerprint is 
used.
     pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self {
         self.active_fingerprint = Some(fp);
         self
     }
 
-    /// Create a [`Reader`] from this builder and a `BufRead`
+    /// Build a `Reader` (OCF) from this builder and a `BufRead`.
+    ///
+    /// This reads and validates the OCF header, initializes an internal row 
decoder from
+    /// the discovered writer (and optional reader) schema, and prepares to 
iterate blocks,
+    /// decompressing if necessary.
     pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, 
ArrowError> {
         let header = read_header(&mut reader)?;
         let decoder = self.make_decoder(Some(&header), 
self.reader_schema.as_ref())?;
@@ -587,7 +854,14 @@ impl ReaderBuilder {
         })
     }
 
-    /// Create a [`Decoder`] from this builder.
+    /// Build a streaming `Decoder` from this builder.
+    ///
+    /// # Requirements
+    /// * `SchemaStore` **must** be provided via 
`Self::with_writer_schema_store`.
+    /// * The store should contain **all** fingerprints that may appear on the 
stream.
+    ///
+    /// # Errors
+    /// * Returns [`ArrowError::InvalidArgumentError`] if the schema store is 
missing
     pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
         if self.writer_schema_store.is_none() {
             return Err(ArrowError::InvalidArgumentError(
@@ -598,8 +872,15 @@ impl ReaderBuilder {
     }
 }
 
-/// A high-level Avro `Reader` that reads container-file blocks
-/// and feeds them into a row-level [`Decoder`].
+/// A high‑level Avro **Object Container File** reader.
+///
+/// `Reader` pulls blocks from a `BufRead` source, handles optional block 
compression,
+/// and decodes them row‑by‑row into Arrow `RecordBatch` values using an 
internal
+/// `Decoder`. It implements both:
+///
+/// * [`Iterator<Item = Result<RecordBatch, ArrowError>>`], and
+/// * `RecordBatchReader`, guaranteeing a consistent schema across all 
produced batches.
+///
 #[derive(Debug)]
 pub struct Reader<R: BufRead> {
     reader: R,
@@ -613,17 +894,21 @@ pub struct Reader<R: BufRead> {
 }
 
 impl<R: BufRead> Reader<R> {
-    /// Return the Arrow schema discovered from the Avro file header
+    /// Returns the Arrow schema discovered from the Avro file header (or 
derived via
+    /// the optional reader schema).
     pub fn schema(&self) -> SchemaRef {
         self.decoder.schema()
     }
 
-    /// Return the Avro container-file header
+    /// Returns a reference to the parsed Avro container‑file header (magic, 
metadata, codec, sync).
     pub fn avro_header(&self) -> &Header {
         &self.header
     }
 
-    /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
+    /// Reads the next `RecordBatch` from the Avro file, or `Ok(None)` on EOF.
+    ///
+    /// Batches are bounded by `batch_size`; a single OCF block may yield 
multiple batches,
+    /// and a batch may also span multiple blocks.
     fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
         'outer: while !self.finished && !self.decoder.batch_is_full() {
             while self.block_cursor == self.block_data.len() {
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index e73b1050c7..511ba280f7 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -27,15 +27,15 @@ use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use strum_macros::AsRefStr;
 
-/// The metadata key used for storing the JSON encoded [`Schema`]
-pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
-
 /// The Avro single‑object encoding “magic” bytes (`0xC3 0x01`)
 pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];
 
 /// The Confluent "magic" byte (`0x00`)
 pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];
 
+/// The metadata key used for storing the JSON encoded [`Schema`]
+pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
+
 /// Metadata key used to represent Avro enum symbols in an Arrow schema.
 pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";
 
@@ -51,6 +51,9 @@ pub const AVRO_NAMESPACE_METADATA_KEY: &str = 
"avro.namespace";
 /// Metadata key used to store the documentation for a type in an Avro schema.
 pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";
 
+/// Default name for the root record in an Avro schema.
+pub const AVRO_ROOT_RECORD_DEFAULT_NAME: &str = "topLevelRecord";
+
 /// Compare two Avro schemas for equality (identical schemas).
 /// Returns true if the schemas have the same parsing canonical form (i.e., 
logically identical).
 pub fn compare_schemas(writer: &Schema, reader: &Schema) -> Result<bool, 
ArrowError> {
@@ -451,7 +454,7 @@ impl AvroSchema {
         let record_name = schema
             .metadata
             .get(AVRO_NAME_METADATA_KEY)
-            .map_or("topLevelRecord", |s| s.as_str());
+            .map_or(AVRO_ROOT_RECORD_DEFAULT_NAME, |s| s.as_str());
         let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
         record.insert("type".into(), Value::String("record".into()));
         record.insert(

Reply via email to