scovich commented on code in PR #8274:
URL: https://github.com/apache/arrow-rs/pull/8274#discussion_r2325187158


##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,36 +81,186 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) 
-> Result<(), ArrowErr
 /// Branch index is 0-based per Avro unions:
 /// - Null-first (default): null => 0, value => 1
 /// - Null-second (Impala): value => 0, null => 1
-#[inline]
 fn write_optional_index<W: Write + ?Sized>(
     writer: &mut W,
     is_null: bool,
-    order: Nullability,
+    null_order: Nullability,
 ) -> Result<(), ArrowError> {
-    // For NullFirst: null => 0x00, value => 0x02
-    // For NullSecond: value => 0x00, null => 0x02
-    let byte = match order {
-        Nullability::NullFirst => {
-            if is_null {
-                0x00
-            } else {
-                0x02
-            }
-        }
-        Nullability::NullSecond => {
-            if is_null {
-                0x02
-            } else {
-                0x00
-            }
-        }
-    };
+    let byte = union_value_branch_byte(null_order, is_null);
     writer
         .write_all(&[byte])
         .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), 
e))
 }
 
-/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+#[derive(Debug, Clone)]
+enum NullState {
+    NonNullable,
+    NullableNoNulls {
+        byte: u8,

Review Comment:
   ```suggestion
           union_value_byte: u8,
   ```
   (`byte` by itself adds nothing -- `u8` is a byte by definition)



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,36 +81,186 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) 
-> Result<(), ArrowErr
 /// Branch index is 0-based per Avro unions:
 /// - Null-first (default): null => 0, value => 1
 /// - Null-second (Impala): value => 0, null => 1
-#[inline]
 fn write_optional_index<W: Write + ?Sized>(
     writer: &mut W,
     is_null: bool,
-    order: Nullability,
+    null_order: Nullability,
 ) -> Result<(), ArrowError> {
-    // For NullFirst: null => 0x00, value => 0x02
-    // For NullSecond: value => 0x00, null => 0x02
-    let byte = match order {
-        Nullability::NullFirst => {
-            if is_null {
-                0x00
-            } else {
-                0x02
-            }
-        }
-        Nullability::NullSecond => {
-            if is_null {
-                0x02
-            } else {
-                0x00
-            }
-        }
-    };
+    let byte = union_value_branch_byte(null_order, is_null);
     writer
         .write_all(&[byte])
         .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), 
e))
 }
 
-/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+#[derive(Debug, Clone)]
+enum NullState {
+    NonNullable,
+    NullableNoNulls {
+        byte: u8,
+    },
+    Nullable {
+        nulls: NullBuffer,
+        null_order: Nullability,
+    },
+}
+
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that 
enforces invariants
+pub struct FieldEncoder<'a> {
+    encoder: Encoder<'a>,
+    null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+    fn make_encoder(
+        array: &'a dyn Array,
+        field: &Field,
+        plan: &FieldPlan,
+        nullability: Option<Nullability>,
+    ) -> Result<Self, ArrowError> {
+        let has_nulls = array.null_count() > 0;
+        let encoder = match plan {
+            FieldPlan::Struct { encoders } => {
+                let arr = array
+                    .as_any()
+                    .downcast_ref::<StructArray>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected 
StructArray".into()))?;
+                Encoder::Struct(Box::new(StructEncoder::try_new(arr, 
encoders)?))
+            }
+            FieldPlan::List {
+                items_nullability,
+                item_plan,
+            } => match array.data_type() {
+                DataType::List(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
ListArray".into()))?;
+                    Encoder::List(Box::new(ListEncoder32::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                DataType::LargeList(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<LargeListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
LargeListArray".into()))?;
+                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                other => {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Avro array site requires Arrow List/LargeList, found: 
{other:?}"
+                    )))
+                }
+            },
+            FieldPlan::Scalar => match array.data_type() {
+                DataType::Boolean => 
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+                DataType::Utf8 => {
+                    
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+                }
+                DataType::LargeUtf8 => {
+                    
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+                }
+                DataType::Int32 => 
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+                DataType::Int64 => 
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+                DataType::Float32 => {
+                    
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+                }
+                DataType::Float64 => {
+                    
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+                }
+                DataType::Binary => 
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+                DataType::LargeBinary => {
+                    
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+                }
+                DataType::Timestamp(TimeUnit::Microsecond, _) => 
Encoder::Timestamp(LongEncoder(
+                    array.as_primitive::<TimestampMicrosecondType>(),
+                )),
+                other => {
+                    return Err(ArrowError::NotYetImplemented(format!(
+                        "Avro scalar type not yet supported: {other:?}"
+                    )));
+                }
+            },
+            other => {
+                return Err(ArrowError::NotYetImplemented(
+                    "Avro writer: {other:?} not yet supported".into(),
+                ));
+            }
+        };
+        // Compute the effective null state from writer-declared nullability 
and data nulls.
+        let null_state = match (nullability, has_nulls) {

Review Comment:
   This is the only use site for `has_nulls`, just fold it in?
   ```suggestion
           let null_state = match (nullability, array.null_count() > 0) {
   ```
   (or at least define it closer to its use site)



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,36 +81,186 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) 
-> Result<(), ArrowErr
 /// Branch index is 0-based per Avro unions:
 /// - Null-first (default): null => 0, value => 1
 /// - Null-second (Impala): value => 0, null => 1
-#[inline]
 fn write_optional_index<W: Write + ?Sized>(
     writer: &mut W,
     is_null: bool,
-    order: Nullability,
+    null_order: Nullability,
 ) -> Result<(), ArrowError> {
-    // For NullFirst: null => 0x00, value => 0x02
-    // For NullSecond: value => 0x00, null => 0x02
-    let byte = match order {
-        Nullability::NullFirst => {
-            if is_null {
-                0x00
-            } else {
-                0x02
-            }
-        }
-        Nullability::NullSecond => {
-            if is_null {
-                0x02
-            } else {
-                0x00
-            }
-        }
-    };
+    let byte = union_value_branch_byte(null_order, is_null);
     writer
         .write_all(&[byte])
         .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), 
e))
 }
 
-/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+#[derive(Debug, Clone)]
+enum NullState {
+    NonNullable,
+    NullableNoNulls {
+        byte: u8,
+    },
+    Nullable {
+        nulls: NullBuffer,
+        null_order: Nullability,
+    },
+}
+
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that 
enforces invariants
+pub struct FieldEncoder<'a> {
+    encoder: Encoder<'a>,
+    null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+    fn make_encoder(
+        array: &'a dyn Array,
+        field: &Field,
+        plan: &FieldPlan,
+        nullability: Option<Nullability>,
+    ) -> Result<Self, ArrowError> {
+        let has_nulls = array.null_count() > 0;
+        let encoder = match plan {
+            FieldPlan::Struct { encoders } => {
+                let arr = array
+                    .as_any()
+                    .downcast_ref::<StructArray>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected 
StructArray".into()))?;
+                Encoder::Struct(Box::new(StructEncoder::try_new(arr, 
encoders)?))
+            }
+            FieldPlan::List {
+                items_nullability,
+                item_plan,
+            } => match array.data_type() {
+                DataType::List(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
ListArray".into()))?;
+                    Encoder::List(Box::new(ListEncoder32::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                DataType::LargeList(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<LargeListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
LargeListArray".into()))?;
+                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                other => {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Avro array site requires Arrow List/LargeList, found: 
{other:?}"
+                    )))
+                }
+            },
+            FieldPlan::Scalar => match array.data_type() {
+                DataType::Boolean => 
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+                DataType::Utf8 => {
+                    
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+                }
+                DataType::LargeUtf8 => {
+                    
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+                }
+                DataType::Int32 => 
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+                DataType::Int64 => 
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+                DataType::Float32 => {
+                    
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+                }
+                DataType::Float64 => {
+                    
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+                }
+                DataType::Binary => 
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+                DataType::LargeBinary => {
+                    
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+                }
+                DataType::Timestamp(TimeUnit::Microsecond, _) => 
Encoder::Timestamp(LongEncoder(
+                    array.as_primitive::<TimestampMicrosecondType>(),
+                )),
+                other => {
+                    return Err(ArrowError::NotYetImplemented(format!(
+                        "Avro scalar type not yet supported: {other:?}"
+                    )));
+                }
+            },
+            other => {
+                return Err(ArrowError::NotYetImplemented(
+                    "Avro writer: {other:?} not yet supported".into(),
+                ));
+            }
+        };
+        // Compute the effective null state from writer-declared nullability 
and data nulls.
+        let null_state = match (nullability, has_nulls) {
+            (None, false) => NullState::NonNullable,
+            (None, true) => {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "Avro site '{}' is non-nullable, but array contains nulls",
+                    field.name()
+                )));
+            }
+            (Some(order), false) => {
+                // Optimization: drop any bitmap; emit a constant "value" 
branch byte.
+                let byte = union_value_branch_byte(order, false);
+                NullState::NullableNoNulls { byte }
+            }
+            (Some(null_order), true) => {
+                let null_buffer = array.nulls().cloned().ok_or_else(|| {
+                    ArrowError::InvalidArgumentError(format!(
+                        "Array for Avro site '{}' reports nulls but has no 
null buffer",
+                        field.name()
+                    ))
+                })?;
+                NullState::Nullable {
+                    nulls: null_buffer,
+                    null_order,
+                }
+            }
+        };
+        Ok(Self {
+            encoder,
+            null_state,
+        })
+    }
+
+    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> 
Result<(), ArrowError> {
+        match &self.null_state {
+            NullState::NonNullable => self.encoder.encode(idx, out),
+            NullState::NullableNoNulls { byte } => {
+                // Constant non-null branch byte, then value.
+                out.write_all(&[*byte]).map_err(|e| {
+                    ArrowError::IoError(format!("write union value branch: 
{e}"), e)
+                })?;
+                self.encoder.encode(idx, out)
+            }
+            NullState::Nullable { nulls, null_order } => {
+                let is_null = nulls.is_null(idx);
+                write_optional_index(out, is_null, *null_order)?;
+                if is_null {
+                    Ok(())
+                } else {
+                    self.encoder.encode(idx, out)
+                }
+            }
+        }

Review Comment:
   nit -- We could remove some duplication by:
   ```suggestion
               NullState::NonNullable => {}
               NullState::NullableNoNulls { byte } => 
out.write_all(&[*byte]).map_err(|e| {
                   ArrowError::IoError(format!("write union value branch: 
{e}"), e)
               })?,
               NullState::Nullable { nulls, null_order } => {
                   let is_null = nulls.is_null(idx);
                   write_optional_index(out, is_null, *null_order)?;
                   if is_null {
                       return Ok(()); // no value to write
                   }
               }
           }
           self.encoder.encode(idx, out)
   ```
   or even
   ```suggestion
               NullState::NonNullable => {}
               NullState::NullableNoNulls { byte } => 
out.write_all(&[*byte]).map_err(|e| {
                   ArrowError::IoError(format!("write union value branch: 
{e}"), e)
               })?,
               NullState::Nullable { nulls, null_order } if nulls.is_null(idx) 
=> {
                   return write_optional_index(out, true, *null_order); // no 
value to write
               }
               NullState::Nullable { null_order, .. } => {
                   write_optional_index(out, false, *null_order)?;
               }
           }
           self.encoder.encode(idx, out)
   ```
   (splitting apart the `write_optional_index` calls will likely trigger a 
bunch of jump threading and inlining optimizations that more than make up for 
the apparent duplication at the call site -- the compiler will basically put 
half of the function in each location, thanks to the hard-wired `is_null` arg 
value)



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -83,49 +81,358 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) 
-> Result<(), ArrowErr
 /// Branch index is 0-based per Avro unions:
 /// - Null-first (default): null => 0, value => 1
 /// - Null-second (Impala): value => 0, null => 1
-#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
+fn write_optional_index<W: Write + ?Sized>(
     writer: &mut W,
     is_null: bool,
-    impala_mode: bool,
+    null_order: Nullability,
 ) -> Result<(), ArrowError> {
-    let branch = if impala_mode == is_null { 1 } else { 0 };
-    write_int(writer, branch)
+    let byte = union_value_branch_byte(null_order, is_null);
+    writer
+        .write_all(&[byte])
+        .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), 
e))
 }
 
-/// Encode a `RecordBatch` in Avro binary format using **default options**.
-pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) -> 
Result<(), ArrowError> {
-    encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+#[derive(Debug, Clone)]
+enum NullState {
+    NonNullable,
+    NullableNoNulls {
+        byte: u8,
+    },
+    Nullable {
+        nulls: NullBuffer,
+        null_order: Nullability,
+    },
 }
 
-/// Encode a `RecordBatch` with explicit `EncoderOptions`.
-pub fn encode_record_batch_with_options<W: Write>(
-    batch: &RecordBatch,
-    out: &mut W,
-    opts: &EncoderOptions,
-) -> Result<(), ArrowError> {
-    let mut encoders = batch
-        .schema()
-        .fields()
-        .iter()
-        .zip(batch.columns())
-        .map(|(field, array)| Ok((field.is_nullable(), 
make_encoder(array.as_ref())?)))
-        .collect::<Result<Vec<_>, ArrowError>>()?;
-    (0..batch.num_rows()).try_for_each(|row| {
-        encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
-            if *is_nullable {
-                let is_null = enc.is_null(row);
-                write_optional_branch(out, is_null, opts.impala_mode)?;
-                if is_null {
-                    return Ok(());
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that 
enforces invariants
+pub struct FieldEncoder<'a> {
+    encoder: Encoder<'a>,
+    null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+    fn make_encoder(
+        array: &'a dyn Array,
+        field: &Field,
+        plan: &FieldPlan,
+        nullability: Option<Nullability>,
+    ) -> Result<Self, ArrowError> {
+        let has_nulls = array.null_count() > 0;
+        let encoder = match plan {
+            FieldPlan::Struct { encoders } => {
+                let arr = array
+                    .as_any()
+                    .downcast_ref::<StructArray>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected 
StructArray".into()))?;
+                Encoder::Struct(Box::new(StructEncoder::try_new(arr, 
encoders)?))
+            }
+            FieldPlan::List {
+                items_nullability,
+                item_plan,
+            } => match array.data_type() {
+                DataType::List(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
ListArray".into()))?;
+                    Encoder::List(Box::new(ListEncoder32::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                DataType::LargeList(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<LargeListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
LargeListArray".into()))?;
+                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                other => {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Avro array site requires Arrow List/LargeList, found: 
{other:?}"
+                    )))
+                }
+            },
+            FieldPlan::Scalar => match array.data_type() {
+                DataType::Boolean => 
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+                DataType::Utf8 => {
+                    
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+                }
+                DataType::LargeUtf8 => {
+                    
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+                }
+                DataType::Int32 => 
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+                DataType::Int64 => 
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+                DataType::Float32 => {
+                    
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+                }
+                DataType::Float64 => {
+                    
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+                }
+                DataType::Binary => 
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+                DataType::LargeBinary => {
+                    
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+                }
+                DataType::Timestamp(TimeUnit::Microsecond, _) => 
Encoder::Timestamp(LongEncoder(
+                    array.as_primitive::<TimestampMicrosecondType>(),
+                )),
+                other => {
+                    return Err(ArrowError::NotYetImplemented(format!(
+                        "Avro scalar type not yet supported: {other:?}"
+                    )));
+                }
+            },
+            other => {
+                return Err(ArrowError::NotYetImplemented(
+                    "Avro writer: {other:?} not yet supported".into(),
+                ));
+            }
+        };
+        // Compute the effective null state from writer-declared nullability 
and data nulls.
+        let null_state = match (nullability, has_nulls) {
+            (None, false) => NullState::NonNullable,
+            (None, true) => {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "Avro site '{}' is non-nullable, but array contains nulls",
+                    field.name()
+                )));
+            }
+            (Some(order), false) => {
+                // Optimization: drop any bitmap; emit a constant "value" 
branch byte.
+                let byte = union_value_branch_byte(order, false);
+                NullState::NullableNoNulls { byte }
+            }
+            (Some(null_order), true) => {
+                let null_buffer = array.nulls().cloned().ok_or_else(|| {
+                    ArrowError::InvalidArgumentError(format!(
+                        "Array for Avro site '{}' reports nulls but has no 
null buffer",
+                        field.name()
+                    ))
+                })?;
+                NullState::Nullable {
+                    nulls: null_buffer,
+                    null_order,
                 }
             }
-            enc.encode(row, out)
+        };
+        Ok(Self {
+            encoder,
+            null_state,
         })
-    })
+    }
+
+    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> 
Result<(), ArrowError> {
+        match &self.null_state {
+            NullState::NonNullable => self.encoder.encode(idx, out),
+            NullState::NullableNoNulls { byte } => {
+                // Constant non-null branch byte, then value.
+                out.write_all(&[*byte]).map_err(|e| {
+                    ArrowError::IoError(format!("write union value branch: 
{e}"), e)
+                })?;
+                self.encoder.encode(idx, out)
+            }
+            NullState::Nullable { nulls, null_order } => {
+                let is_null = nulls.is_null(idx);
+                write_optional_index(out, is_null, *null_order)?;
+                if is_null {
+                    Ok(())
+                } else {
+                    self.encoder.encode(idx, out)
+                }
+            }
+        }
+    }
+}
+
+fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
+    let nulls_first = null_order == Nullability::default();
+    if nulls_first == is_null {
+        0x00
+    } else {
+        0x02
+    }
+}
+
+/// Per‑site encoder plan for a field. This mirrors the Avro structure, so 
nested
+/// optional branch order can be honored exactly as declared by the schema.
+#[derive(Debug, Clone)]
+enum FieldPlan {
+    /// Non-nested scalar/logical type
+    Scalar,
+    /// Record/Struct with Avro‑ordered children
+    Struct { encoders: Vec<FieldBinding> },
+    /// Array with item‑site nullability and nested plan
+    List {
+        items_nullability: Option<Nullability>,
+        item_plan: Box<FieldPlan>,
+    },
+}
+
+#[derive(Debug, Clone)]
+struct FieldBinding {
+    /// Index of the Arrow field/column associated with this Avro field site
+    arrow_index: usize,
+    /// Nullability/order for this site (None if required)
+    nullability: Option<Nullability>,
+    /// Nested plan for this site
+    plan: FieldPlan,
+}
+
+/// Builder for `RecordEncoder` write plan
+#[derive(Debug)]
+pub struct RecordEncoderBuilder<'a> {
+    avro_root: &'a AvroField,
+    arrow_schema: &'a ArrowSchema,
+}
+
+impl<'a> RecordEncoderBuilder<'a> {
+    /// Create a new builder from the Avro root and Arrow schema.
+    pub fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> 
Self {
+        Self {
+            avro_root,
+            arrow_schema,
+        }
+    }
+
+    /// Build the `RecordEncoder` by walking the Avro **record** root in Avro 
order,
+    /// resolving each field to an Arrow index by name.
+    pub fn build(self) -> Result<RecordEncoder, ArrowError> {
+        let avro_root_dt = self.avro_root.data_type();
+        let root_fields = match avro_root_dt.codec() {
+            Codec::Struct(fields) => fields,
+            _ => {
+                return Err(ArrowError::SchemaError(
+                    "Top-level Avro schema must be a record/struct".into(),
+                ))
+            }
+        };
+        let mut columns = Vec::with_capacity(root_fields.len());
+        for root_field in root_fields.iter() {

Review Comment:
   nit: the `iter()` call should be unnecessary (thanks to `impl IntoIterator 
for &Self`)?
   ```suggestion
           for root_field in root_fields {
   ```



##########
arrow-avro/src/schema.rs:
##########
@@ -1118,26 +1056,37 @@ fn datatype_to_avro_with_order(
     Ok((val, extras))
 }
 
-fn arrow_field_to_avro_with_order(
+fn process_datatype(
+    dt: &DataType,
+    field_name: &str,
+    metadata: &HashMap<String, String>,
+    name_gen: &mut NameGenerator,
+    null_order: Nullability,
+    is_nullable: bool,
+) -> Result<Value, ArrowError> {
+    let (schema, extras) = datatype_to_avro(dt, field_name, metadata, 
name_gen, null_order)?;
+    let merged = merge_extras(schema, extras);
+    Ok(if is_nullable {
+        wrap_nullable(merged, null_order)
+    } else {
+        merged
+    })

Review Comment:
   nit: 
   ```suggestion
       let mut merged = merge_extras(schema, extras);
       if is_nullable {
           merged = wrap_nullable(merged, null_order)
       }
       Ok(merged)
   ```
   (`mut` is underutilized in situations like this, and IMO `if` inside parens 
is hard to read+maintain)



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,36 +81,186 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) 
-> Result<(), ArrowErr
 /// Branch index is 0-based per Avro unions:
 /// - Null-first (default): null => 0, value => 1
 /// - Null-second (Impala): value => 0, null => 1
-#[inline]
 fn write_optional_index<W: Write + ?Sized>(
     writer: &mut W,
     is_null: bool,
-    order: Nullability,
+    null_order: Nullability,
 ) -> Result<(), ArrowError> {
-    // For NullFirst: null => 0x00, value => 0x02
-    // For NullSecond: value => 0x00, null => 0x02
-    let byte = match order {
-        Nullability::NullFirst => {
-            if is_null {
-                0x00
-            } else {
-                0x02
-            }
-        }
-        Nullability::NullSecond => {
-            if is_null {
-                0x02
-            } else {
-                0x00
-            }
-        }
-    };
+    let byte = union_value_branch_byte(null_order, is_null);
     writer
         .write_all(&[byte])
         .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), 
e))
 }
 
-/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+#[derive(Debug, Clone)]
+enum NullState {
+    NonNullable,
+    NullableNoNulls {
+        byte: u8,
+    },
+    Nullable {
+        nulls: NullBuffer,
+        null_order: Nullability,
+    },
+}
+
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that 
enforces invariants
+pub struct FieldEncoder<'a> {
+    encoder: Encoder<'a>,
+    null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+    fn make_encoder(
+        array: &'a dyn Array,
+        field: &Field,
+        plan: &FieldPlan,
+        nullability: Option<Nullability>,
+    ) -> Result<Self, ArrowError> {
+        let has_nulls = array.null_count() > 0;
+        let encoder = match plan {
+            FieldPlan::Struct { encoders } => {
+                let arr = array
+                    .as_any()
+                    .downcast_ref::<StructArray>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected 
StructArray".into()))?;
+                Encoder::Struct(Box::new(StructEncoder::try_new(arr, 
encoders)?))
+            }
+            FieldPlan::List {
+                items_nullability,
+                item_plan,
+            } => match array.data_type() {
+                DataType::List(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
ListArray".into()))?;
+                    Encoder::List(Box::new(ListEncoder32::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                DataType::LargeList(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<LargeListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
LargeListArray".into()))?;
+                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                other => {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Avro array site requires Arrow List/LargeList, found: 
{other:?}"
+                    )))
+                }
+            },
+            FieldPlan::Scalar => match array.data_type() {
+                DataType::Boolean => 
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+                DataType::Utf8 => {
+                    
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+                }
+                DataType::LargeUtf8 => {
+                    
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+                }
+                DataType::Int32 => 
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+                DataType::Int64 => 
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+                DataType::Float32 => {
+                    
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+                }
+                DataType::Float64 => {
+                    
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+                }
+                DataType::Binary => 
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+                DataType::LargeBinary => {
+                    
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+                }
+                DataType::Timestamp(TimeUnit::Microsecond, _) => 
Encoder::Timestamp(LongEncoder(
+                    array.as_primitive::<TimestampMicrosecondType>(),
+                )),
+                other => {
+                    return Err(ArrowError::NotYetImplemented(format!(
+                        "Avro scalar type not yet supported: {other:?}"
+                    )));
+                }
+            },
+            other => {
+                return Err(ArrowError::NotYetImplemented(
+                    "Avro writer: {other:?} not yet supported".into(),
+                ));
+            }
+        };
+        // Compute the effective null state from writer-declared nullability 
and data nulls.
+        let null_state = match (nullability, has_nulls) {
+            (None, false) => NullState::NonNullable,
+            (None, true) => {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "Avro site '{}' is non-nullable, but array contains nulls",
+                    field.name()
+                )));
+            }
+            (Some(order), false) => {
+                // Optimization: drop any bitmap; emit a constant "value" 
branch byte.
+                let byte = union_value_branch_byte(order, false);
+                NullState::NullableNoNulls { byte }
+            }
+            (Some(null_order), true) => {
+                let null_buffer = array.nulls().cloned().ok_or_else(|| {

Review Comment:
   ```suggestion
                   let nulls = array.nulls().cloned().ok_or_else(|| {
   ```
   (simplifies the code below, and one less name to grok)



##########
arrow-avro/src/schema.rs:
##########
@@ -2056,4 +2133,15 @@ mod tests {
 
         assert_eq!(arrow_field, expected);
     }
+
+    #[test]
+    fn default_order_is_consistent() {
+        // Ensure TryFrom delegates to from_arrow_with_options(None)
+        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("s", 
DataType::Utf8, true)]);
+        let a = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
+        let b = AvroSchema::from_arrow_with_options(&arrow_schema, None)
+            .unwrap()
+            .json_string;
+        assert_eq!(a, b);

Review Comment:
   nit
   ```suggestion
           let b = AvroSchema::from_arrow_with_options(&arrow_schema, None);
           assert_eq!(a, b.unwrap().json_string);
   ```



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -83,49 +81,358 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) 
-> Result<(), ArrowErr
 /// Branch index is 0-based per Avro unions:
 /// - Null-first (default): null => 0, value => 1
 /// - Null-second (Impala): value => 0, null => 1
-#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
+fn write_optional_index<W: Write + ?Sized>(
     writer: &mut W,
     is_null: bool,
-    impala_mode: bool,
+    null_order: Nullability,
 ) -> Result<(), ArrowError> {
-    let branch = if impala_mode == is_null { 1 } else { 0 };
-    write_int(writer, branch)
+    let byte = union_value_branch_byte(null_order, is_null);
+    writer
+        .write_all(&[byte])
+        .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), 
e))
 }
 
-/// Encode a `RecordBatch` in Avro binary format using **default options**.
-pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) -> 
Result<(), ArrowError> {
-    encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+#[derive(Debug, Clone)]
+enum NullState {
+    NonNullable,
+    NullableNoNulls {
+        byte: u8,
+    },
+    Nullable {
+        nulls: NullBuffer,
+        null_order: Nullability,
+    },
 }
 
-/// Encode a `RecordBatch` with explicit `EncoderOptions`.
-pub fn encode_record_batch_with_options<W: Write>(
-    batch: &RecordBatch,
-    out: &mut W,
-    opts: &EncoderOptions,
-) -> Result<(), ArrowError> {
-    let mut encoders = batch
-        .schema()
-        .fields()
-        .iter()
-        .zip(batch.columns())
-        .map(|(field, array)| Ok((field.is_nullable(), 
make_encoder(array.as_ref())?)))
-        .collect::<Result<Vec<_>, ArrowError>>()?;
-    (0..batch.num_rows()).try_for_each(|row| {
-        encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
-            if *is_nullable {
-                let is_null = enc.is_null(row);
-                write_optional_branch(out, is_null, opts.impala_mode)?;
-                if is_null {
-                    return Ok(());
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that 
enforces invariants
+pub struct FieldEncoder<'a> {
+    encoder: Encoder<'a>,
+    null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+    fn make_encoder(
+        array: &'a dyn Array,
+        field: &Field,
+        plan: &FieldPlan,
+        nullability: Option<Nullability>,
+    ) -> Result<Self, ArrowError> {
+        let has_nulls = array.null_count() > 0;
+        let encoder = match plan {
+            FieldPlan::Struct { encoders } => {
+                let arr = array
+                    .as_any()
+                    .downcast_ref::<StructArray>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected 
StructArray".into()))?;
+                Encoder::Struct(Box::new(StructEncoder::try_new(arr, 
encoders)?))
+            }
+            FieldPlan::List {
+                items_nullability,
+                item_plan,
+            } => match array.data_type() {
+                DataType::List(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
ListArray".into()))?;
+                    Encoder::List(Box::new(ListEncoder32::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                DataType::LargeList(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<LargeListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
LargeListArray".into()))?;
+                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                other => {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Avro array site requires Arrow List/LargeList, found: 
{other:?}"
+                    )))
+                }
+            },
+            FieldPlan::Scalar => match array.data_type() {
+                DataType::Boolean => 
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+                DataType::Utf8 => {
+                    
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+                }
+                DataType::LargeUtf8 => {
+                    
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+                }
+                DataType::Int32 => 
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+                DataType::Int64 => 
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+                DataType::Float32 => {
+                    
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+                }
+                DataType::Float64 => {
+                    
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+                }
+                DataType::Binary => 
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+                DataType::LargeBinary => {
+                    
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+                }
+                DataType::Timestamp(TimeUnit::Microsecond, _) => 
Encoder::Timestamp(LongEncoder(
+                    array.as_primitive::<TimestampMicrosecondType>(),
+                )),
+                other => {
+                    return Err(ArrowError::NotYetImplemented(format!(
+                        "Avro scalar type not yet supported: {other:?}"
+                    )));
+                }
+            },
+            other => {
+                return Err(ArrowError::NotYetImplemented(
+                    "Avro writer: {other:?} not yet supported".into(),
+                ));
+            }
+        };
+        // Compute the effective null state from writer-declared nullability 
and data nulls.
+        let null_state = match (nullability, has_nulls) {
+            (None, false) => NullState::NonNullable,
+            (None, true) => {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "Avro site '{}' is non-nullable, but array contains nulls",
+                    field.name()
+                )));
+            }
+            (Some(order), false) => {
+                // Optimization: drop any bitmap; emit a constant "value" 
branch byte.
+                let byte = union_value_branch_byte(order, false);
+                NullState::NullableNoNulls { byte }
+            }
+            (Some(null_order), true) => {
+                let null_buffer = array.nulls().cloned().ok_or_else(|| {
+                    ArrowError::InvalidArgumentError(format!(
+                        "Array for Avro site '{}' reports nulls but has no 
null buffer",
+                        field.name()
+                    ))
+                })?;
+                NullState::Nullable {
+                    nulls: null_buffer,
+                    null_order,
                 }
             }
-            enc.encode(row, out)
+        };
+        Ok(Self {
+            encoder,
+            null_state,
         })
-    })
+    }
+
+    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> 
Result<(), ArrowError> {
+        match &self.null_state {
+            NullState::NonNullable => self.encoder.encode(idx, out),
+            NullState::NullableNoNulls { byte } => {
+                // Constant non-null branch byte, then value.
+                out.write_all(&[*byte]).map_err(|e| {
+                    ArrowError::IoError(format!("write union value branch: 
{e}"), e)
+                })?;
+                self.encoder.encode(idx, out)
+            }
+            NullState::Nullable { nulls, null_order } => {
+                let is_null = nulls.is_null(idx);
+                write_optional_index(out, is_null, *null_order)?;
+                if is_null {
+                    Ok(())
+                } else {
+                    self.encoder.encode(idx, out)
+                }
+            }
+        }
+    }
+}
+
+fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
+    let nulls_first = null_order == Nullability::default();
+    if nulls_first == is_null {
+        0x00
+    } else {
+        0x02
+    }
+}
+
+/// Per‑site encoder plan for a field. This mirrors the Avro structure, so 
nested
+/// optional branch order can be honored exactly as declared by the schema.
+#[derive(Debug, Clone)]
+enum FieldPlan {
+    /// Non-nested scalar/logical type
+    Scalar,
+    /// Record/Struct with Avro‑ordered children
+    Struct { encoders: Vec<FieldBinding> },
+    /// Array with item‑site nullability and nested plan
+    List {
+        items_nullability: Option<Nullability>,
+        item_plan: Box<FieldPlan>,
+    },
+}
+
+#[derive(Debug, Clone)]
+struct FieldBinding {
+    /// Index of the Arrow field/column associated with this Avro field site
+    arrow_index: usize,
+    /// Nullability/order for this site (None if required)

Review Comment:
   nit:
   ```suggestion
       /// Nullability/order for this site (None for required fields)
   ```



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -267,11 +513,275 @@ impl F32Encoder<'_> {
 
 struct F64Encoder<'a>(&'a arrow_array::Float64Array);
 impl F64Encoder<'_> {
-    #[inline]
     fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> 
Result<(), ArrowError> {
         // Avro double: 8 bytes, IEEE-754 little-endian
         let bits = self.0.value(idx).to_bits();
         out.write_all(&bits.to_le_bytes())
             .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
     }
 }
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> 
Result<(), ArrowError> {
+        write_len_prefixed(out, self.0.value(idx).as_bytes())
+    }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+struct StructEncoder<'a> {
+    encoders: Vec<FieldEncoder<'a>>,
+}
+
+impl<'a> StructEncoder<'a> {
+    fn try_new(
+        array: &'a StructArray,
+        field_bindings: &[FieldBinding],
+    ) -> Result<Self, ArrowError> {
+        let fields = match array.data_type() {
+            DataType::Struct(struct_fields) => struct_fields,
+            _ => return Err(ArrowError::SchemaError("Expected Struct".into())),
+        };
+        let mut encoders = Vec::with_capacity(field_bindings.len());
+        for field_binding in field_bindings {
+            let idx = field_binding.arrow_index;
+            let column = array.columns().get(idx).ok_or_else(|| {
+                ArrowError::SchemaError(format!("Struct child index {idx} out 
of range"))
+            })?;
+            let field = fields
+                .get(idx)
+                .ok_or_else(|| {
+                    ArrowError::SchemaError(format!("Struct child index {idx} 
out of range"))
+                })?
+                .as_ref();

Review Comment:
   nit:
   ```suggestion
                   })?;
   ```
   (just pass `field.as_ref()` to the function below)



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,36 +81,186 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) 
-> Result<(), ArrowErr
 /// Branch index is 0-based per Avro unions:
 /// - Null-first (default): null => 0, value => 1
 /// - Null-second (Impala): value => 0, null => 1
-#[inline]
 fn write_optional_index<W: Write + ?Sized>(
     writer: &mut W,
     is_null: bool,
-    order: Nullability,
+    null_order: Nullability,
 ) -> Result<(), ArrowError> {
-    // For NullFirst: null => 0x00, value => 0x02
-    // For NullSecond: value => 0x00, null => 0x02
-    let byte = match order {
-        Nullability::NullFirst => {
-            if is_null {
-                0x00
-            } else {
-                0x02
-            }
-        }
-        Nullability::NullSecond => {
-            if is_null {
-                0x02
-            } else {
-                0x00
-            }
-        }
-    };
+    let byte = union_value_branch_byte(null_order, is_null);
     writer
         .write_all(&[byte])
         .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), 
e))
 }
 
-/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+#[derive(Debug, Clone)]
+enum NullState {
+    NonNullable,
+    NullableNoNulls {
+        byte: u8,
+    },
+    Nullable {
+        nulls: NullBuffer,
+        null_order: Nullability,
+    },
+}
+
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that 
enforces invariants
+pub struct FieldEncoder<'a> {
+    encoder: Encoder<'a>,
+    null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+    fn make_encoder(
+        array: &'a dyn Array,
+        field: &Field,
+        plan: &FieldPlan,
+        nullability: Option<Nullability>,
+    ) -> Result<Self, ArrowError> {
+        let has_nulls = array.null_count() > 0;
+        let encoder = match plan {
+            FieldPlan::Struct { encoders } => {
+                let arr = array
+                    .as_any()
+                    .downcast_ref::<StructArray>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected 
StructArray".into()))?;
+                Encoder::Struct(Box::new(StructEncoder::try_new(arr, 
encoders)?))
+            }
+            FieldPlan::List {
+                items_nullability,
+                item_plan,
+            } => match array.data_type() {
+                DataType::List(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
ListArray".into()))?;
+                    Encoder::List(Box::new(ListEncoder32::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                DataType::LargeList(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<LargeListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
LargeListArray".into()))?;
+                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                other => {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Avro array site requires Arrow List/LargeList, found: 
{other:?}"
+                    )))
+                }
+            },
+            FieldPlan::Scalar => match array.data_type() {
+                DataType::Boolean => 
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+                DataType::Utf8 => {
+                    
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+                }
+                DataType::LargeUtf8 => {
+                    
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+                }
+                DataType::Int32 => 
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+                DataType::Int64 => 
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+                DataType::Float32 => {
+                    
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+                }
+                DataType::Float64 => {
+                    
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+                }
+                DataType::Binary => 
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+                DataType::LargeBinary => {
+                    
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+                }
+                DataType::Timestamp(TimeUnit::Microsecond, _) => 
Encoder::Timestamp(LongEncoder(
+                    array.as_primitive::<TimestampMicrosecondType>(),
+                )),
+                other => {
+                    return Err(ArrowError::NotYetImplemented(format!(
+                        "Avro scalar type not yet supported: {other:?}"
+                    )));
+                }
+            },
+            other => {
+                return Err(ArrowError::NotYetImplemented(
+                    "Avro writer: {other:?} not yet supported".into(),
+                ));
+            }
+        };
+        // Compute the effective null state from writer-declared nullability 
and data nulls.
+        let null_state = match (nullability, has_nulls) {
+            (None, false) => NullState::NonNullable,
+            (None, true) => {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "Avro site '{}' is non-nullable, but array contains nulls",
+                    field.name()
+                )));
+            }
+            (Some(order), false) => {
+                // Optimization: drop any bitmap; emit a constant "value" 
branch byte.
+                let byte = union_value_branch_byte(order, false);
+                NullState::NullableNoNulls { byte }
+            }
+            (Some(null_order), true) => {
+                let null_buffer = array.nulls().cloned().ok_or_else(|| {
+                    ArrowError::InvalidArgumentError(format!(
+                        "Array for Avro site '{}' reports nulls but has no 
null buffer",
+                        field.name()
+                    ))
+                })?;

Review Comment:
   aside: I'm not sure `ok_or_else` plus `?` is actually helpful... unless the 
thirteen extra chars from `return Err(...);` make the inner part spill to 
multiple lines (which it doesn't, in this case)
   ```suggestion
                   let Some(nulls) = array.nulls().cloned() else {
                       return Err(ArrowError::InvalidArgumentError(format!(
                           "Array for Avro site '{}' reports nulls but has no 
null buffer",
                           field.name()
                       )));
                   };
   ```



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -267,11 +513,275 @@ impl F32Encoder<'_> {
 
 struct F64Encoder<'a>(&'a arrow_array::Float64Array);
 impl F64Encoder<'_> {
-    #[inline]
     fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> 
Result<(), ArrowError> {
         // Avro double: 8 bytes, IEEE-754 little-endian
         let bits = self.0.value(idx).to_bits();
         out.write_all(&bits.to_le_bytes())
             .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
     }
 }
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> 
Result<(), ArrowError> {
+        write_len_prefixed(out, self.0.value(idx).as_bytes())
+    }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+struct StructEncoder<'a> {
+    encoders: Vec<FieldEncoder<'a>>,
+}
+
+impl<'a> StructEncoder<'a> {
+    fn try_new(
+        array: &'a StructArray,
+        field_bindings: &[FieldBinding],
+    ) -> Result<Self, ArrowError> {
+        let fields = match array.data_type() {
+            DataType::Struct(struct_fields) => struct_fields,
+            _ => return Err(ArrowError::SchemaError("Expected Struct".into())),

Review Comment:
   ```suggestion
           let DataType::Struct(fields) = array.data_type() else {
               return Err(ArrowError::SchemaError("Expected Struct".into()));
   ```



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -83,49 +81,358 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) 
-> Result<(), ArrowErr
 /// Branch index is 0-based per Avro unions:
 /// - Null-first (default): null => 0, value => 1
 /// - Null-second (Impala): value => 0, null => 1
-#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
+fn write_optional_index<W: Write + ?Sized>(
     writer: &mut W,
     is_null: bool,
-    impala_mode: bool,
+    null_order: Nullability,
 ) -> Result<(), ArrowError> {
-    let branch = if impala_mode == is_null { 1 } else { 0 };
-    write_int(writer, branch)
+    let byte = union_value_branch_byte(null_order, is_null);
+    writer
+        .write_all(&[byte])
+        .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), 
e))
 }
 
-/// Encode a `RecordBatch` in Avro binary format using **default options**.
-pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) -> 
Result<(), ArrowError> {
-    encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+#[derive(Debug, Clone)]
+enum NullState {
+    NonNullable,
+    NullableNoNulls {
+        byte: u8,
+    },
+    Nullable {
+        nulls: NullBuffer,
+        null_order: Nullability,
+    },
 }
 
-/// Encode a `RecordBatch` with explicit `EncoderOptions`.
-pub fn encode_record_batch_with_options<W: Write>(
-    batch: &RecordBatch,
-    out: &mut W,
-    opts: &EncoderOptions,
-) -> Result<(), ArrowError> {
-    let mut encoders = batch
-        .schema()
-        .fields()
-        .iter()
-        .zip(batch.columns())
-        .map(|(field, array)| Ok((field.is_nullable(), 
make_encoder(array.as_ref())?)))
-        .collect::<Result<Vec<_>, ArrowError>>()?;
-    (0..batch.num_rows()).try_for_each(|row| {
-        encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
-            if *is_nullable {
-                let is_null = enc.is_null(row);
-                write_optional_branch(out, is_null, opts.impala_mode)?;
-                if is_null {
-                    return Ok(());
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that 
enforces invariants
+pub struct FieldEncoder<'a> {
+    encoder: Encoder<'a>,
+    null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+    fn make_encoder(
+        array: &'a dyn Array,
+        field: &Field,
+        plan: &FieldPlan,
+        nullability: Option<Nullability>,
+    ) -> Result<Self, ArrowError> {
+        let has_nulls = array.null_count() > 0;
+        let encoder = match plan {
+            FieldPlan::Struct { encoders } => {
+                let arr = array
+                    .as_any()
+                    .downcast_ref::<StructArray>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected 
StructArray".into()))?;
+                Encoder::Struct(Box::new(StructEncoder::try_new(arr, 
encoders)?))
+            }
+            FieldPlan::List {
+                items_nullability,
+                item_plan,
+            } => match array.data_type() {
+                DataType::List(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<ListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
ListArray".into()))?;
+                    Encoder::List(Box::new(ListEncoder32::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                DataType::LargeList(_) => {
+                    let arr = array
+                        .as_any()
+                        .downcast_ref::<LargeListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected 
LargeListArray".into()))?;
+                    Encoder::LargeList(Box::new(ListEncoder64::try_new(
+                        arr,
+                        *items_nullability,
+                        item_plan.as_ref(),
+                    )?))
+                }
+                other => {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Avro array site requires Arrow List/LargeList, found: 
{other:?}"
+                    )))
+                }
+            },
+            FieldPlan::Scalar => match array.data_type() {
+                DataType::Boolean => 
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+                DataType::Utf8 => {
+                    
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+                }
+                DataType::LargeUtf8 => {
+                    
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+                }
+                DataType::Int32 => 
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+                DataType::Int64 => 
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+                DataType::Float32 => {
+                    
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+                }
+                DataType::Float64 => {
+                    
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+                }
+                DataType::Binary => 
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+                DataType::LargeBinary => {
+                    
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+                }
+                DataType::Timestamp(TimeUnit::Microsecond, _) => 
Encoder::Timestamp(LongEncoder(
+                    array.as_primitive::<TimestampMicrosecondType>(),
+                )),
+                other => {
+                    return Err(ArrowError::NotYetImplemented(format!(
+                        "Avro scalar type not yet supported: {other:?}"
+                    )));
+                }
+            },
+            other => {
+                return Err(ArrowError::NotYetImplemented(
+                    "Avro writer: {other:?} not yet supported".into(),
+                ));
+            }
+        };
+        // Compute the effective null state from writer-declared nullability 
and data nulls.
+        let null_state = match (nullability, has_nulls) {
+            (None, false) => NullState::NonNullable,
+            (None, true) => {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "Avro site '{}' is non-nullable, but array contains nulls",
+                    field.name()
+                )));
+            }
+            (Some(order), false) => {
+                // Optimization: drop any bitmap; emit a constant "value" 
branch byte.
+                let byte = union_value_branch_byte(order, false);
+                NullState::NullableNoNulls { byte }
+            }
+            (Some(null_order), true) => {
+                let null_buffer = array.nulls().cloned().ok_or_else(|| {
+                    ArrowError::InvalidArgumentError(format!(
+                        "Array for Avro site '{}' reports nulls but has no 
null buffer",
+                        field.name()
+                    ))
+                })?;
+                NullState::Nullable {
+                    nulls: null_buffer,
+                    null_order,
                 }
             }
-            enc.encode(row, out)
+        };
+        Ok(Self {
+            encoder,
+            null_state,
         })
-    })
+    }
+
+    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> 
Result<(), ArrowError> {
+        match &self.null_state {
+            NullState::NonNullable => self.encoder.encode(idx, out),
+            NullState::NullableNoNulls { byte } => {
+                // Constant non-null branch byte, then value.
+                out.write_all(&[*byte]).map_err(|e| {
+                    ArrowError::IoError(format!("write union value branch: 
{e}"), e)
+                })?;
+                self.encoder.encode(idx, out)
+            }
+            NullState::Nullable { nulls, null_order } => {
+                let is_null = nulls.is_null(idx);
+                write_optional_index(out, is_null, *null_order)?;
+                if is_null {
+                    Ok(())
+                } else {
+                    self.encoder.encode(idx, out)
+                }
+            }
+        }
+    }
+}
+
+fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
+    let nulls_first = null_order == Nullability::default();
+    if nulls_first == is_null {
+        0x00
+    } else {
+        0x02
+    }
+}
+
+/// Per‑site encoder plan for a field. This mirrors the Avro structure, so 
nested
+/// optional branch order can be honored exactly as declared by the schema.
+#[derive(Debug, Clone)]
+enum FieldPlan {
+    /// Non-nested scalar/logical type
+    Scalar,
+    /// Record/Struct with Avro‑ordered children
+    Struct { encoders: Vec<FieldBinding> },
+    /// Array with item‑site nullability and nested plan
+    List {
+        items_nullability: Option<Nullability>,
+        item_plan: Box<FieldPlan>,
+    },
+}
+
+#[derive(Debug, Clone)]
+struct FieldBinding {
+    /// Index of the Arrow field/column associated with this Avro field site
+    arrow_index: usize,
+    /// Nullability/order for this site (None if required)
+    nullability: Option<Nullability>,
+    /// Nested plan for this site
+    plan: FieldPlan,
+}
+
+/// Builder for `RecordEncoder` write plan
+#[derive(Debug)]
+pub struct RecordEncoderBuilder<'a> {
+    avro_root: &'a AvroField,
+    arrow_schema: &'a ArrowSchema,
+}
+
+impl<'a> RecordEncoderBuilder<'a> {
+    /// Create a new builder from the Avro root and Arrow schema.
+    pub fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) -> 
Self {
+        Self {
+            avro_root,
+            arrow_schema,
+        }
+    }
+
+    /// Build the `RecordEncoder` by walking the Avro **record** root in Avro 
order,
+    /// resolving each field to an Arrow index by name.
+    pub fn build(self) -> Result<RecordEncoder, ArrowError> {
+        let avro_root_dt = self.avro_root.data_type();
+        let root_fields = match avro_root_dt.codec() {
+            Codec::Struct(fields) => fields,
+            _ => {
+                return Err(ArrowError::SchemaError(
+                    "Top-level Avro schema must be a record/struct".into(),
+                ))
+            }

Review Comment:
   ```suggestion
           let Codec::Struct(root_fields) = avro_root_dt.codec() else {
               return Err(ArrowError::SchemaError(
                   "Top-level Avro schema must be a record/struct".into(),
               ));
   ```



##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -267,11 +513,275 @@ impl F32Encoder<'_> {
 
 struct F64Encoder<'a>(&'a arrow_array::Float64Array);
 impl F64Encoder<'_> {
-    #[inline]
     fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> 
Result<(), ArrowError> {
         // Avro double: 8 bytes, IEEE-754 little-endian
         let bits = self.0.value(idx).to_bits();
         out.write_all(&bits.to_le_bytes())
             .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
     }
 }
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> 
Result<(), ArrowError> {
+        write_len_prefixed(out, self.0.value(idx).as_bytes())
+    }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+struct StructEncoder<'a> {
+    encoders: Vec<FieldEncoder<'a>>,
+}
+
+impl<'a> StructEncoder<'a> {
+    fn try_new(
+        array: &'a StructArray,
+        field_bindings: &[FieldBinding],
+    ) -> Result<Self, ArrowError> {
+        let fields = match array.data_type() {
+            DataType::Struct(struct_fields) => struct_fields,
+            _ => return Err(ArrowError::SchemaError("Expected Struct".into())),
+        };
+        let mut encoders = Vec::with_capacity(field_bindings.len());
+        for field_binding in field_bindings {
+            let idx = field_binding.arrow_index;
+            let column = array.columns().get(idx).ok_or_else(|| {
+                ArrowError::SchemaError(format!("Struct child index {idx} out 
of range"))
+            })?;
+            let field = fields
+                .get(idx)
+                .ok_or_else(|| {
+                    ArrowError::SchemaError(format!("Struct child index {idx} 
out of range"))
+                })?
+                .as_ref();
+            let encoder = prepare_value_site_encoder(
+                column.as_ref(),
+                field,
+                field_binding.nullability,
+                &field_binding.plan,
+            )?;
+            encoders.push(encoder);
+        }
+        Ok(Self { encoders })
+    }
+
+    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> 
Result<(), ArrowError> {
+        for encoder in self.encoders.iter_mut() {
+            encoder.encode(idx, out)?;
+        }
+        Ok(())
+    }
+}
+
+fn encode_blocked_range<W: Write + ?Sized, F>(
+    out: &mut W,
+    start: usize,
+    end: usize,
+    mut write_item: F,
+) -> Result<(), ArrowError>
+where
+    F: FnMut(usize, &mut W) -> Result<(), ArrowError>,
+{
+    let len = end.saturating_sub(start);
+    if len == 0 {
+        // Zero-length terminator per Avro spec
+        write_long(out, 0)?;
+        return Ok(());
+    }
+    // Emit a single positive block for performance, then the end marker.
+    write_long(out, len as i64)?;
+    for j in start..end {
+        write_item(j, out)?;

Review Comment:
   aside: It's a bit disorienting that some methods (like this `write_item` 
callback and the encoders' `encode`) take `out` as the second param, while 
others (like `write_long` below) take `out` as the first param. Can we 
harmonize them? I would suggest that `out` always be the first param, but 
consistency is the main concern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to