This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 3027dbc595 Follow-up on arrow-avro Documentation (#8402)
3027dbc595 is described below
commit 3027dbc595819763dc2bff74b024ce943c82ca06
Author: Connor Sanders <[email protected]>
AuthorDate: Tue Sep 23 13:38:03 2025 -0500
Follow-up on arrow-avro Documentation (#8402)
# Which issue does this PR close?
- **Related to**: #4886 (“Add Avro Support”)
- **Follows-up** on https://github.com/apache/arrow-rs/pull/8316
# Rationale for this change
@alamb had some recommendations for improving the `arrow-avro`
documentation in #8316. This is a follow-up to address those
suggestions.
# What changes are included in this PR?
1. `lib.rs` documentation
2. `reader/mod.rs` improved documentation and inlined examples
3. `writer/mod.rs` improved documentation and inlined examples
**NOTE:** Some doc tests are temporarily ignored until
https://github.com/apache/arrow-rs/pull/8371 is merged in.
# Are these changes tested?
Yes. Doc tests have been included, all of which run (with the exception of the
3 ignored ones, which will work once the linked PR is merged).
<img width="1861" height="1027" alt="Screenshot 2025-09-22 at 3 36
02 AM"
src="https://github.com/user-attachments/assets/9dbec0bd-aae0-4655-ab9d-89f9b2fc4e9a"
/>
<img width="1878" height="1022" alt="Screenshot 2025-09-22 at 3 36
19 AM"
src="https://github.com/user-attachments/assets/44f59af8-bcdb-4526-a97d-8a3478ec356b"
/>
<img width="1900" height="1021" alt="Screenshot 2025-09-22 at 3 36
34 AM"
src="https://github.com/user-attachments/assets/ebda9db2-103f-4026-be78-66c6115b557c"
/>
# Are there any user-facing changes?
N/A
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-avro/examples/decode_stream.rs | 104 --------
arrow-avro/examples/read_avro_ocf.rs | 71 ------
arrow-avro/src/lib.rs | 195 ++++++++++++++-
arrow-avro/src/reader/mod.rs | 473 +++++++++++++++++++++++++++++------
arrow-avro/src/schema.rs | 33 ++-
arrow-avro/src/writer/mod.rs | 187 ++++++++++++--
6 files changed, 771 insertions(+), 292 deletions(-)
diff --git a/arrow-avro/examples/decode_stream.rs
b/arrow-avro/examples/decode_stream.rs
deleted file mode 100644
index fe13382d29..0000000000
--- a/arrow-avro/examples/decode_stream.rs
+++ /dev/null
@@ -1,104 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Decode Avro **stream-framed** bytes into Arrow [`RecordBatch`]es.
-//!
-//! This example demonstrates how to:
-//! * Build a streaming `Decoder` via `ReaderBuilder::build_decoder`
-//! * Register a writer schema keyed by a **Single‑Object** Rabin fingerprint
-//! * Generate a few **Single‑Object** frames in‑memory and decode them
-
-use arrow_avro::reader::ReaderBuilder;
-use arrow_avro::schema::{AvroSchema, Fingerprint, SchemaStore,
SINGLE_OBJECT_MAGIC};
-
-fn encode_long(value: i64, out: &mut Vec<u8>) {
- let mut n = ((value << 1) ^ (value >> 63)) as u64;
- while (n & !0x7F) != 0 {
- out.push(((n as u8) & 0x7F) | 0x80);
- n >>= 7;
- }
- out.push(n as u8);
-}
-
-fn encode_len(len: usize, out: &mut Vec<u8>) {
- encode_long(len as i64, out)
-}
-
-fn encode_string(s: &str, out: &mut Vec<u8>) {
- encode_len(s.len(), out);
- out.extend_from_slice(s.as_bytes());
-}
-
-fn encode_user_body(id: i64, name: &str) -> Vec<u8> {
- let mut v = Vec::with_capacity(16 + name.len());
- encode_long(id, &mut v);
- encode_string(name, &mut v);
- v
-}
-
-// Frame a body as Avro Single‑Object: magic + 8-byte little‑endian
fingerprint + body
-fn frame_single_object(fp_rabin: u64, body: &[u8]) -> Vec<u8> {
- let mut out = Vec::with_capacity(2 + 8 + body.len());
- out.extend_from_slice(&SINGLE_OBJECT_MAGIC);
- out.extend_from_slice(&fp_rabin.to_le_bytes());
- out.extend_from_slice(body);
- out
-}
-
-fn main() -> Result<(), Box<dyn std::error::Error>> {
- // A tiny Avro writer schema used to generate a few messages
- let avro = AvroSchema::new(
- r#"{"type":"record","name":"User","fields":[
- {"name":"id","type":"long"},
- {"name":"name","type":"string"}]}"#
- .to_string(),
- );
-
- // Register the writer schema in a store (keyed by Rabin fingerprint).
- // Keep the fingerprint to seed the decoder and to frame generated
messages.
- let mut store = SchemaStore::new();
- let fp = store.register(avro.clone())?;
- let rabin = match fp {
- Fingerprint::Rabin(v) => v,
- _ => unreachable!("Single‑Object framing uses Rabin fingerprints"),
- };
-
- // Build a streaming decoder configured for Single‑Object framing.
- let mut decoder = ReaderBuilder::new()
- .with_writer_schema_store(store)
- .with_active_fingerprint(fp)
- .build_decoder()?;
-
- // Generate 5 Single‑Object frames for the "User" schema.
- let mut bytes = Vec::new();
- for i in 0..5 {
- let body = encode_user_body(i as i64, &format!("user-{i}"));
- bytes.extend_from_slice(&frame_single_object(rabin, &body));
- }
-
- // Feed all bytes at once, then flush completed batches.
- let _consumed = decoder.decode(&bytes)?;
- while let Some(batch) = decoder.flush()? {
- println!(
- "Batch: rows = {:>3}, cols = {}",
- batch.num_rows(),
- batch.num_columns()
- );
- }
-
- Ok(())
-}
diff --git a/arrow-avro/examples/read_avro_ocf.rs
b/arrow-avro/examples/read_avro_ocf.rs
deleted file mode 100644
index bf17ed572b..0000000000
--- a/arrow-avro/examples/read_avro_ocf.rs
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Read an Avro **Object Container File (OCF)** into Arrow [`RecordBatch`]
values.
-//!
-//! This example demonstrates how to:
-//! * Construct a [`Reader`] using [`ReaderBuilder::build`]
-//! * Iterate `RecordBatch`es and print a brief summary
-
-use std::fs::File;
-use std::io::BufReader;
-use std::path::PathBuf;
-
-use arrow_array::RecordBatch;
-use arrow_avro::reader::ReaderBuilder;
-
-fn main() -> Result<(), Box<dyn std::error::Error>> {
- let ocf_path: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
- .join("test")
- .join("data")
- .join("skippable_types.avro");
-
- let reader = BufReader::new(File::open(&ocf_path)?);
- // Build a high-level OCF Reader with default settings
- let avro_reader = ReaderBuilder::new().build(reader)?;
- let schema = avro_reader.schema();
- println!(
- "Discovered Arrow schema with {} fields",
- schema.fields().len()
- );
-
- let mut total_batches = 0usize;
- let mut total_rows = 0usize;
- let mut total_columns = schema.fields().len();
-
- for result in avro_reader {
- let batch: RecordBatch = result?;
- total_batches += 1;
- total_rows += batch.num_rows();
- total_columns = batch.num_columns();
-
- println!(
- "Batch {:>3}: rows = {:>6}, cols = {:>3}",
- total_batches,
- batch.num_rows(),
- batch.num_columns()
- );
- }
-
- println!();
- println!("Done.");
- println!(" Batches : {total_batches}");
- println!(" Rows : {total_rows}");
- println!(" Columns : {total_columns}");
-
- Ok(())
-}
diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs
index 9367bc8efc..be8408a36d 100644
--- a/arrow-avro/src/lib.rs
+++ b/arrow-avro/src/lib.rs
@@ -15,9 +15,200 @@
// specific language governing permissions and limitations
// under the License.
-//! Convert data to / from the [Apache Arrow] memory format and [Apache Avro]
+//! Convert data to / from the [Apache Arrow] memory format and [Apache Avro].
//!
-//! [Apache Arrow]: https://arrow.apache.org
+//! This crate provides:
+//! - a [`reader`] that decodes Avro (Object Container Files, Avro
Single‑Object encoding,
+//! and Confluent Schema Registry wire format) into Arrow `RecordBatch`es,
+//! - and a [`writer`] that encodes Arrow `RecordBatch`es into Avro (OCF or
raw Avro binary).
+//!
+//! If you’re new to Arrow or Avro, see:
+//! - Arrow project site: <https://arrow.apache.org/>
+//! - Avro 1.11.1 specification:
<https://avro.apache.org/docs/1.11.1/specification/>
+//!
+//! ## Example: OCF (Object Container File) round‑trip
+//!
+//! The example below creates an Arrow table, writes an **Avro OCF** fully in
memory,
+//! and then reads it back. OCF is a self‑describing file format that embeds
the Avro
+//! schema in a header with optional compression and block sync markers.
+//! Spec:
<https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
+//!
+//! ```
+//! use std::io::Cursor;
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::AvroWriter;
+//! use arrow_avro::reader::ReaderBuilder;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Build a tiny Arrow batch
+//! let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
+//! let batch = RecordBatch::try_new(
+//! Arc::new(schema.clone()),
+//! vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+//! )?;
+//!
+//! // Write an Avro **Object Container File** (OCF) to a Vec<u8>
+//! let sink: Vec<u8> = Vec::new();
+//! let mut w = AvroWriter::new(sink, schema.clone())?;
+//! w.write(&batch)?;
+//! w.finish()?;
+//! let bytes = w.into_inner();
+//! assert!(!bytes.is_empty());
+//!
+//! // Read it back
+//! let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
+//! let out = r.next().unwrap()?;
+//! assert_eq!(out.num_rows(), 3);
+//! # Ok(()) }
+//! ```
+//!
+//! ## Quickstart: Confluent wire‑format round‑trip *(runnable)*
+//!
+//! The **Confluent Schema Registry wire format** prefixes each Avro message
with a
+//! 1‑byte magic `0x00` and a **4‑byte big‑endian** schema ID, followed by the
Avro body.
+//! See:
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
+//!
+//! In this round‑trip, we:
+//! 1) Use `AvroStreamWriter` to create a **raw Avro body** for a single‑row
batch,
+//! 2) Wrap it with the Confluent prefix (magic and schema ID),
+//! 3) Decode it back to Arrow using a `Decoder` configured with a
`SchemaStore` that
+//! maps the schema ID to the Avro schema used by the writer.
+//!
+//! ```
+//! use std::collections::HashMap;
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::{AvroStreamWriter, WriterBuilder};
+//! use arrow_avro::reader::ReaderBuilder;
+//! use arrow_avro::schema::{
+//! AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm,
+//! FingerprintStrategy, SCHEMA_METADATA_KEY
+//! };
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Writer schema registered under Schema Registry ID 1
+//! let avro_json = r#"{
+//! "type":"record","name":"User",
+//! "fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]
+//! }"#;
+//!
+//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+//! let id: u32 = 1;
+//! store.set(Fingerprint::Id(id), AvroSchema::new(avro_json.to_string()))?;
+//!
+//! // Build an Arrow schema that references the same Avro JSON
+//! let mut md = HashMap::new();
+//! md.insert(SCHEMA_METADATA_KEY.to_string(), avro_json.to_string());
+//! let schema = Schema::new_with_metadata(
+//! vec![
+//! Field::new("id", DataType::Int64, false),
+//! Field::new("name", DataType::Utf8, false),
+//! ],
+//! md,
+//! );
+//!
+//! // One‑row batch: { id: 42, name: "alice" }
+//! let batch = RecordBatch::try_new(
+//! Arc::new(schema.clone()),
+//! vec![
+//! Arc::new(Int64Array::from(vec![42])) as ArrayRef,
+//! Arc::new(StringArray::from(vec!["alice"])) as ArrayRef,
+//! ],
+//! )?;
+//!
+//! // Stream‑write a single record, letting the writer add the **Confluent**
prefix.
+//! let sink: Vec<u8> = Vec::new();
+//! let mut w: AvroStreamWriter<Vec<u8>> = WriterBuilder::new(schema.clone())
+//! .with_fingerprint_strategy(FingerprintStrategy::Id(id))
+//! .build(sink)?;
+//! w.write(&batch)?;
+//! w.finish()?;
+//! let frame = w.into_inner(); // already: 0x00 + 4B BE ID + Avro body
+//! assert!(frame.len() > 5);
+//!
+//! // Decode
+//! let mut dec = ReaderBuilder::new()
+//! .with_writer_schema_store(store)
+//! .build_decoder()?;
+//! dec.decode(&frame)?;
+//! let out = dec.flush()?.expect("one row");
+//! assert_eq!(out.num_rows(), 1);
+//! # Ok(()) }
+//! ```
+//!
+//! ## Quickstart: Avro Single‑Object Encoding round‑trip *(runnable)*
+//!
+//! Avro **Single‑Object Encoding (SOE)** wraps an Avro body with a 2‑byte
marker
+//! `0xC3 0x01` and an **8‑byte little‑endian CRC‑64‑AVRO Rabin fingerprint**
of the
+//! writer schema, then the Avro body. Spec:
+//! <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+//!
+//! This example registers the writer schema (computing a Rabin fingerprint),
writes a
+//! single‑row Avro body (using `AvroStreamWriter`), constructs the SOE frame,
and decodes it back to Arrow.
+//!
+//! ```
+//! use std::collections::HashMap;
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::{AvroStreamWriter, WriterBuilder};
+//! use arrow_avro::reader::ReaderBuilder;
+//! use arrow_avro::schema::{AvroSchema, SchemaStore, FingerprintStrategy,
SCHEMA_METADATA_KEY};
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Writer schema: {
"type":"record","name":"User","fields":[{"name":"x","type":"long"}] }
+//! let writer_json =
r#"{"type":"record","name":"User","fields":[{"name":"x","type":"long"}]}"#;
+//! let mut store = SchemaStore::new(); // Rabin CRC‑64‑AVRO by default
+//! let _fp = store.register(AvroSchema::new(writer_json.to_string()))?;
+//!
+//! // Build an Arrow schema that references the same Avro JSON
+//! let mut md = HashMap::new();
+//! md.insert(SCHEMA_METADATA_KEY.to_string(), writer_json.to_string());
+//! let schema = Schema::new_with_metadata(
+//! vec![Field::new("x", DataType::Int64, false)],
+//! md,
+//! );
+//!
+//! // One‑row batch: { x: 7 }
+//! let batch = RecordBatch::try_new(
+//! Arc::new(schema.clone()),
+//! vec![Arc::new(Int64Array::from(vec![7])) as ArrayRef],
+//! )?;
+//!
+//! // Stream‑write a single record; the writer adds **SOE** (C3 01 + Rabin)
automatically.
+//! let sink: Vec<u8> = Vec::new();
+//! let mut w: AvroStreamWriter<Vec<u8>> = WriterBuilder::new(schema.clone())
+//! .with_fingerprint_strategy(FingerprintStrategy::Rabin)
+//! .build(sink)?;
+//! w.write(&batch)?;
+//! w.finish()?;
+//! let frame = w.into_inner(); // already: C3 01 + 8B LE Rabin + Avro body
+//! assert!(frame.len() > 10);
+//!
+//! // Decode
+//! let mut dec = ReaderBuilder::new()
+//! .with_writer_schema_store(store)
+//! .build_decoder()?;
+//! dec.decode(&frame)?;
+//! let out = dec.flush()?.expect("one row");
+//! assert_eq!(out.num_rows(), 1);
+//! # Ok(()) }
+//! ```
+//!
+//! ---
+//!
+//! ### Modules
+//!
+//! - [`reader`]: read Avro (OCF, SOE, Confluent) into Arrow `RecordBatch`es.
+//! - [`writer`]: write Arrow `RecordBatch`es as Avro (OCF, SOE, Confluent).
+//! - [`schema`]: Avro schema parsing / fingerprints / registries.
+//! - [`compression`]: codecs used for OCF blocks (i.e., Deflate, Snappy,
Zstandard).
+//! - [`codec`]: internal Avro↔Arrow type conversion and row decode/encode
plans.
+//!
+//! [Apache Arrow]: https://arrow.apache.org/
//! [Apache Avro]: https://avro.apache.org/
//! * `ReaderBuilder`: configures how Avro is read (batch size, strict union handling,
//! string representation, reader schema, etc.) and produces either:
//! * a `Reader` for **Avro Object Container Files (OCF)** read from any
`BufRead`, or
//! * a low-level `Decoder` for **single‑object encoded** Avro bytes and
Confluent
//! **Schema Registry** framed messages.
-//! * `Reader`: a convenient, synchronous iterator over `RecordBatch` decoded
from an OCF
+//! * [`Reader`](crate::reader::Reader): a convenient, synchronous iterator
over `RecordBatch` decoded from an OCF
//! input. Implements [`Iterator<Item = Result<RecordBatch, ArrowError>>`]
and
//! `RecordBatchReader`.
-//! * `Decoder`: a push‑based row decoder that consumes raw Avro bytes and
yields ready
+//! * [`Decoder`](crate::reader::Decoder): a push‑based row decoder that
consumes raw Avro bytes and yields ready
//! `RecordBatch` values when batches fill. This is suitable for integrating
with async
//! byte streams, network protocols, or other custom data sources.
//!
@@ -37,45 +37,53 @@
//!
//! * **Object Container File (OCF)**: A self‑describing file format with a
header containing
//! the writer schema, optional compression codec, and a sync marker,
followed by one or
-//! more data blocks. Use `Reader` for this format. See the Avro
specification for the
-//! structure of OCF headers and blocks.
<https://avro.apache.org/docs/1.11.1/specification/>
+//! more data blocks. Use `Reader` for this format. See the Avro 1.11.1
specification
+//! (“Object Container Files”).
<https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
//! * **Single‑Object Encoding**: A stream‑friendly framing that prefixes each
record body with
-//! the 2‑byte magic `0xC3 0x01` followed by a schema fingerprint. Use
`Decoder` with a
-//! populated `SchemaStore` to resolve fingerprints to full
-//! schemas. <https://avro.apache.org/docs/1.11.1/specification/>
-//! * **Confluent Schema Registry wire format**: A 1‑byte magic `0x00`, a
4‑byte big‑endian
-//! schema ID, then the Avro‑encoded body. Use `Decoder` with a
-//! `SchemaStore` configured for `FingerprintAlgorithm::None`
-//! and entries keyed by `Fingerprint::Id`. Confluent docs
-//! describe this framing.
+//! the 2‑byte marker `0xC3 0x01` followed by the **8‑byte little‑endian
CRC‑64‑AVRO Rabin
+//! fingerprint** of the writer schema, then the Avro binary body. Use
`Decoder` with a
+//! populated `SchemaStore` to resolve fingerprints to full schemas.
+//! See “Single object encoding” in the Avro 1.11.1 spec.
+//!
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+//! * **Confluent Schema Registry wire format**: A 1‑byte magic `0x00`, a
**4‑byte big‑endian**
+//! schema ID, then the Avro‑encoded body. Use `Decoder` with a
`SchemaStore` configured
+//! for `FingerprintAlgorithm::None` and entries keyed by `Fingerprint::Id`.
See
+//! Confluent’s “Wire format” documentation.
+//!
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
//!
//! ## Basic file usage (OCF)
//!
-//! Use `ReaderBuilder::build` to construct a `Reader` from any `BufRead`,
such as a
-//! `BufReader<File>`. The reader yields `RecordBatch` values you can iterate
over or collect.
+//! Use `ReaderBuilder::build` to construct a `Reader` from any `BufRead`. The
doctest below
+//! creates a tiny OCF in memory using `AvroWriter` and then reads it back.
//!
-//! ```no_run
-//! use std::fs::File;
-//! use std::io::BufReader;
-//! use arrow_array::RecordBatch;
+//! ```
+//! use std::io::Cursor;
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::AvroWriter;
//! use arrow_avro::reader::ReaderBuilder;
//!
-//! // Locate a test file (mirrors Arrow's test data layout)
-//! let path = "avro/alltypes_plain.avro";
-//! let path = std::env::var("ARROW_TEST_DATA")
-//! .map(|dir| format!("{dir}/{path}"))
-//! .unwrap_or_else(|_| format!("../testing/data/{path}"));
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Build a minimal Arrow schema and batch
+//! let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
+//! let batch = RecordBatch::try_new(
+//! Arc::new(schema.clone()),
+//! vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+//! )?;
//!
-//! let file = File::open(path).unwrap();
-//! let mut reader = ReaderBuilder::new().build(BufReader::new(file)).unwrap();
+//! // Write an Avro OCF to memory
+//! let buffer: Vec<u8> = Vec::new();
+//! let mut writer = AvroWriter::new(buffer, schema.clone())?;
+//! writer.write(&batch)?;
+//! writer.finish()?;
+//! let bytes = writer.into_inner();
//!
-//! // Iterate batches
-//! let mut num_rows = 0usize;
-//! while let Some(batch) = reader.next() {
-//! let batch: RecordBatch = batch.unwrap();
-//! num_rows += batch.num_rows();
-//! }
-//! println!("decoded {num_rows} rows");
+//! // Read it back with ReaderBuilder
+//! let mut reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
+//! let out = reader.next().unwrap()?;
+//! assert_eq!(out.num_rows(), 3);
+//! # Ok(()) }
//! ```
//!
//! ## Streaming usage (single‑object / Confluent)
@@ -88,7 +96,7 @@
//! `futures` utilities. Note: this is illustrative and keeps a single
in‑memory `Bytes`
//! buffer for simplicity—real applications typically maintain a rolling
buffer.
//!
-//! ```no_run
+//! ```
//! use bytes::{Buf, Bytes};
//! use futures::{Stream, StreamExt};
//! use std::task::{Poll, ready};
@@ -128,47 +136,298 @@
//! }
//! ```
//!
-//! ### Building a `Decoder` for **single‑object encoding** (Rabin
fingerprints)
+//! ### Building and using a `Decoder` for **single‑object encoding** (Rabin
fingerprints)
//!
-//! ```no_run
-//! use arrow_avro::schema::{AvroSchema, SchemaStore};
+//! The doctest below **writes** a single‑object framed record using the Avro
writer
+//! (no manual varints) for the writer schema
+//! (`{"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}`)
+//! and then decodes it into a `RecordBatch`.
+//!
+//! ```
+//! use std::sync::Arc;
+//! use std::collections::HashMap;
+//! use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::schema::{AvroSchema, SchemaStore, SCHEMA_METADATA_KEY,
FingerprintStrategy};
+//! use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
//! use arrow_avro::reader::ReaderBuilder;
//!
-//! // Build a SchemaStore and register known writer schemas
-//! let mut store = SchemaStore::new(); // Rabin by default
-//! let user_schema =
AvroSchema::new(r#"{"type":"record","name":"User","fields":[
-//!
{"name":"id","type":"long"},{"name":"name","type":"string"}]}"#.to_string());
-//! let _fp = store.register(user_schema).unwrap(); // computes Rabin
CRC-64-AVRO
-//!
-//! // Build a Decoder that expects single-object encoding (0xC3 0x01 +
fingerprint and body)
-//! let decoder = ReaderBuilder::new()
-//! .with_writer_schema_store(store)
-//! .with_batch_size(1024)
-//! .build_decoder()
-//! .unwrap();
-//! // Feed decoder with framed bytes (not shown; see `decode_stream` above).
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Register the writer schema (Rabin fingerprint by default).
+//! let mut store = SchemaStore::new();
+//! let avro_schema =
AvroSchema::new(r#"{"type":"record","name":"User","fields":[
+//! {"name":"id","type":"long"}]}"#.to_string());
+//! let _fp = store.register(avro_schema.clone())?;
+//!
+//! // Create a single-object framed record { id: 42 } with the Avro writer.
+//! let mut md = HashMap::new();
+//! md.insert(SCHEMA_METADATA_KEY.to_string(),
avro_schema.json_string.clone());
+//! let arrow = Schema::new_with_metadata(vec![Field::new("id",
DataType::Int64, false)], md);
+//! let batch = RecordBatch::try_new(
+//! Arc::new(arrow.clone()),
+//! vec![Arc::new(Int64Array::from(vec![42])) as ArrayRef],
+//! )?;
+//! let mut w = WriterBuilder::new(arrow)
+//! .with_fingerprint_strategy(FingerprintStrategy::Rabin) // SOE prefix
+//! .build::<_, AvroBinaryFormat>(Vec::new())?;
+//! w.write(&batch)?;
+//! w.finish()?;
+//! let frame = w.into_inner(); // C3 01 + fp + Avro body
+//!
+//! // Decode with a `Decoder`
+//! let mut dec = ReaderBuilder::new()
+//! .with_writer_schema_store(store)
+//! .with_batch_size(1024)
+//! .build_decoder()?;
+//!
+//! dec.decode(&frame)?;
+//! let out = dec.flush()?.expect("one batch");
+//! assert_eq!(out.num_rows(), 1);
+//! # Ok(()) }
//! ```
//!
-//! ### Building a `Decoder` for **Confluent Schema Registry** framed messages
+//! See Avro 1.11.1 “Single object encoding” for details of the 2‑byte marker
+//! and little‑endian CRC‑64‑AVRO fingerprint:
+//! <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+//!
+//! ### Building and using a `Decoder` for **Confluent Schema Registry**
framing
+//!
+//! The Confluent wire format is: 1‑byte magic `0x00`, then a **4‑byte
big‑endian** schema ID,
+//! then the Avro body. The doctest below crafts two messages for the same
schema ID and
+//! decodes them into a single `RecordBatch` with two rows.
//!
-//! ```no_run
-//! use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint,
FingerprintAlgorithm};
+//! ```
+//! use std::sync::Arc;
+//! use std::collections::HashMap;
+//! use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint,
FingerprintAlgorithm, SCHEMA_METADATA_KEY, FingerprintStrategy};
+//! use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
//! use arrow_avro::reader::ReaderBuilder;
//!
-//! // Confluent wire format uses a magic 0x00 byte + 4-byte schema id
(big-endian).
-//! // Create a store keyed by `Fingerprint::Id` and pre-populate with known
schemas.
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Set up a store keyed by numeric IDs (Confluent).
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+//! let schema_id = 7u32;
+//! let avro_schema =
AvroSchema::new(r#"{"type":"record","name":"User","fields":[
+//! {"name":"id","type":"long"},
{"name":"name","type":"string"}]}"#.to_string());
+//! store.set(Fingerprint::Id(schema_id), avro_schema.clone())?;
+//!
+//! // Write two Confluent-framed messages {id:1,name:"a"} and {id:2,name:"b"}.
+//! fn msg(id: i64, name: &str, schema: &AvroSchema, schema_id: u32) ->
Result<Vec<u8>, Box<dyn std::error::Error>> {
+//! let mut md = HashMap::new();
+//! md.insert(SCHEMA_METADATA_KEY.to_string(), schema.json_string.clone());
+//! let arrow = Schema::new_with_metadata(
+//! vec![Field::new("id", DataType::Int64, false), Field::new("name",
DataType::Utf8, false)],
+//! md,
+//! );
+//! let batch = RecordBatch::try_new(
+//! Arc::new(arrow.clone()),
+//! vec![
+//! Arc::new(Int64Array::from(vec![id])) as ArrayRef,
+//! Arc::new(StringArray::from(vec![name])) as ArrayRef,
+//! ],
+//! )?;
+//! let mut w = WriterBuilder::new(arrow)
+//! .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id)) //
0x00 + ID + body
+//! .build::<_, AvroBinaryFormat>(Vec::new())?;
+//! w.write(&batch)?; w.finish()?;
+//! Ok(w.into_inner())
+//! }
+//! let m1 = msg(1, "a", &avro_schema, schema_id)?;
+//! let m2 = msg(2, "b", &avro_schema, schema_id)?;
+//!
+//! // Decode both into a single batch.
+//! let mut dec = ReaderBuilder::new()
+//! .with_writer_schema_store(store)
+//! .with_batch_size(1024)
+//! .build_decoder()?;
+//! dec.decode(&m1)?;
+//! dec.decode(&m2)?;
+//! let batch = dec.flush()?.expect("batch");
+//! assert_eq!(batch.num_rows(), 2);
+//! # Ok(()) }
+//! ```
+//!
+//! See Confluent’s “Wire format” notes: magic byte `0x00`, 4‑byte
**big‑endian** schema ID,
+//! then the Avro‑encoded payload.
+//!
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
+//!
+//! ## Schema resolution (reader vs. writer schemas)
+//!
+//! Avro supports resolving data written with one schema (“writer”) into
another (“reader”)
+//! using rules like **field aliases**, **default values**, and **numeric
promotions**.
+//! In practice this lets you evolve schemas over time while remaining
compatible with old data.
+//!
+//! *Spec background:* See Avro’s **Schema Resolution** (aliases, defaults)
and the Confluent
+//! **Wire format** (magic `0x00` + big‑endian schema id + Avro body).
+//! <https://avro.apache.org/docs/1.11.1/specification/#schema-resolution>
+//!
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
+//!
+//! ### OCF example: rename a field and add a default via a reader schema
+//!
+//! Below we write an OCF with a *writer schema* having fields `id: long`,
`name: string`.
+//! We then read it with a *reader schema* that:
+//! - **renames** `name` to `full_name` via `aliases`, and
+//! - **adds** `is_active: boolean` with a **default** value `true`.
+//!
+//! ```
+//! use std::io::Cursor;
+//! use std::sync::Arc;
+//! use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//! use arrow_avro::writer::AvroWriter;
+//! use arrow_avro::reader::ReaderBuilder;
+//! use arrow_avro::schema::AvroSchema;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Writer (past version): { id: long, name: string }
+//! let writer_arrow = Schema::new(vec![
+//! Field::new("id", DataType::Int64, false),
+//! Field::new("name", DataType::Utf8, false),
+//! ]);
+//! let batch = RecordBatch::try_new(
+//! Arc::new(writer_arrow.clone()),
+//! vec![
+//! Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
+//! Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+//! ],
+//! )?;
+//!
+//! // Write an OCF entirely in memory
+//! let mut w = AvroWriter::new(Vec::<u8>::new(), writer_arrow)?;
+//! w.write(&batch)?;
+//! w.finish()?;
+//! let bytes = w.into_inner();
+//!
+//! // Reader (current version):
+//! // - record name "topLevelRecord" matches the crate's default for OCF
+//! // - rename `name` -> `full_name` using aliases (optional)
+//! let reader_json = r#"
+//! {
+//! "type": "record",
+//! "name": "topLevelRecord",
+//! "fields": [
+//! { "name": "id", "type": "long" },
+//! { "name": "full_name", "type": ["null","string"], "aliases": ["name"],
"default": null },
+//! { "name": "is_active", "type": "boolean", "default": true }
+//! ]
+//! }"#;
//!
-//! // Suppose registry ID 42 corresponds to this Avro schema:
-//! let avro = AvroSchema::new(r#"{"type":"string"}"#.to_string());
-//! store.set(Fingerprint::Id(42), avro).unwrap();
+//! let mut reader = ReaderBuilder::new()
+//! .with_reader_schema(AvroSchema::new(reader_json.to_string()))
+//! .build(Cursor::new(bytes))?;
//!
-//! // Build a Decoder that understands Confluent framing
-//! let decoder = ReaderBuilder::new()
-//! .with_writer_schema_store(store)
-//! .build_decoder()
-//! .unwrap();
-//! // Feed decoder with 0x00 + [id:4] + Avro body frames.
+//! let out = reader.next().unwrap()?;
+//! assert_eq!(out.num_rows(), 2);
+//! # Ok(()) }
+//! ```
+//!
+//! ### Confluent wire‑format example: resolve *past* writer versions to the
topic’s **current** reader schema
+//!
+//! In this scenario, the **reader schema** is the topic’s *current* schema,
while the two
+//! **writer schemas** registered under Confluent IDs **0** and **1**
represent *past versions*.
+//! The decoder uses the reader schema to resolve both versions.
+//!
+//! ```
+//! use std::sync::Arc;
+//! use std::collections::HashMap;
+//! use arrow_avro::reader::ReaderBuilder;
+//! use arrow_avro::schema::{
+//! AvroSchema, Fingerprint, FingerprintAlgorithm, SchemaStore,
+//! SCHEMA_METADATA_KEY, FingerprintStrategy,
+//! };
+//! use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray,
RecordBatch};
+//! use arrow_schema::{DataType, Field, Schema};
+//!
+//! fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Reader: current topic schema (no reader-added fields)
+//! // {"type":"record","name":"User","fields":[
+//! // {"name":"id","type":"long"},
+//! // {"name":"name","type":"string"}]}
+//! let reader_schema = AvroSchema::new(
+//! r#"{"type":"record","name":"User",
+//!
"fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]}"#
+//! .to_string(),
+//! );
+//!
+//! // Register two *writer* schemas under Confluent IDs 0 and 1
+//! let writer_v0 = AvroSchema::new(
+//! r#"{"type":"record","name":"User",
+//!
"fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}"#
+//! .to_string(),
+//! );
+//! let writer_v1 = AvroSchema::new(
+//! r#"{"type":"record","name":"User",
+//!
"fields":[{"name":"id","type":"long"},{"name":"name","type":"string"},
+//!
{"name":"email","type":["null","string"],"default":null}]}"#
+//! .to_string(),
+//! );
+//!
+//! let id_v0: u32 = 0;
+//! let id_v1: u32 = 1;
+//!
+//! let mut store =
SchemaStore::new_with_type(FingerprintAlgorithm::None); // integer IDs
+//! store.set(Fingerprint::Id(id_v0), writer_v0.clone())?;
+//! store.set(Fingerprint::Id(id_v1), writer_v1.clone())?;
+//!
+//! // Write two Confluent-framed messages using each writer version
+//! // frame0: writer v0 body {id:1001_i32, name:"v0-alice"}
+//! let mut md0 = HashMap::new();
+//! md0.insert(SCHEMA_METADATA_KEY.to_string(),
writer_v0.json_string.clone());
+//! let arrow0 = Schema::new_with_metadata(
+//! vec![Field::new("id", DataType::Int32, false),
+//! Field::new("name", DataType::Utf8, false)], md0);
+//! let batch0 = RecordBatch::try_new(
+//! Arc::new(arrow0.clone()),
+//! vec![Arc::new(Int32Array::from(vec![1001])) as ArrayRef,
+//! Arc::new(StringArray::from(vec!["v0-alice"])) as ArrayRef])?;
+//! let mut w0 = arrow_avro::writer::WriterBuilder::new(arrow0)
+//! .with_fingerprint_strategy(FingerprintStrategy::Id(id_v0))
+//! .build::<_,
arrow_avro::writer::format::AvroBinaryFormat>(Vec::new())?;
+//! w0.write(&batch0)?; w0.finish()?;
+//! let frame0 = w0.into_inner(); // 0x00 + id_v0 + body
+//!
+//! // frame1: writer v1 body {id:2002_i64, name:"v1-bob", email:
Some("[email protected]")}
+//! let mut md1 = HashMap::new();
+//! md1.insert(SCHEMA_METADATA_KEY.to_string(),
writer_v1.json_string.clone());
+//! let arrow1 = Schema::new_with_metadata(
+//! vec![Field::new("id", DataType::Int64, false),
+//! Field::new("name", DataType::Utf8, false),
+//! Field::new("email", DataType::Utf8, true)], md1);
+//! let batch1 = RecordBatch::try_new(
+//! Arc::new(arrow1.clone()),
+//! vec![Arc::new(Int64Array::from(vec![2002])) as ArrayRef,
+//! Arc::new(StringArray::from(vec!["v1-bob"])) as ArrayRef,
+//! Arc::new(StringArray::from(vec![Some("[email protected]")]))
as ArrayRef])?;
+//! let mut w1 = arrow_avro::writer::WriterBuilder::new(arrow1)
+//! .with_fingerprint_strategy(FingerprintStrategy::Id(id_v1))
+//! .build::<_,
arrow_avro::writer::format::AvroBinaryFormat>(Vec::new())?;
+//! w1.write(&batch1)?; w1.finish()?;
+//! let frame1 = w1.into_inner(); // 0x00 + id_v1 + body
+//!
+//! // Build a streaming Decoder that understands Confluent framing
+//! let mut decoder = ReaderBuilder::new()
+//! .with_reader_schema(reader_schema)
+//! .with_writer_schema_store(store)
+//! .with_batch_size(8) // small demo batches
+//! .build_decoder()?;
+//!
+//! // Decode each whole frame, then drain completed rows with flush()
+//! let mut total_rows = 0usize;
+//!
+//! let consumed0 = decoder.decode(&frame0)?;
+//! assert_eq!(consumed0, frame0.len(), "decoder must consume the whole
frame");
+//! while let Some(batch) = decoder.flush()? { total_rows +=
batch.num_rows(); }
+//!
+//! let consumed1 = decoder.decode(&frame1)?;
+//! assert_eq!(consumed1, frame1.len(), "decoder must consume the whole
frame");
+//! while let Some(batch) = decoder.flush()? { total_rows +=
batch.num_rows(); }
+//!
+//! // We sent 2 records so we should get 2 rows (possibly one per flush)
+//! assert_eq!(total_rows, 2);
+//! Ok(())
+//! }
//! ```
//!
//! ## Schema evolution and batch boundaries
@@ -191,7 +450,7 @@
//! amortize per‑batch overhead; smaller batches reduce peak memory usage
and latency.
//! * When `utf8_view` is enabled, string columns use Arrow’s
`StringViewArray`, which can
//! reduce allocations for short strings.
-//! * For OCF, blocks may be compressed `Reader` will decompress using the
codec specified
+//! * For OCF, blocks may be compressed; `Reader` will decompress using the
codec specified
//! in the file header and feed uncompressed bytes to the row `Decoder`.
//!
//! ## Error handling
@@ -242,7 +501,6 @@ fn read_header<R: BufRead>(mut reader: R) -> Result<Header,
ArrowError> {
})
}
-// NOTE: The Current ` is_incomplete_data ` below is temporary and will be
improved prior to public release
fn is_incomplete_data(err: &ArrowError) -> bool {
matches!(
err,
@@ -287,40 +545,91 @@ fn is_incomplete_data(err: &ArrowError) -> bool {
///
/// ### Examples
///
-/// Build a `Decoder` for single‑object encoding using a `SchemaStore` with
Rabin fingerprints:
+/// Build and use a `Decoder` for single‑object encoding:
///
-/// ```no_run
+/// ```
/// use arrow_avro::schema::{AvroSchema, SchemaStore};
/// use arrow_avro::reader::ReaderBuilder;
///
-/// let mut store = SchemaStore::new(); // Rabin by default
-/// let avro = AvroSchema::new(r#""string""#.to_string());
-/// let _fp = store.register(avro).unwrap();
+/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+/// // Use a record schema at the top level so we can build an Arrow
RecordBatch
+/// let mut store = SchemaStore::new(); // Rabin fingerprinting by default
+/// let avro = AvroSchema::new(
+///
r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()
+/// );
+/// let fp = store.register(avro)?;
+///
+/// // --- Hidden: write a single-object framed row {x:7} ---
+/// # use std::sync::Arc;
+/// # use std::collections::HashMap;
+/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+/// # use arrow_schema::{DataType, Field, Schema};
+/// # use arrow_avro::schema::{SCHEMA_METADATA_KEY, FingerprintStrategy};
+/// # use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
+/// # let mut md = HashMap::new();
+/// # md.insert(SCHEMA_METADATA_KEY.to_string(),
+/// #
r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string());
+/// # let arrow = Schema::new_with_metadata(vec![Field::new("x",
DataType::Int64, false)], md);
+/// # let batch = RecordBatch::try_new(Arc::new(arrow.clone()),
vec![Arc::new(Int64Array::from(vec![7])) as ArrayRef])?;
+/// # let mut w = WriterBuilder::new(arrow)
+/// # .with_fingerprint_strategy(fp.into())
+/// # .build::<_, AvroBinaryFormat>(Vec::new())?;
+/// # w.write(&batch)?; w.finish()?; let frame = w.into_inner();
///
/// let mut decoder = ReaderBuilder::new()
/// .with_writer_schema_store(store)
-/// .with_batch_size(512)
-/// .build_decoder()
-/// .unwrap();
+/// .with_batch_size(16)
+/// .build_decoder()?;
///
-/// // Feed bytes (framed as 0xC3 0x01 + fingerprint and body)
-/// // decoder.decode(&bytes)?;
-/// // if let Some(batch) = decoder.flush()? { /* process */ }
+/// # decoder.decode(&frame)?;
+/// let batch = decoder.flush()?.expect("one row");
+/// assert_eq!(batch.num_rows(), 1);
+/// # Ok(()) }
/// ```
///
-/// Build a `Decoder` for Confluent Registry messages (magic 0x00 + 4‑byte id):
+/// *Background:* Avro's single‑object encoding is defined as `0xC3 0x01` +
8‑byte
+/// little‑endian CRC‑64‑AVRO fingerprint of the **writer schema** + Avro
binary body.
+/// See the Avro 1.11.1 spec for details.
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
///
-/// ```no_run
+/// Build and use a `Decoder` for Confluent Registry messages:
+///
+/// ```
/// use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint,
FingerprintAlgorithm};
/// use arrow_avro::reader::ReaderBuilder;
///
+/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
-/// store.set(Fingerprint::Id(7),
AvroSchema::new(r#""long""#.to_string())).unwrap();
+/// store.set(Fingerprint::Id(1234),
AvroSchema::new(r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()))?;
+///
+/// // --- Hidden: encode two Confluent-framed messages {x:1} and {x:2} ---
+/// # use std::sync::Arc;
+/// # use std::collections::HashMap;
+/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+/// # use arrow_schema::{DataType, Field, Schema};
+/// # use arrow_avro::schema::{SCHEMA_METADATA_KEY, FingerprintStrategy};
+/// # use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
+/// # fn msg(x: i64) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
+/// # let mut md = HashMap::new();
+/// # md.insert(SCHEMA_METADATA_KEY.to_string(),
+/// #
r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string());
+/// # let arrow = Schema::new_with_metadata(vec![Field::new("x",
DataType::Int64, false)], md);
+/// # let batch = RecordBatch::try_new(Arc::new(arrow.clone()),
vec![Arc::new(Int64Array::from(vec![x])) as ArrayRef])?;
+/// # let mut w = WriterBuilder::new(arrow)
+/// # .with_fingerprint_strategy(FingerprintStrategy::Id(1234))
+/// # .build::<_, AvroBinaryFormat>(Vec::new())?;
+/// # w.write(&batch)?; w.finish()?; Ok(w.into_inner())
+/// # }
+/// # let m1 = msg(1)?;
+/// # let m2 = msg(2)?;
///
/// let mut decoder = ReaderBuilder::new()
/// .with_writer_schema_store(store)
-/// .build_decoder()
-/// .unwrap();
+/// .build_decoder()?;
+/// # decoder.decode(&m1)?;
+/// # decoder.decode(&m2)?;
+/// let batch = decoder.flush()?.expect("two rows");
+/// assert_eq!(batch.num_rows(), 2);
+/// # Ok(()) }
/// ```
#[derive(Debug)]
pub struct Decoder {
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index 42c6d8a6c3..3fdfbda1da 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -345,27 +345,19 @@ impl AvroSchema {
Self { json_string }
}
- /// Deserializes and returns the `AvroSchema`.
- ///
- /// The returned schema borrows from `self`.
- pub fn schema(&self) -> Result<Schema<'_>, ArrowError> {
+ pub(crate) fn schema(&self) -> Result<Schema<'_>, ArrowError> {
serde_json::from_str(self.json_string.as_str())
.map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema
JSON: {e}")))
}
- /// Returns the fingerprint of the schema.
- pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) ->
Result<Fingerprint, ArrowError> {
- Self::generate_fingerprint(&self.schema()?, hash_type)
- }
-
- /// Generates a fingerprint for the given `Schema` using the specified
[`FingerprintAlgorithm`].
+ /// Returns the fingerprint of the schema, computed using the specified
[`FingerprintAlgorithm`].
///
/// The fingerprint is computed over the schema's Parsed Canonical Form
/// as defined by the Avro specification. Depending on `hash_type`, this
/// will return one of the supported [`Fingerprint`] variants:
/// - [`Fingerprint::Rabin`] for [`FingerprintAlgorithm::Rabin`]
- /// - [`Fingerprint::MD5`] for [`FingerprintAlgorithm::MD5`]
- /// - [`Fingerprint::SHA256`] for [`FingerprintAlgorithm::SHA256`]
+ /// - `Fingerprint::MD5` for `FingerprintAlgorithm::MD5`
+ /// - `Fingerprint::SHA256` for `FingerprintAlgorithm::SHA256`
///
/// Note: [`FingerprintAlgorithm::None`] cannot be used to generate a
fingerprint
/// and will result in an error. If you intend to use a Schema Registry
ID-based
@@ -375,18 +367,21 @@ impl AvroSchema {
/// See also:
<https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
///
/// # Errors
- /// Returns an error if generating the canonical form of the schema fails,
- /// or if `hash_type` is [`FingerprintAlgorithm::None`].
+ /// Returns an error if deserializing the schema fails, if generating the
+ /// canonical form of the schema fails, or if `hash_type` is
[`FingerprintAlgorithm::None`].
///
/// # Examples
- /// ```no_run
+ /// ```
/// use arrow_avro::schema::{AvroSchema, FingerprintAlgorithm};
///
/// let avro = AvroSchema::new("\"string\"".to_string());
- /// let schema = avro.schema().unwrap();
- /// let fp = AvroSchema::generate_fingerprint(&schema,
FingerprintAlgorithm::Rabin).unwrap();
+ /// let fp = avro.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
/// ```
- pub fn generate_fingerprint(
+ pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) ->
Result<Fingerprint, ArrowError> {
+ Self::generate_fingerprint(&self.schema()?, hash_type)
+ }
+
+ pub(crate) fn generate_fingerprint(
schema: &Schema,
hash_type: FingerprintAlgorithm,
) -> Result<Fingerprint, ArrowError> {
@@ -432,7 +427,7 @@ impl AvroSchema {
/// Avro specification.
///
///
<https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
- pub fn generate_canonical_form(schema: &Schema) -> Result<String,
ArrowError> {
+ pub(crate) fn generate_canonical_form(schema: &Schema) -> Result<String,
ArrowError> {
build_canonical(schema, None)
}
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index 7a7b0d2837..ad104f93b8 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -19,19 +19,44 @@
//!
//! # Overview
//!
-//! * Use **`AvroWriter`** (Object Container File) when you want a
-//! self‑contained Avro file with header, schema JSON, optional
compression,
-//! blocks, and sync markers.
-//! * Use **`AvroStreamWriter`** (raw binary stream) when you already know
the
-//! schema out‑of‑band (i.e., via a schema registry) and need a stream
-//! of Avro‑encoded records with minimal framing.
+//! Use this module to serialize Arrow `RecordBatch` values into Avro. Two
output
+//! formats are supported:
//!
-
-/// Encodes `RecordBatch` into the Avro binary format.
-pub mod encoder;
-/// Logic for different Avro container file formats.
-pub mod format;
-
+//! * **[`AvroWriter`](crate::writer::AvroWriter)** — writes an **Object
Container File (OCF)**: a self‑describing
+//! file with header (schema JSON + metadata), optional compression, data
blocks, and
+//! sync markers. See Avro 1.11.1 “Object Container Files.”
+//!
<https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
+//! * **[`AvroStreamWriter`](crate::writer::AvroStreamWriter)** — writes a
**raw Avro binary stream** (“datum” bytes) without
+//! any container framing. This is useful when the schema is known
out‑of‑band (i.e.,
+//! via a registry) and you want minimal overhead.
+//!
+//! ## Which format should I use?
+//!
+//! * Use **OCF** when you need a portable, self‑contained file. The schema
travels with
+//! the data, making it easy to read elsewhere.
+//! * Use the **raw stream** when your surrounding protocol supplies schema
information
+//! (i.e., a schema registry). If you need **single‑object encoding (SOE)**
or Confluent
+//! **Schema Registry** framing, you must add the appropriate prefix
*outside* this writer:
+//! - **SOE**: `0xC3 0x01` + 8‑byte little‑endian CRC‑64‑AVRO fingerprint +
Avro body
+//! (see Avro 1.11.1 “Single object encoding”).
+//!
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
+//! - **Confluent wire format**: magic `0x00` + **big‑endian** 4‑byte schema
ID and Avro body.
+//!
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
+//!
+//! ## Choosing the Avro schema
+//!
+//! By default, the writer converts your Arrow schema to Avro (including a
top‑level record
+//! name) and stores the resulting JSON under the `avro::schema` metadata key.
If you already
+//! have an Avro schema JSON that you want to use verbatim, put it into the Arrow
schema metadata
+//! under the same key before constructing the writer. The builder will pick
it up.
+//!
+//! ## Compression
+//!
+//! For OCF, you may enable a compression codec via
`WriterBuilder::with_compression`. The
+//! chosen codec is written into the file header and used for subsequent
blocks. Raw stream
+//! writing doesn’t apply container‑level compression.
+//!
+//! ---
use crate::codec::AvroFieldBuilder;
use crate::compression::CompressionCodec;
use crate::schema::{
@@ -44,6 +69,11 @@ use arrow_schema::{ArrowError, Schema};
use std::io::Write;
use std::sync::Arc;
+/// Encodes `RecordBatch` into the Avro binary format.
+pub mod encoder;
+/// Logic for different Avro container file formats.
+pub mod format;
+
/// Builder to configure and create a `Writer`.
#[derive(Debug, Clone)]
pub struct WriterBuilder {
@@ -55,6 +85,11 @@ pub struct WriterBuilder {
impl WriterBuilder {
/// Create a new builder with default settings.
+ ///
+ /// The Avro schema used for writing is determined as follows:
+ /// 1) If the Arrow schema metadata contains `avro::schema` (see
`SCHEMA_METADATA_KEY`),
+ /// that JSON is used verbatim.
+ /// 2) Otherwise, the Arrow schema is converted to an Avro record schema.
pub fn new(schema: Schema) -> Self {
Self {
schema,
@@ -95,7 +130,6 @@ impl WriterBuilder {
Some(json) => AvroSchema::new(json.clone()),
None => AvroSchema::try_from(&self.schema)?,
};
-
let maybe_fingerprint = if F::NEEDS_PREFIX {
match self.fingerprint_strategy {
Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(id)),
@@ -110,7 +144,6 @@ impl WriterBuilder {
} else {
None
};
-
let mut md = self.schema.metadata().clone();
md.insert(
SCHEMA_METADATA_KEY.to_string(),
@@ -134,6 +167,12 @@ impl WriterBuilder {
}
/// Generic Avro writer.
+///
+/// This type is generic over the output Write sink (`W`) and the Avro format
(`F`).
+/// You’ll usually use the concrete aliases:
+///
+/// * **[`AvroWriter`]** for **OCF** (self‑describing container file)
+/// * **[`AvroStreamWriter`]** for **raw** Avro binary streams
#[derive(Debug)]
pub struct Writer<W: Write, F: AvroFormat> {
writer: W,
@@ -145,12 +184,105 @@ pub struct Writer<W: Write, F: AvroFormat> {
}
/// Alias for an Avro **Object Container File** writer.
+///
+/// ### Quickstart (runnable)
+///
+/// ```
+/// use std::io::Cursor;
+/// use std::sync::Arc;
+/// use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
+/// use arrow_schema::{DataType, Field, Schema};
+/// use arrow_avro::writer::AvroWriter;
+/// use arrow_avro::reader::ReaderBuilder;
+///
+/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+/// // Writer schema: { id: long, name: string }
+/// let writer_schema = Schema::new(vec![
+/// Field::new("id", DataType::Int64, false),
+/// Field::new("name", DataType::Utf8, false),
+/// ]);
+///
+/// // Build a RecordBatch with two rows
+/// let batch = RecordBatch::try_new(
+/// Arc::new(writer_schema.clone()),
+/// vec![
+/// Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
+/// Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
+/// ],
+/// )?;
+///
+/// // Write an Avro **Object Container File** (OCF) to memory
+/// let mut w = AvroWriter::new(Vec::<u8>::new(), writer_schema.clone())?;
+/// w.write(&batch)?;
+/// w.finish()?;
+/// let bytes = w.into_inner();
+///
+/// // Build a Reader and decode the batch back
+/// let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
+/// let out = r.next().unwrap()?;
+/// assert_eq!(out.num_rows(), 2);
+/// # Ok(()) }
+/// ```
pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;
+
/// Alias for a raw Avro **binary stream** writer.
+///
+/// ### Example
+///
+/// This writes only the **Avro body** bytes — no OCF header/sync and no
+/// single‑object or Confluent framing. If you need those frames, add them
externally.
+///
+/// ```
+/// use std::sync::Arc;
+/// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+/// use arrow_schema::{DataType, Field, Schema};
+/// use arrow_avro::writer::AvroStreamWriter;
+///
+/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+/// // One‑column Arrow batch
+/// let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
+/// let batch = RecordBatch::try_new(
+/// Arc::new(schema.clone()),
+/// vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
+/// )?;
+///
+/// // Write a raw Avro stream to a Vec<u8>
+/// let sink: Vec<u8> = Vec::new();
+/// let mut w = AvroStreamWriter::new(sink, schema)?;
+/// w.write(&batch)?;
+/// w.finish()?;
+/// let bytes = w.into_inner();
+/// assert!(!bytes.is_empty());
+/// # Ok(()) }
+/// ```
pub type AvroStreamWriter<W> = Writer<W, AvroBinaryFormat>;
impl<W: Write> Writer<W, AvroOcfFormat> {
/// Convenience constructor – same as [`WriterBuilder::build`] with
`AvroOcfFormat`.
+ ///
+ /// ### Example
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+ /// use arrow_schema::{DataType, Field, Schema};
+ /// use arrow_avro::writer::AvroWriter;
+ ///
+ /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+ /// let schema = Schema::new(vec![Field::new("id", DataType::Int32,
false)]);
+ /// let batch = RecordBatch::try_new(
+ /// Arc::new(schema.clone()),
+ /// vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+ /// )?;
+ ///
+ /// let buf: Vec<u8> = Vec::new();
+ /// let mut w = AvroWriter::new(buf, schema)?;
+ /// w.write(&batch)?;
+ /// w.finish()?;
+ /// let bytes = w.into_inner();
+ /// assert!(!bytes.is_empty());
+ /// # Ok(()) }
+ /// ```
pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)
}
@@ -163,6 +295,33 @@ impl<W: Write> Writer<W, AvroOcfFormat> {
impl<W: Write> Writer<W, AvroBinaryFormat> {
/// Convenience constructor to create a new [`AvroStreamWriter`].
+ ///
+ /// The resulting stream contains just **Avro binary** bodies (no OCF
header/sync and no
+ /// single‑object or Confluent framing). If you need those frames, add
them externally.
+ ///
+ /// ### Example
+ ///
+ /// ```
+ /// use std::sync::Arc;
+ /// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+ /// use arrow_schema::{DataType, Field, Schema};
+ /// use arrow_avro::writer::AvroStreamWriter;
+ ///
+ /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+ /// let schema = Schema::new(vec![Field::new("x", DataType::Int64,
false)]);
+ /// let batch = RecordBatch::try_new(
+ /// Arc::new(schema.clone()),
+ /// vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
+ /// )?;
+ ///
+ /// let sink: Vec<u8> = Vec::new();
+ /// let mut w = AvroStreamWriter::new(sink, schema)?;
+ /// w.write(&batch)?;
+ /// w.finish()?;
+ /// let bytes = w.into_inner();
+ /// assert!(!bytes.is_empty());
+ /// # Ok(()) }
+ /// ```
pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
WriterBuilder::new(schema).build::<W, AvroBinaryFormat>(writer)
}