scovich commented on code in PR #8274:
URL: https://github.com/apache/arrow-rs/pull/8274#discussion_r2318689900
##########
arrow-avro/src/schema.rs:
##########
@@ -790,12 +856,35 @@ fn merge_extras(schema: Value, mut extras:
JsonMap<String, Value>) -> Value {
}
}
-// Convert an Arrow `DataType` into an Avro schema `Value`.
+fn wrap_nullable(inner: Value, order: Nullability) -> Value {
+ match order {
+ Nullability::NullFirst =>
Value::Array(vec![Value::String("null".into()), inner]),
+ Nullability::NullSecond => Value::Array(vec![inner,
Value::String("null".into())]),
+ }
Review Comment:
nit: Worthwhile to sacrifice a couple lines to reduce redundancy?
```suggestion
let null = Value::String("null".into());
let elements = match order {
Nullability::NullFirst => vec![null, inner],
Nullability::NullSecond => vec![inner, null],
};
Value::Array(elements)
```
##########
arrow-avro/src/schema.rs:
##########
@@ -370,6 +371,49 @@ impl AvroSchema {
pub fn fingerprint(&self) -> Result<Fingerprint, ArrowError> {
generate_fingerprint_rabin(&self.schema()?)
}
+
+ /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given
null‑union order.
+ ///
+ /// If the input Arrow schema already contains Avro JSON in
+ /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
+ /// the exact header encoding alignment; otherwise, a new JSON is
generated
+ /// honoring `null_union_order` at **all nullable sites**.
+ pub fn from_arrow_with_options(
+ schema: &ArrowSchema,
+ null_union_order: Option<Nullability>,
+ ) -> Result<AvroSchema, ArrowError> {
+ if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
+ return Ok(AvroSchema::new(json.clone()));
+ }
+ let order = null_union_order.unwrap_or(Nullability::NullFirst);
+ let mut name_gen = NameGenerator::default();
+ let fields_json = schema
+ .fields()
+ .iter()
+ .map(|f| arrow_field_to_avro_with_order(f, &mut name_gen, order))
+ .collect::<Result<Vec<_>, _>>()?;
+ let record_name = schema
+ .metadata
+ .get(AVRO_NAME_METADATA_KEY)
+ .map_or("topLevelRecord", |s| s.as_str());
Review Comment:
aside: Is this a well-known default name? Or just an arbitrary naming choice
by this package? And does it actually matter in practice? (I guess if it
mattered, the `schema` metadata would say so)?
##########
arrow-avro/src/schema.rs:
##########
@@ -790,12 +856,35 @@ fn merge_extras(schema: Value, mut extras:
JsonMap<String, Value>) -> Value {
}
}
-// Convert an Arrow `DataType` into an Avro schema `Value`.
+fn wrap_nullable(inner: Value, order: Nullability) -> Value {
+ match order {
+ Nullability::NullFirst =>
Value::Array(vec![Value::String("null".into()), inner]),
+ Nullability::NullSecond => Value::Array(vec![inner,
Value::String("null".into())]),
+ }
+}
+
fn datatype_to_avro(
dt: &DataType,
field_name: &str,
metadata: &HashMap<String, String>,
name_gen: &mut NameGenerator,
+) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
+ datatype_to_avro_with_order(dt, field_name, metadata, name_gen,
Nullability::NullFirst)
+}
+
+fn arrow_field_to_avro(
+ field: &ArrowField,
+ name_gen: &mut NameGenerator,
+) -> Result<Value, ArrowError> {
+ arrow_field_to_avro_with_order(field, name_gen, Nullability::NullFirst)
+}
Review Comment:
Why not define this down below, similar to what you did here with
`datatype_to_avro[_with_order]`?
##########
arrow-avro/src/schema.rs:
##########
@@ -370,6 +371,49 @@ impl AvroSchema {
pub fn fingerprint(&self) -> Result<Fingerprint, ArrowError> {
generate_fingerprint_rabin(&self.schema()?)
}
+
+ /// Build Avro JSON from an Arrow [`ArrowSchema`], applying the given
null‑union order.
+ ///
+ /// If the input Arrow schema already contains Avro JSON in
+ /// [`SCHEMA_METADATA_KEY`], that JSON is returned verbatim to preserve
+ /// the exact header encoding alignment; otherwise, a new JSON is
generated
+ /// honoring `null_union_order` at **all nullable sites**.
+ pub fn from_arrow_with_options(
+ schema: &ArrowSchema,
+ null_union_order: Option<Nullability>,
+ ) -> Result<AvroSchema, ArrowError> {
+ if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
+ return Ok(AvroSchema::new(json.clone()));
+ }
+ let order = null_union_order.unwrap_or(Nullability::NullFirst);
Review Comment:
I see at least three places that use `Nullability::NullFirst` as a default
when no order was requested. Would it make sense to derive `Default` for the
enum to make this decision publicly explicit?
```suggestion
let order = null_union_order.unwrap_or_default();
```
##########
arrow-avro/src/writer/format.rs:
##########
@@ -44,24 +43,6 @@ pub trait AvroFormat: Debug + Default {
#[derive(Debug, Default)]
pub struct AvroOcfFormat {
sync_marker: [u8; 16],
- /// Optional encoder behavior hints to keep file header schema ordering
- /// consistent with value encoding (e.g. Impala null-second).
- encoder_options: EncoderOptions,
-}
-
-impl AvroOcfFormat {
- /// Optional helper to attach encoder options (i.e., Impala null-second)
to the format.
- #[allow(dead_code)]
- pub fn with_encoder_options(mut self, opts: EncoderOptions) -> Self {
- self.encoder_options = opts;
- self
- }
-
- /// Access the options used by this format.
- #[allow(dead_code)]
- pub fn encoder_options(&self) -> &EncoderOptions {
- &self.encoder_options
- }
Review Comment:
This code is removed in favor of the new approach, correct?
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,48 +85,211 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
+fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- impala_mode: bool,
+ order: Nullability,
) -> Result<(), ArrowError> {
- let branch = if impala_mode == is_null { 1 } else { 0 };
- write_int(writer, branch)
+ // For NullFirst: null => 0x00, value => 0x02
+ // For NullSecond: value => 0x00, null => 0x02
+ let byte = match order {
+ Nullability::NullFirst => {
+ if is_null {
+ 0x00
+ } else {
+ 0x02
+ }
+ }
+ Nullability::NullSecond => {
+ if is_null {
+ 0x02
+ } else {
+ 0x00
+ }
+ }
Review Comment:
nit: is this cleaner?
```suggestion
let byte = match (order, is_null) {
(Nullability::NullFirst, true) | (Nullability::NullSecond, false)
=> 0x00,
(Nullability::NullFirst, false) | (Nullability::NullSecond, true) =>
0x02,
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -17,25 +17,26 @@
//! Avro Encoder for Arrow types.
+use crate::codec::{AvroDataType, AvroField, Codec};
+use crate::schema::Nullability;
use arrow_array::cast::AsArray;
use arrow_array::types::{
- ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type,
TimestampMicrosecondType,
+ ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type,
IntervalMonthDayNanoType,
+ TimestampMicrosecondType,
+};
+use arrow_array::{
+ Array, Decimal128Array, Decimal256Array, Decimal32Array, Decimal64Array,
DictionaryArray,
+ FixedSizeBinaryArray, GenericBinaryArray, GenericListArray,
GenericStringArray, LargeListArray,
+ ListArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch,
StringArray, StructArray,
};
-use arrow_array::OffsetSizeTrait;
-use arrow_array::{Array, GenericBinaryArray, PrimitiveArray, RecordBatch};
use arrow_buffer::NullBuffer;
-use arrow_schema::{ArrowError, DataType, FieldRef, TimeUnit};
+use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema as
ArrowSchema, TimeUnit};
use std::io::Write;
+use std::sync::Arc;
+use uuid::Uuid;
-/// Behavior knobs for the Avro encoder.
-///
-/// When `impala_mode` is `true`, optional/nullable values are encoded
-/// as Avro unions with **null second** (`[T, "null"]`). When `false`
-/// (default), we use **null first** (`["null", T]`).
-#[derive(Debug, Clone, Copy, Default)]
-pub struct EncoderOptions {
- impala_mode: bool, // Will be fully implemented in a follow-up PR
-}
+/// Plan reference passed to the unified encoder constructor (required).
+type PlanRef<'p> = &'p FieldPlan;
Review Comment:
I'm not sure this is a helpful type alias? It only has two use sites, both
as function arg types with elided lifetimes. With the alias they look like
this, because lifetime elision can't fully simplify the signature:
```rust
plan: PlanRef<'_>,
```
vs. without the alias and complete lifetime elision:
```rust
plan: &FieldPlan,
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -275,3 +384,434 @@ impl F64Encoder<'_> {
.map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
}
}
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ write_len_prefixed(out, self.0.value(idx).as_bytes())
+ }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+/// Unified field encoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Tracks the column/site null buffer and whether any nulls exist
+/// - Carries per-site Avro `Nullability` and precomputed union branch (fast
path)
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ nulls: Option<NullBuffer>,
+ has_nulls: bool,
+ nullability: Option<Nullability>,
+ /// Precomputed constant branch byte if the site is nullable but contains
no nulls
+ pre: Option<u8>,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: PlanRef<'_>,
+ ) -> Result<Self, ArrowError> {
+ let nulls = array.nulls().cloned();
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ Ok(Self {
+ encoder,
+ nulls,
+ has_nulls,
+ nullability: None,
+ pre: None,
+ })
+ }
+
+ #[inline]
Review Comment:
I see a lot of these `#[inline]` markers, but the compiler already inlines
code pretty aggressively.
Do we have some evidence that these markers actually improve performance?
##########
arrow-avro/src/schema.rs:
##########
@@ -790,12 +856,35 @@ fn merge_extras(schema: Value, mut extras:
JsonMap<String, Value>) -> Value {
}
}
-// Convert an Arrow `DataType` into an Avro schema `Value`.
+fn wrap_nullable(inner: Value, order: Nullability) -> Value {
+ match order {
+ Nullability::NullFirst =>
Value::Array(vec![Value::String("null".into()), inner]),
+ Nullability::NullSecond => Value::Array(vec![inner,
Value::String("null".into())]),
+ }
+}
+
fn datatype_to_avro(
Review Comment:
Do we need to preserve existing callsites after this PR? Or can we keep the
nice name and just add the extra arg? Asking because the function is private, I
can only find six callsites in my local arrow-rs checkout, and this PR changes
all six of them.
A similar question applies to `arrow_field_to_avro`, except this PR only
seems to update one of the two callsites; the callsite in `impl
TryFrom<&ArrowSchema> for AvroSchema` remains unchanged. Seems reasonable to
just have one function (with the extra arg) and pass `NullFirst` manually as
needed?
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,48 +85,211 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
+fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- impala_mode: bool,
+ order: Nullability,
) -> Result<(), ArrowError> {
- let branch = if impala_mode == is_null { 1 } else { 0 };
- write_int(writer, branch)
+ // For NullFirst: null => 0x00, value => 0x02
+ // For NullSecond: value => 0x00, null => 0x02
+ let byte = match order {
+ Nullability::NullFirst => {
+ if is_null {
+ 0x00
+ } else {
+ 0x02
+ }
+ }
+ Nullability::NullSecond => {
+ if is_null {
+ 0x02
+ } else {
+ 0x00
+ }
+ }
+ };
+ writer
+ .write_all(&[byte])
+ .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"),
e))
}
-/// Encode a `RecordBatch` in Avro binary format using **default options**.
-pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) ->
Result<(), ArrowError> {
- encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+/// optional branch order can be honored exactly as declared by the schema.
+#[derive(Debug, Clone)]
+enum FieldPlan {
+ /// Non-nested scalar/logical type
+ Scalar,
+ /// Record/Struct with Avro‑ordered children
+ Struct { encoders: Vec<FieldBinding> },
+ /// Array with item‑site nullability and nested plan
+ List {
+ items_nullability: Option<Nullability>,
+ item_plan: Box<FieldPlan>,
+ },
}
-/// Encode a `RecordBatch` with explicit `EncoderOptions`.
-pub fn encode_record_batch_with_options<W: Write>(
- batch: &RecordBatch,
- out: &mut W,
- opts: &EncoderOptions,
-) -> Result<(), ArrowError> {
- let mut encoders = batch
- .schema()
- .fields()
- .iter()
- .zip(batch.columns())
- .map(|(field, array)| Ok((field.is_nullable(),
make_encoder(array.as_ref())?)))
- .collect::<Result<Vec<_>, ArrowError>>()?;
- (0..batch.num_rows()).try_for_each(|row| {
- encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
- if *is_nullable {
- let is_null = enc.is_null(row);
- write_optional_branch(out, is_null, opts.impala_mode)?;
- if is_null {
- return Ok(());
+/// Unified binding used for both top‑level columns and struct children.
+///
+/// This replaces the previous duplication between `StructChildPlan` and
`ColumnPlan`.
+#[derive(Debug, Clone)]
+struct FieldBinding {
+ /// Index of the Arrow field/column associated with this Avro field site
+ arrow_index: usize,
+ /// Nullability/order for this site (None if not optional)
Review Comment:
Also (tiny nit), it might be slightly more intuitive to put the plan (noun)
first and nullability (adjective describing noun) second, both here and in
FieldPlan::List above?
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -275,3 +384,434 @@ impl F64Encoder<'_> {
.map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
}
}
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ write_len_prefixed(out, self.0.value(idx).as_bytes())
+ }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+/// Unified field encoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Tracks the column/site null buffer and whether any nulls exist
+/// - Carries per-site Avro `Nullability` and precomputed union branch (fast
path)
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ nulls: Option<NullBuffer>,
+ has_nulls: bool,
+ nullability: Option<Nullability>,
+ /// Precomputed constant branch byte if the site is nullable but contains
no nulls
+ pre: Option<u8>,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: PlanRef<'_>,
+ ) -> Result<Self, ArrowError> {
+ let nulls = array.nulls().cloned();
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ Ok(Self {
+ encoder,
+ nulls,
+ has_nulls,
+ nullability: None,
+ pre: None,
+ })
+ }
+
+ #[inline]
+ fn has_nulls(&self) -> bool {
+ self.has_nulls
+ }
+
+ #[inline]
+ fn is_null(&self, idx: usize) -> bool {
+ self.nulls
+ .as_ref()
+ .is_some_and(|null_buffer| null_buffer.is_null(idx))
+ }
+
+ #[inline]
+ fn with_effective_nullability(mut self, order: Option<Nullability>) ->
Self {
+ self.nullability = order;
+ self.pre = self.precomputed_union_value_branch(order);
+ self
+ }
+
+ #[inline]
+ fn precomputed_union_value_branch(&self, order: Option<Nullability>) ->
Option<u8> {
+ match (order, self.has_nulls()) {
+ (Some(Nullability::NullFirst), false) => Some(0x02), // value
branch index 1
+ (Some(Nullability::NullSecond), false) => Some(0x00), // value
branch index 0
+ _ => None,
+ }
+ }
+
+ #[inline]
+ fn encode_inner<W: Write + ?Sized>(
+ &mut self,
+ idx: usize,
+ out: &mut W,
+ ) -> Result<(), ArrowError> {
+ self.encoder.encode(idx, out)
+ }
+
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ if let Some(b) = self.pre {
+ return out
+ .write_all(&[b])
+ .map_err(|e| ArrowError::IoError(format!("write union value
branch: {e}"), e))
+ .and_then(|_| self.encode_inner(idx, out));
+ }
+ if let Some(order) = self.nullability {
+ let is_null = self.is_null(idx);
+ write_optional_index(out, is_null, order)?;
+ if is_null {
+ return Ok(());
+ }
+ }
+ self.encode_inner(idx, out)
Review Comment:
Staring more at the code, I don't think `with_effective_nullability` should
exist as a separate method. The `nullability` and `pre` are too closely tied up
with `nulls` and `has_nulls`.
Also, there are too many Option here, which explodes the number of possible
states we have to consider. Is there a way to simplify it?
|nulls|has_nulls|nullability|pre|note
|:-:|:-:|:-:|:-:|-
|None|true|*|*|**INVALID** -- `has_nulls` implies Some `nulls`
|*|true|None|*|**INVALID** -- `has_nulls` but writer is non-nullable
|*|*|None|Some|**INVALID** -- Some `pre` implies Some `nullability`
|*|true|*|Some|**INVALID** -- Some `pre` implies not `has_nulls`
|*|false|Some|None|**Missed optimization** -- we should have Some `pre`
|None|false|None|None|Everyone agrees this is a non-nullable column
|None|false|Some|Some|Non-nullable data with nullable writer (leverage `pre`)
|Some|false|None|None|Nullable data with no nulls and non-nullable writer
|Some|false|Some|Some|Nullable data with no nulls and nullable writer (leverage `pre`)
|Some|true|Some|None|Nullable data with nulls and nullable writer (cannot leverage `pre`)
##########
arrow-avro/src/schema.rs:
##########
@@ -790,12 +856,35 @@ fn merge_extras(schema: Value, mut extras:
JsonMap<String, Value>) -> Value {
}
}
-// Convert an Arrow `DataType` into an Avro schema `Value`.
+fn wrap_nullable(inner: Value, order: Nullability) -> Value {
+ match order {
+ Nullability::NullFirst =>
Value::Array(vec![Value::String("null".into()), inner]),
+ Nullability::NullSecond => Value::Array(vec![inner,
Value::String("null".into())]),
+ }
+}
+
fn datatype_to_avro(
dt: &DataType,
field_name: &str,
metadata: &HashMap<String, String>,
name_gen: &mut NameGenerator,
+) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
+ datatype_to_avro_with_order(dt, field_name, metadata, name_gen,
Nullability::NullFirst)
+}
+
+fn arrow_field_to_avro(
+ field: &ArrowField,
+ name_gen: &mut NameGenerator,
+) -> Result<Value, ArrowError> {
+ arrow_field_to_avro_with_order(field, name_gen, Nullability::NullFirst)
+}
+
+fn datatype_to_avro_with_order(
+ dt: &DataType,
+ field_name: &str,
+ metadata: &HashMap<String, String>,
+ name_gen: &mut NameGenerator,
+ order: Nullability,
Review Comment:
In isolation, "order" is neither intuitive nor self-describing. For that
matter, `Nullability` isn't, either. Perhaps `null_order` and `NullOrder` would
be better names, along with e.g. `datatype_to_avro_with_null_order` (tho see
other comment about the function's name)?
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,48 +85,211 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
+fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- impala_mode: bool,
+ order: Nullability,
) -> Result<(), ArrowError> {
- let branch = if impala_mode == is_null { 1 } else { 0 };
- write_int(writer, branch)
+ // For NullFirst: null => 0x00, value => 0x02
+ // For NullSecond: value => 0x00, null => 0x02
+ let byte = match order {
+ Nullability::NullFirst => {
+ if is_null {
+ 0x00
+ } else {
+ 0x02
+ }
+ }
+ Nullability::NullSecond => {
+ if is_null {
+ 0x02
+ } else {
+ 0x00
+ }
+ }
+ };
+ writer
+ .write_all(&[byte])
+ .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"),
e))
}
-/// Encode a `RecordBatch` in Avro binary format using **default options**.
-pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) ->
Result<(), ArrowError> {
- encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+/// optional branch order can be honored exactly as declared by the schema.
+#[derive(Debug, Clone)]
+enum FieldPlan {
Review Comment:
No `Map` variant needed because maps are just a fancy way of interpreting a
struct?
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,48 +85,211 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
+fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- impala_mode: bool,
+ order: Nullability,
) -> Result<(), ArrowError> {
- let branch = if impala_mode == is_null { 1 } else { 0 };
- write_int(writer, branch)
+ // For NullFirst: null => 0x00, value => 0x02
+ // For NullSecond: value => 0x00, null => 0x02
+ let byte = match order {
+ Nullability::NullFirst => {
+ if is_null {
+ 0x00
+ } else {
+ 0x02
+ }
+ }
+ Nullability::NullSecond => {
+ if is_null {
+ 0x02
+ } else {
+ 0x00
+ }
+ }
+ };
+ writer
+ .write_all(&[byte])
+ .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"),
e))
}
-/// Encode a `RecordBatch` in Avro binary format using **default options**.
-pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) ->
Result<(), ArrowError> {
- encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+/// optional branch order can be honored exactly as declared by the schema.
+#[derive(Debug, Clone)]
+enum FieldPlan {
+ /// Non-nested scalar/logical type
+ Scalar,
+ /// Record/Struct with Avro‑ordered children
+ Struct { encoders: Vec<FieldBinding> },
+ /// Array with item‑site nullability and nested plan
+ List {
+ items_nullability: Option<Nullability>,
+ item_plan: Box<FieldPlan>,
+ },
}
-/// Encode a `RecordBatch` with explicit `EncoderOptions`.
-pub fn encode_record_batch_with_options<W: Write>(
- batch: &RecordBatch,
- out: &mut W,
- opts: &EncoderOptions,
-) -> Result<(), ArrowError> {
- let mut encoders = batch
- .schema()
- .fields()
- .iter()
- .zip(batch.columns())
- .map(|(field, array)| Ok((field.is_nullable(),
make_encoder(array.as_ref())?)))
- .collect::<Result<Vec<_>, ArrowError>>()?;
- (0..batch.num_rows()).try_for_each(|row| {
- encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
- if *is_nullable {
- let is_null = enc.is_null(row);
- write_optional_branch(out, is_null, opts.impala_mode)?;
- if is_null {
- return Ok(());
+/// Unified binding used for both top‑level columns and struct children.
+///
+/// This replaces the previous duplication between `StructChildPlan` and
`ColumnPlan`.
+#[derive(Debug, Clone)]
+struct FieldBinding {
+ /// Index of the Arrow field/column associated with this Avro field site
+ arrow_index: usize,
+ /// Nullability/order for this site (None if not optional)
Review Comment:
Does avro use `optional` and `required` as complementary terms, the way
parquet does?
```suggestion
/// Nullability/order for this site (None if required)
```
##########
arrow-avro/src/schema.rs:
##########
@@ -909,20 +998,42 @@ fn datatype_to_avro(
if matches!(dt, DataType::LargeList(_)) {
extras.insert("arrowLargeList".into(), Value::Bool(true));
}
- let (items, ie) =
- datatype_to_avro(child.data_type(), child.name(),
child.metadata(), name_gen)?;
+ let (items_inner, ie) = datatype_to_avro_with_order(
+ child.data_type(),
+ child.name(),
+ child.metadata(),
+ name_gen,
+ order,
+ )?;
+ let items_with_meta = merge_extras(items_inner, ie);
+ let items_schema = if child.is_nullable() {
+ wrap_nullable(items_with_meta, order)
+ } else {
+ items_with_meta
+ };
Review Comment:
This exact pattern shows up four times (List/LargeList, FixedSizeList, Map,
and struct field). Can we pull out a helper that all four call sites can use?
It might look something like this:
```rust
fn process_datatype_with_nullability(
dt: &DataType,
field_name: &str,
metadata: &HashMap<String, String>,
name_gen: &mut NameGenerator,
null_order: Nullability,
is_nullable: bool,
) -> Result<Value, ArrowError> {
let (schema, extras) = datatype_to_avro_with_order(dt, field_name,
metadata, name_gen, null_order)?;
let schema = merge_extras(schema, extras);
if is_nullable {
Ok(wrap_nullable(schema, null_order))
} else {
Ok(schema)
}
}
```
(terrible name for the helper, but you get the idea)
There are two additional sites (Dictionary and RunEndEncoded) that could
also use the helper -- they'd just need to pass `is_nullable: false`.
If you're looking for ways to split up the PR, a useful "prerefactor" might:
* plumb through the new (largely unused) `order` arg in
`datatype_to_avro_with_order` and friends
* add+use this new helper at the 4-6 call sites that benefit from it
##########
arrow-avro/src/writer/mod.rs:
##########
@@ -63,19 +66,42 @@ impl WriterBuilder {
self
}
+ /// Sets the capacity for the given object and returns the modified
instance.
+ pub fn with_capacity(mut self, capacity: usize) -> Self {
+ self.capacity = capacity;
+ self
+ }
+
/// Create a new `Writer` with specified `AvroFormat` and builder options.
- pub fn build<W, F>(self, writer: W) -> Writer<W, F>
+ /// Performs one‑time startup (header/stream init, encoder plan).
+ pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, ArrowError>
where
W: Write,
F: AvroFormat,
{
- Writer {
+ let mut format = F::default();
+ let avro_schema = if let Some(json) =
self.schema.metadata.get(SCHEMA_METADATA_KEY) {
+ AvroSchema::new(json.clone())
+ } else {
+ AvroSchema::try_from(&self.schema)?
+ };
Review Comment:
aside: I'm always torn whether to use if let Some vs. match at times like
this...
```suggestion
let avro_schema = match
self.schema.metadata.get(SCHEMA_METADATA_KEY) {
Some(json) => AvroSchema::new(json.clone()),
None => AvroSchema::try_from(&self.schema)?,
};
```
(I think it _might_ be a net win this time?)
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -275,3 +384,434 @@ impl F64Encoder<'_> {
.map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
}
}
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ write_len_prefixed(out, self.0.value(idx).as_bytes())
+ }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+/// Unified field encoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Tracks the column/site null buffer and whether any nulls exist
+/// - Carries per-site Avro `Nullability` and precomputed union branch (fast
path)
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ nulls: Option<NullBuffer>,
+ has_nulls: bool,
+ nullability: Option<Nullability>,
+ /// Precomputed constant branch byte if the site is nullable but contains
no nulls
+ pre: Option<u8>,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: PlanRef<'_>,
+ ) -> Result<Self, ArrowError> {
+ let nulls = array.nulls().cloned();
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ Ok(Self {
+ encoder,
+ nulls,
+ has_nulls,
+ nullability: None,
+ pre: None,
+ })
+ }
+
+ #[inline]
+ fn has_nulls(&self) -> bool {
+ self.has_nulls
+ }
+
+ #[inline]
+ fn is_null(&self, idx: usize) -> bool {
+ self.nulls
+ .as_ref()
+ .is_some_and(|null_buffer| null_buffer.is_null(idx))
+ }
+
+ #[inline]
+ fn with_effective_nullability(mut self, order: Option<Nullability>) ->
Self {
+ self.nullability = order;
+ self.pre = self.precomputed_union_value_branch(order);
+ self
+ }
+
+ #[inline]
+ fn precomputed_union_value_branch(&self, order: Option<Nullability>) ->
Option<u8> {
+ match (order, self.has_nulls()) {
+ (Some(Nullability::NullFirst), false) => Some(0x02), // value
branch index 1
+ (Some(Nullability::NullSecond), false) => Some(0x00), // value
branch index 0
+ _ => None,
+ }
Review Comment:
There seems to be some overlap with `write_optional_index`; is there a way
to factor out the common logic (and magic constants) in a way that both can use?
<details>
<summary>Maybe something like this?</summary>
```rust
fn union_value_branch_byte(null_order: NullOrder, is_null: bool) -> u8 {
let nulls_first = null_order == NullOrder::NullFirst;
if nulls_first == is_null { 0x00 } else { 0x02 }
}
```
and then the code in `write_optional_index` changes to:
```rust
let byte = union_value_branch_byte(null_order, is_null);
```
and this code here:
```suggestion
// ... explanatory comment here ...
let null_order = order?;
(!self.has_nulls()).then(|| union_value_branch_byte(null_order,
false))
```
</details>
Also, would `(None, true)` be an error, or at least an invalid combination?
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -275,3 +384,434 @@ impl F64Encoder<'_> {
.map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
}
}
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ write_len_prefixed(out, self.0.value(idx).as_bytes())
+ }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+/// Unified field encoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Tracks the column/site null buffer and whether any nulls exist
+/// - Carries per-site Avro `Nullability` and precomputed union branch (fast
path)
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ nulls: Option<NullBuffer>,
+ has_nulls: bool,
+ nullability: Option<Nullability>,
+ /// Precomputed constant branch byte if the site is nullable but contains
no nulls
+ pre: Option<u8>,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: PlanRef<'_>,
+ ) -> Result<Self, ArrowError> {
+ let nulls = array.nulls().cloned();
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ Ok(Self {
+ encoder,
+ nulls,
+ has_nulls,
+ nullability: None,
+ pre: None,
+ })
+ }
+
+ #[inline]
+ fn has_nulls(&self) -> bool {
+ self.has_nulls
+ }
+
+ #[inline]
+ fn is_null(&self, idx: usize) -> bool {
+ self.nulls
+ .as_ref()
+ .is_some_and(|null_buffer| null_buffer.is_null(idx))
+ }
+
+ #[inline]
+ fn with_effective_nullability(mut self, order: Option<Nullability>) ->
Self {
+ self.nullability = order;
+ self.pre = self.precomputed_union_value_branch(order);
+ self
+ }
+
+ #[inline]
+ fn precomputed_union_value_branch(&self, order: Option<Nullability>) ->
Option<u8> {
+ match (order, self.has_nulls()) {
+ (Some(Nullability::NullFirst), false) => Some(0x02), // value
branch index 1
+ (Some(Nullability::NullSecond), false) => Some(0x00), // value
branch index 0
+ _ => None,
+ }
+ }
+
+ #[inline]
+ fn encode_inner<W: Write + ?Sized>(
+ &mut self,
+ idx: usize,
+ out: &mut W,
+ ) -> Result<(), ArrowError> {
+ self.encoder.encode(idx, out)
+ }
+
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ if let Some(b) = self.pre {
+ return out
+ .write_all(&[b])
+ .map_err(|e| ArrowError::IoError(format!("write union value
branch: {e}"), e))
+ .and_then(|_| self.encode_inner(idx, out));
+ }
+ if let Some(order) = self.nullability {
+ let is_null = self.is_null(idx);
+ write_optional_index(out, is_null, order)?;
+ if is_null {
+ return Ok(());
+ }
+ }
+ self.encode_inner(idx, out)
Review Comment:
This seems a bit complex/redundant (two `self.encode_inner` calls). Can it
be simplified while still preserving the `pre` shortcut?
```suggestion
if let Some(b) = self.pre {
out
.write_all(&[b])
.map_err(|e| ArrowError::IoError(format!("write union value
branch: {e}"), e))?;
} else if let Some(null_order) = self.nullability {
let is_null = self.is_null(idx);
write_optional_index(out, is_null, null_order)?;
if is_null {
return Ok(());
}
}
self.encode_inner(idx, out)
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,48 +85,211 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
+fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- impala_mode: bool,
+ order: Nullability,
) -> Result<(), ArrowError> {
- let branch = if impala_mode == is_null { 1 } else { 0 };
- write_int(writer, branch)
+ // For NullFirst: null => 0x00, value => 0x02
+ // For NullSecond: value => 0x00, null => 0x02
+ let byte = match order {
+ Nullability::NullFirst => {
+ if is_null {
+ 0x00
+ } else {
+ 0x02
+ }
+ }
+ Nullability::NullSecond => {
+ if is_null {
+ 0x02
+ } else {
+ 0x00
+ }
+ }
Review Comment:
Could also keep a similar approach to the existing code:
```rust
let nulls_first = order == Nullability::NullFirst;
let byte = if nulls_first == is_null { 0x00 } else { 0x02 };
```
... very compact but perhaps harder to grok
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -275,3 +384,434 @@ impl F64Encoder<'_> {
.map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
}
}
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ write_len_prefixed(out, self.0.value(idx).as_bytes())
+ }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+/// Unified field encoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Tracks the column/site null buffer and whether any nulls exist
+/// - Carries per-site Avro `Nullability` and precomputed union branch (fast
path)
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ nulls: Option<NullBuffer>,
+ has_nulls: bool,
+ nullability: Option<Nullability>,
+ /// Precomputed constant branch byte if the site is nullable but contains
no nulls
+ pre: Option<u8>,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: PlanRef<'_>,
+ ) -> Result<Self, ArrowError> {
+ let nulls = array.nulls().cloned();
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ Ok(Self {
+ encoder,
+ nulls,
+ has_nulls,
+ nullability: None,
+ pre: None,
+ })
+ }
+
+ #[inline]
+ fn has_nulls(&self) -> bool {
+ self.has_nulls
+ }
+
+ #[inline]
+ fn is_null(&self, idx: usize) -> bool {
+ self.nulls
+ .as_ref()
+ .is_some_and(|null_buffer| null_buffer.is_null(idx))
+ }
+
+ #[inline]
+ fn with_effective_nullability(mut self, order: Option<Nullability>) ->
Self {
+ self.nullability = order;
+ self.pre = self.precomputed_union_value_branch(order);
+ self
+ }
+
+ #[inline]
+ fn precomputed_union_value_branch(&self, order: Option<Nullability>) ->
Option<u8> {
+ match (order, self.has_nulls()) {
+ (Some(Nullability::NullFirst), false) => Some(0x02), // value
branch index 1
+ (Some(Nullability::NullSecond), false) => Some(0x00), // value
branch index 0
+ _ => None,
+ }
+ }
+
+ #[inline]
+ fn encode_inner<W: Write + ?Sized>(
+ &mut self,
+ idx: usize,
+ out: &mut W,
+ ) -> Result<(), ArrowError> {
+ self.encoder.encode(idx, out)
+ }
+
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ if let Some(b) = self.pre {
+ return out
+ .write_all(&[b])
+ .map_err(|e| ArrowError::IoError(format!("write union value
branch: {e}"), e))
+ .and_then(|_| self.encode_inner(idx, out));
+ }
+ if let Some(order) = self.nullability {
+ let is_null = self.is_null(idx);
+ write_optional_index(out, is_null, order)?;
+ if is_null {
+ return Ok(());
+ }
+ }
+ self.encode_inner(idx, out)
Review Comment:
Also -- What should happen if `self.is_null(idx) &&
self.nullability.is_none()`?
Should `with_effective_nullability` block that combination by returning an
error?
Or do we need to somehow handle it here?
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -275,3 +384,434 @@ impl F64Encoder<'_> {
.map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
}
}
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ write_len_prefixed(out, self.0.value(idx).as_bytes())
+ }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+/// Unified field encoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Tracks the column/site null buffer and whether any nulls exist
+/// - Carries per-site Avro `Nullability` and precomputed union branch (fast
path)
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ nulls: Option<NullBuffer>,
+ has_nulls: bool,
+ nullability: Option<Nullability>,
+ /// Precomputed constant branch byte if the site is nullable but contains
no nulls
+ pre: Option<u8>,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: PlanRef<'_>,
+ ) -> Result<Self, ArrowError> {
+ let nulls = array.nulls().cloned();
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ Ok(Self {
+ encoder,
+ nulls,
+ has_nulls,
+ nullability: None,
+ pre: None,
+ })
+ }
+
+ #[inline]
+ fn has_nulls(&self) -> bool {
+ self.has_nulls
+ }
+
+ #[inline]
+ fn is_null(&self, idx: usize) -> bool {
+ self.nulls
+ .as_ref()
+ .is_some_and(|null_buffer| null_buffer.is_null(idx))
+ }
+
+ #[inline]
+ fn with_effective_nullability(mut self, order: Option<Nullability>) ->
Self {
+ self.nullability = order;
+ self.pre = self.precomputed_union_value_branch(order);
+ self
+ }
+
+ #[inline]
+ fn precomputed_union_value_branch(&self, order: Option<Nullability>) ->
Option<u8> {
+ match (order, self.has_nulls()) {
+ (Some(Nullability::NullFirst), false) => Some(0x02), // value
branch index 1
+ (Some(Nullability::NullSecond), false) => Some(0x00), // value
branch index 0
+ _ => None,
+ }
+ }
+
+ #[inline]
+ fn encode_inner<W: Write + ?Sized>(
+ &mut self,
+ idx: usize,
+ out: &mut W,
+ ) -> Result<(), ArrowError> {
+ self.encoder.encode(idx, out)
+ }
+
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ if let Some(b) = self.pre {
+ return out
+ .write_all(&[b])
+ .map_err(|e| ArrowError::IoError(format!("write union value
branch: {e}"), e))
+ .and_then(|_| self.encode_inner(idx, out));
+ }
+ if let Some(order) = self.nullability {
+ let is_null = self.is_null(idx);
+ write_optional_index(out, is_null, order)?;
+ if is_null {
+ return Ok(());
+ }
+ }
+ self.encode_inner(idx, out)
+ }
+}
+
+struct StructEncoder<'a> {
+ encoders: Vec<FieldEncoder<'a>>,
+}
+
+impl<'a> StructEncoder<'a> {
+ fn try_new(
+ array: &'a StructArray,
+ field_bindings: &[FieldBinding],
+ ) -> Result<Self, ArrowError> {
+ let fields = match array.data_type() {
+ DataType::Struct(struct_fields) => struct_fields,
+ _ => return Err(ArrowError::SchemaError("Expected Struct".into())),
+ };
+ let mut encoders = Vec::with_capacity(field_bindings.len());
+ for field_binding in field_bindings {
+ let idx = field_binding.arrow_index;
+ let column = array.columns().get(idx).ok_or_else(|| {
+ ArrowError::SchemaError(format!("Struct child index {idx} out
of range"))
+ })?;
+ let field = fields
+ .get(idx)
+ .ok_or_else(|| {
+ ArrowError::SchemaError(format!("Struct child index {idx}
out of range"))
+ })?
+ .as_ref();
+ let encoder = prepare_value_site_encoder(
+ column.as_ref(),
+ field,
+ field_binding.nullability,
+ &field_binding.plan,
+ )?;
+ encoders.push(encoder);
+ }
+ Ok(Self { encoders })
+ }
+
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ for encoder in self.encoders.iter_mut() {
+ encoder.encode(idx, out)?;
+ }
+ Ok(())
+ }
+}
+
+#[inline]
+fn encode_blocked_range<W: Write + ?Sized, F>(
+ out: &mut W,
+ start: usize,
+ end: usize,
+ mut write_item: F,
+) -> Result<(), ArrowError>
+where
+ F: FnMut(usize, &mut W) -> Result<(), ArrowError>,
+{
+ let len = end.saturating_sub(start);
+ if len == 0 {
+ // Zero-length terminator per Avro spec
+ write_long(out, 0)?;
+ return Ok(());
+ }
+ // Emit a single positive block for performance, then the end marker.
+ write_long(out, len as i64)?;
+ for j in start..end {
+ write_item(j, out)?;
+ }
+ write_long(out, 0)?;
+ Ok(())
+}
+
+struct ListEncoder<'a, O: OffsetSizeTrait> {
+ list: &'a GenericListArray<O>,
+ values: FieldEncoder<'a>,
+ values_offset: usize,
+}
+
+type ListEncoder32<'a> = ListEncoder<'a, i32>;
+type ListEncoder64<'a> = ListEncoder<'a, i64>;
+
+impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
+ fn try_new(
+ list: &'a GenericListArray<O>,
+ items_nullability: Option<Nullability>,
+ item_plan: &FieldPlan,
+ ) -> Result<Self, ArrowError> {
+ let child_field = match list.data_type() {
+ DataType::List(field) => field.as_ref(),
+ DataType::LargeList(field) => field.as_ref(),
+ _ => {
+ return Err(ArrowError::SchemaError(
+ "Expected List or LargeList for ListEncoder".into(),
+ ))
+ }
+ };
+ let values_enc = prepare_value_site_encoder(
+ list.values().as_ref(),
+ child_field,
+ items_nullability,
+ item_plan,
+ )?;
+ Ok(Self {
+ list,
+ values: values_enc,
+ values_offset: list.values().offset(),
+ })
+ }
+
+ #[inline]
+ fn encode_list_range<W: Write + ?Sized>(
+ &mut self,
+ out: &mut W,
+ start: usize,
+ end: usize,
+ ) -> Result<(), ArrowError> {
+ encode_blocked_range(out, start, end, |row, out| {
+ self.values
+ .encode(row.saturating_sub(self.values_offset), out)
+ })
+ }
+
+ #[inline]
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ let offsets = self.list.offsets();
+ let start = offsets[idx].to_usize().ok_or_else(|| {
+ ArrowError::InvalidArgumentError(format!("Error converting
offset[{idx}] to usize"))
+ })?;
+ let end = offsets[idx + 1].to_usize().ok_or_else(|| {
+ ArrowError::InvalidArgumentError(format!(
+ "Error converting offset[{}] to usize",
+ idx + 1
+ ))
+ })?;
+ self.encode_list_range(out, start, end)
+ }
+}
+
+#[inline]
+fn prepare_value_site_encoder<'a>(
+ values_array: &'a dyn Array,
+ value_field: &Field,
+ site_nullability: Option<Nullability>,
+ plan: PlanRef<'_>,
+) -> Result<FieldEncoder<'a>, ArrowError> {
+ // Effective nullability is exactly the site's Avro-declared nullability.
+ Ok(FieldEncoder::make_encoder(values_array, value_field, plan)?
+ .with_effective_nullability(site_nullability))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_array::types::Int32Type;
+ use arrow_array::{
+ Array, ArrayRef, BinaryArray, BooleanArray, Float32Array,
Float64Array, Int32Array,
+ Int64Array, LargeBinaryArray, LargeListArray, LargeStringArray,
ListArray, StringArray,
+ TimestampMicrosecondArray,
+ };
+ use arrow_schema::{DataType, Field, Fields};
+
+ fn zigzag_i64(v: i64) -> u64 {
+ ((v << 1) ^ (v >> 63)) as u64
+ }
+
+ fn varint(mut x: u64) -> Vec<u8> {
+ let mut out = Vec::new();
+ while (x & !0x7f) != 0 {
+ out.push(((x & 0x7f) as u8) | 0x80);
+ x >>= 7;
+ }
+ out.push((x & 0x7f) as u8);
+ out
+ }
+
+ fn avro_long_bytes(v: i64) -> Vec<u8> {
+ varint(zigzag_i64(v))
+ }
+
+ fn avro_len_prefixed_bytes(payload: &[u8]) -> Vec<u8> {
+ let mut out = avro_long_bytes(payload.len() as i64);
+ out.extend_from_slice(payload);
+ out
+ }
+
+ fn encode_all(array: &dyn Array, plan: &FieldPlan, site:
Option<Nullability>) -> Vec<u8> {
Review Comment:
`site` is a really unintuitive name for this... earlier code used
`site_nullability` which seems better?
But why not just `nullability` for both?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]