scovich commented on code in PR #8274:
URL: https://github.com/apache/arrow-rs/pull/8274#discussion_r2325187158
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,36 +81,186 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// Branch index is 0-based per Avro unions:
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
-#[inline]
fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- order: Nullability,
+ null_order: Nullability,
) -> Result<(), ArrowError> {
- // For NullFirst: null => 0x00, value => 0x02
- // For NullSecond: value => 0x00, null => 0x02
- let byte = match order {
- Nullability::NullFirst => {
- if is_null {
- 0x00
- } else {
- 0x02
- }
- }
- Nullability::NullSecond => {
- if is_null {
- 0x02
- } else {
- 0x00
- }
- }
- };
+ let byte = union_value_branch_byte(null_order, is_null);
writer
.write_all(&[byte])
.map_err(|e| ArrowError::IoError(format!("write union branch: {e}"),
e))
}
-/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+#[derive(Debug, Clone)]
+enum NullState {
+ NonNullable,
+ NullableNoNulls {
+ byte: u8,
Review Comment:
```suggestion
union_value_byte: u8,
```
(`byte` by itself adds nothing -- `u8` is a byte by definition)
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,36 +81,186 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// Branch index is 0-based per Avro unions:
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
-#[inline]
fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- order: Nullability,
+ null_order: Nullability,
) -> Result<(), ArrowError> {
- // For NullFirst: null => 0x00, value => 0x02
- // For NullSecond: value => 0x00, null => 0x02
- let byte = match order {
- Nullability::NullFirst => {
- if is_null {
- 0x00
- } else {
- 0x02
- }
- }
- Nullability::NullSecond => {
- if is_null {
- 0x02
- } else {
- 0x00
- }
- }
- };
+ let byte = union_value_branch_byte(null_order, is_null);
writer
.write_all(&[byte])
.map_err(|e| ArrowError::IoError(format!("write union branch: {e}"),
e))
}
-/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+#[derive(Debug, Clone)]
+enum NullState {
+ NonNullable,
+ NullableNoNulls {
+ byte: u8,
+ },
+ Nullable {
+ nulls: NullBuffer,
+ null_order: Nullability,
+ },
+}
+
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that
enforces invariants
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: &FieldPlan,
+ nullability: Option<Nullability>,
+ ) -> Result<Self, ArrowError> {
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ // Compute the effective null state from writer-declared nullability
and data nulls.
+ let null_state = match (nullability, has_nulls) {
Review Comment:
This is the only use site for `has_nulls`, just fold it in?
```suggestion
let null_state = match (nullability, array.null_count() > 0) {
```
(or at least define it closer to its use site)
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,36 +81,186 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// Branch index is 0-based per Avro unions:
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
-#[inline]
fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- order: Nullability,
+ null_order: Nullability,
) -> Result<(), ArrowError> {
- // For NullFirst: null => 0x00, value => 0x02
- // For NullSecond: value => 0x00, null => 0x02
- let byte = match order {
- Nullability::NullFirst => {
- if is_null {
- 0x00
- } else {
- 0x02
- }
- }
- Nullability::NullSecond => {
- if is_null {
- 0x02
- } else {
- 0x00
- }
- }
- };
+ let byte = union_value_branch_byte(null_order, is_null);
writer
.write_all(&[byte])
.map_err(|e| ArrowError::IoError(format!("write union branch: {e}"),
e))
}
-/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+#[derive(Debug, Clone)]
+enum NullState {
+ NonNullable,
+ NullableNoNulls {
+ byte: u8,
+ },
+ Nullable {
+ nulls: NullBuffer,
+ null_order: Nullability,
+ },
+}
+
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that
enforces invariants
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: &FieldPlan,
+ nullability: Option<Nullability>,
+ ) -> Result<Self, ArrowError> {
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ // Compute the effective null state from writer-declared nullability
and data nulls.
+ let null_state = match (nullability, has_nulls) {
+ (None, false) => NullState::NonNullable,
+ (None, true) => {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Avro site '{}' is non-nullable, but array contains nulls",
+ field.name()
+ )));
+ }
+ (Some(order), false) => {
+ // Optimization: drop any bitmap; emit a constant "value"
branch byte.
+ let byte = union_value_branch_byte(order, false);
+ NullState::NullableNoNulls { byte }
+ }
+ (Some(null_order), true) => {
+ let null_buffer = array.nulls().cloned().ok_or_else(|| {
+ ArrowError::InvalidArgumentError(format!(
+ "Array for Avro site '{}' reports nulls but has no
null buffer",
+ field.name()
+ ))
+ })?;
+ NullState::Nullable {
+ nulls: null_buffer,
+ null_order,
+ }
+ }
+ };
+ Ok(Self {
+ encoder,
+ null_state,
+ })
+ }
+
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ match &self.null_state {
+ NullState::NonNullable => self.encoder.encode(idx, out),
+ NullState::NullableNoNulls { byte } => {
+ // Constant non-null branch byte, then value.
+ out.write_all(&[*byte]).map_err(|e| {
+ ArrowError::IoError(format!("write union value branch:
{e}"), e)
+ })?;
+ self.encoder.encode(idx, out)
+ }
+ NullState::Nullable { nulls, null_order } => {
+ let is_null = nulls.is_null(idx);
+ write_optional_index(out, is_null, *null_order)?;
+ if is_null {
+ Ok(())
+ } else {
+ self.encoder.encode(idx, out)
+ }
+ }
+ }
Review Comment:
nit -- We could remove some duplication by:
```suggestion
NullState::NonNullable => {}
NullState::NullableNoNulls { byte } =>
out.write_all(&[*byte]).map_err(|e| {
ArrowError::IoError(format!("write union value branch:
{e}"), e)
})?,
NullState::Nullable { nulls, null_order } => {
let is_null = nulls.is_null(idx);
write_optional_index(out, is_null, *null_order)?;
if is_null {
return Ok(()); // no value to write
}
}
}
self.encoder.encode(idx, out)
```
or even
```suggestion
NullState::NonNullable => {}
NullState::NullableNoNulls { byte } =>
out.write_all(&[*byte]).map_err(|e| {
ArrowError::IoError(format!("write union value branch:
{e}"), e)
})?,
NullState::Nullable { nulls, null_order } if nulls.is_null(idx)
=> {
return write_optional_index(out, true, *null_order); // no
value to write
}
NullState::Nullable { null_order, .. } => {
write_optional_index(out, false, *null_order)?;
}
}
self.encoder.encode(idx, out)
```
(splitting apart the `write_optional_index` calls will likely trigger a
bunch of jump threading and inlining optimizations that more than make up for
the apparent duplication at the call site -- the compiler will basically put
half of the function in each location, thanks to the hard-wired `is_null` arg
value)
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -83,49 +81,358 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// Branch index is 0-based per Avro unions:
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
-#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
+fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- impala_mode: bool,
+ null_order: Nullability,
) -> Result<(), ArrowError> {
- let branch = if impala_mode == is_null { 1 } else { 0 };
- write_int(writer, branch)
+ let byte = union_value_branch_byte(null_order, is_null);
+ writer
+ .write_all(&[byte])
+ .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"),
e))
}
-/// Encode a `RecordBatch` in Avro binary format using **default options**.
-pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) ->
Result<(), ArrowError> {
- encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+#[derive(Debug, Clone)]
+enum NullState {
+ NonNullable,
+ NullableNoNulls {
+ byte: u8,
+ },
+ Nullable {
+ nulls: NullBuffer,
+ null_order: Nullability,
+ },
}
-/// Encode a `RecordBatch` with explicit `EncoderOptions`.
-pub fn encode_record_batch_with_options<W: Write>(
- batch: &RecordBatch,
- out: &mut W,
- opts: &EncoderOptions,
-) -> Result<(), ArrowError> {
- let mut encoders = batch
- .schema()
- .fields()
- .iter()
- .zip(batch.columns())
- .map(|(field, array)| Ok((field.is_nullable(),
make_encoder(array.as_ref())?)))
- .collect::<Result<Vec<_>, ArrowError>>()?;
- (0..batch.num_rows()).try_for_each(|row| {
- encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
- if *is_nullable {
- let is_null = enc.is_null(row);
- write_optional_branch(out, is_null, opts.impala_mode)?;
- if is_null {
- return Ok(());
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that
enforces invariants
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: &FieldPlan,
+ nullability: Option<Nullability>,
+ ) -> Result<Self, ArrowError> {
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ // Compute the effective null state from writer-declared nullability
and data nulls.
+ let null_state = match (nullability, has_nulls) {
+ (None, false) => NullState::NonNullable,
+ (None, true) => {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Avro site '{}' is non-nullable, but array contains nulls",
+ field.name()
+ )));
+ }
+ (Some(order), false) => {
+ // Optimization: drop any bitmap; emit a constant "value"
branch byte.
+ let byte = union_value_branch_byte(order, false);
+ NullState::NullableNoNulls { byte }
+ }
+ (Some(null_order), true) => {
+ let null_buffer = array.nulls().cloned().ok_or_else(|| {
+ ArrowError::InvalidArgumentError(format!(
+ "Array for Avro site '{}' reports nulls but has no
null buffer",
+ field.name()
+ ))
+ })?;
+ NullState::Nullable {
+ nulls: null_buffer,
+ null_order,
}
}
- enc.encode(row, out)
+ };
+ Ok(Self {
+ encoder,
+ null_state,
})
- })
+ }
+
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ match &self.null_state {
+ NullState::NonNullable => self.encoder.encode(idx, out),
+ NullState::NullableNoNulls { byte } => {
+ // Constant non-null branch byte, then value.
+ out.write_all(&[*byte]).map_err(|e| {
+ ArrowError::IoError(format!("write union value branch:
{e}"), e)
+ })?;
+ self.encoder.encode(idx, out)
+ }
+ NullState::Nullable { nulls, null_order } => {
+ let is_null = nulls.is_null(idx);
+ write_optional_index(out, is_null, *null_order)?;
+ if is_null {
+ Ok(())
+ } else {
+ self.encoder.encode(idx, out)
+ }
+ }
+ }
+ }
+}
+
+fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
+ let nulls_first = null_order == Nullability::default();
+ if nulls_first == is_null {
+ 0x00
+ } else {
+ 0x02
+ }
+}
+
+/// Per‑site encoder plan for a field. This mirrors the Avro structure, so
nested
+/// optional branch order can be honored exactly as declared by the schema.
+#[derive(Debug, Clone)]
+enum FieldPlan {
+ /// Non-nested scalar/logical type
+ Scalar,
+ /// Record/Struct with Avro‑ordered children
+ Struct { encoders: Vec<FieldBinding> },
+ /// Array with item‑site nullability and nested plan
+ List {
+ items_nullability: Option<Nullability>,
+ item_plan: Box<FieldPlan>,
+ },
+}
+
+#[derive(Debug, Clone)]
+struct FieldBinding {
+ /// Index of the Arrow field/column associated with this Avro field site
+ arrow_index: usize,
+ /// Nullability/order for this site (None if required)
+ nullability: Option<Nullability>,
+ /// Nested plan for this site
+ plan: FieldPlan,
+}
+
+/// Builder for `RecordEncoder` write plan
+#[derive(Debug)]
+pub struct RecordEncoderBuilder<'a> {
+ avro_root: &'a AvroField,
+ arrow_schema: &'a ArrowSchema,
+}
+
+impl<'a> RecordEncoderBuilder<'a> {
+ /// Create a new builder from the Avro root and Arrow schema.
+ pub fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) ->
Self {
+ Self {
+ avro_root,
+ arrow_schema,
+ }
+ }
+
+ /// Build the `RecordEncoder` by walking the Avro **record** root in Avro
order,
+ /// resolving each field to an Arrow index by name.
+ pub fn build(self) -> Result<RecordEncoder, ArrowError> {
+ let avro_root_dt = self.avro_root.data_type();
+ let root_fields = match avro_root_dt.codec() {
+ Codec::Struct(fields) => fields,
+ _ => {
+ return Err(ArrowError::SchemaError(
+ "Top-level Avro schema must be a record/struct".into(),
+ ))
+ }
+ };
+ let mut columns = Vec::with_capacity(root_fields.len());
+ for root_field in root_fields.iter() {
Review Comment:
nit: the `iter()` call should be unnecessary (thanks to `impl IntoIterator
for &Self`)?
```suggestion
for root_field in root_fields {
```
##########
arrow-avro/src/schema.rs:
##########
@@ -1118,26 +1056,37 @@ fn datatype_to_avro_with_order(
Ok((val, extras))
}
-fn arrow_field_to_avro_with_order(
+fn process_datatype(
+ dt: &DataType,
+ field_name: &str,
+ metadata: &HashMap<String, String>,
+ name_gen: &mut NameGenerator,
+ null_order: Nullability,
+ is_nullable: bool,
+) -> Result<Value, ArrowError> {
+ let (schema, extras) = datatype_to_avro(dt, field_name, metadata,
name_gen, null_order)?;
+ let merged = merge_extras(schema, extras);
+ Ok(if is_nullable {
+ wrap_nullable(merged, null_order)
+ } else {
+ merged
+ })
Review Comment:
nit:
```suggestion
let mut merged = merge_extras(schema, extras);
if is_nullable {
merged = wrap_nullable(merged, null_order)
}
Ok(merged)
```
(`mut` is underutilized in situations like this, and IMO `if` inside parens
is hard to read+maintain)
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,36 +81,186 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// Branch index is 0-based per Avro unions:
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
-#[inline]
fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- order: Nullability,
+ null_order: Nullability,
) -> Result<(), ArrowError> {
- // For NullFirst: null => 0x00, value => 0x02
- // For NullSecond: value => 0x00, null => 0x02
- let byte = match order {
- Nullability::NullFirst => {
- if is_null {
- 0x00
- } else {
- 0x02
- }
- }
- Nullability::NullSecond => {
- if is_null {
- 0x02
- } else {
- 0x00
- }
- }
- };
+ let byte = union_value_branch_byte(null_order, is_null);
writer
.write_all(&[byte])
.map_err(|e| ArrowError::IoError(format!("write union branch: {e}"),
e))
}
-/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+#[derive(Debug, Clone)]
+enum NullState {
+ NonNullable,
+ NullableNoNulls {
+ byte: u8,
+ },
+ Nullable {
+ nulls: NullBuffer,
+ null_order: Nullability,
+ },
+}
+
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that
enforces invariants
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: &FieldPlan,
+ nullability: Option<Nullability>,
+ ) -> Result<Self, ArrowError> {
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ // Compute the effective null state from writer-declared nullability
and data nulls.
+ let null_state = match (nullability, has_nulls) {
+ (None, false) => NullState::NonNullable,
+ (None, true) => {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Avro site '{}' is non-nullable, but array contains nulls",
+ field.name()
+ )));
+ }
+ (Some(order), false) => {
+ // Optimization: drop any bitmap; emit a constant "value"
branch byte.
+ let byte = union_value_branch_byte(order, false);
+ NullState::NullableNoNulls { byte }
+ }
+ (Some(null_order), true) => {
+ let null_buffer = array.nulls().cloned().ok_or_else(|| {
Review Comment:
```suggestion
let nulls = array.nulls().cloned().ok_or_else(|| {
```
(simplifies the code below, and one less name to grok)
##########
arrow-avro/src/schema.rs:
##########
@@ -2056,4 +2133,15 @@ mod tests {
assert_eq!(arrow_field, expected);
}
+
+ #[test]
+ fn default_order_is_consistent() {
+ // Ensure TryFrom delegates to from_arrow_with_options(None)
+ let arrow_schema = ArrowSchema::new(vec![ArrowField::new("s",
DataType::Utf8, true)]);
+ let a = AvroSchema::try_from(&arrow_schema).unwrap().json_string;
+ let b = AvroSchema::from_arrow_with_options(&arrow_schema, None)
+ .unwrap()
+ .json_string;
+ assert_eq!(a, b);
Review Comment:
nit
```suggestion
let b = AvroSchema::from_arrow_with_options(&arrow_schema, None);
assert_eq!(a, b.unwrap().json_string);
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -83,49 +81,358 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// Branch index is 0-based per Avro unions:
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
-#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
+fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- impala_mode: bool,
+ null_order: Nullability,
) -> Result<(), ArrowError> {
- let branch = if impala_mode == is_null { 1 } else { 0 };
- write_int(writer, branch)
+ let byte = union_value_branch_byte(null_order, is_null);
+ writer
+ .write_all(&[byte])
+ .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"),
e))
}
-/// Encode a `RecordBatch` in Avro binary format using **default options**.
-pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) ->
Result<(), ArrowError> {
- encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+#[derive(Debug, Clone)]
+enum NullState {
+ NonNullable,
+ NullableNoNulls {
+ byte: u8,
+ },
+ Nullable {
+ nulls: NullBuffer,
+ null_order: Nullability,
+ },
}
-/// Encode a `RecordBatch` with explicit `EncoderOptions`.
-pub fn encode_record_batch_with_options<W: Write>(
- batch: &RecordBatch,
- out: &mut W,
- opts: &EncoderOptions,
-) -> Result<(), ArrowError> {
- let mut encoders = batch
- .schema()
- .fields()
- .iter()
- .zip(batch.columns())
- .map(|(field, array)| Ok((field.is_nullable(),
make_encoder(array.as_ref())?)))
- .collect::<Result<Vec<_>, ArrowError>>()?;
- (0..batch.num_rows()).try_for_each(|row| {
- encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
- if *is_nullable {
- let is_null = enc.is_null(row);
- write_optional_branch(out, is_null, opts.impala_mode)?;
- if is_null {
- return Ok(());
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that
enforces invariants
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: &FieldPlan,
+ nullability: Option<Nullability>,
+ ) -> Result<Self, ArrowError> {
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ // Compute the effective null state from writer-declared nullability
and data nulls.
+ let null_state = match (nullability, has_nulls) {
+ (None, false) => NullState::NonNullable,
+ (None, true) => {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Avro site '{}' is non-nullable, but array contains nulls",
+ field.name()
+ )));
+ }
+ (Some(order), false) => {
+ // Optimization: drop any bitmap; emit a constant "value"
branch byte.
+ let byte = union_value_branch_byte(order, false);
+ NullState::NullableNoNulls { byte }
+ }
+ (Some(null_order), true) => {
+ let null_buffer = array.nulls().cloned().ok_or_else(|| {
+ ArrowError::InvalidArgumentError(format!(
+ "Array for Avro site '{}' reports nulls but has no
null buffer",
+ field.name()
+ ))
+ })?;
+ NullState::Nullable {
+ nulls: null_buffer,
+ null_order,
}
}
- enc.encode(row, out)
+ };
+ Ok(Self {
+ encoder,
+ null_state,
})
- })
+ }
+
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ match &self.null_state {
+ NullState::NonNullable => self.encoder.encode(idx, out),
+ NullState::NullableNoNulls { byte } => {
+ // Constant non-null branch byte, then value.
+ out.write_all(&[*byte]).map_err(|e| {
+ ArrowError::IoError(format!("write union value branch:
{e}"), e)
+ })?;
+ self.encoder.encode(idx, out)
+ }
+ NullState::Nullable { nulls, null_order } => {
+ let is_null = nulls.is_null(idx);
+ write_optional_index(out, is_null, *null_order)?;
+ if is_null {
+ Ok(())
+ } else {
+ self.encoder.encode(idx, out)
+ }
+ }
+ }
+ }
+}
+
+fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
+ let nulls_first = null_order == Nullability::default();
+ if nulls_first == is_null {
+ 0x00
+ } else {
+ 0x02
+ }
+}
+
+/// Per‑site encoder plan for a field. This mirrors the Avro structure, so
nested
+/// optional branch order can be honored exactly as declared by the schema.
+#[derive(Debug, Clone)]
+enum FieldPlan {
+ /// Non-nested scalar/logical type
+ Scalar,
+ /// Record/Struct with Avro‑ordered children
+ Struct { encoders: Vec<FieldBinding> },
+ /// Array with item‑site nullability and nested plan
+ List {
+ items_nullability: Option<Nullability>,
+ item_plan: Box<FieldPlan>,
+ },
+}
+
+#[derive(Debug, Clone)]
+struct FieldBinding {
+ /// Index of the Arrow field/column associated with this Avro field site
+ arrow_index: usize,
+ /// Nullability/order for this site (None if required)
Review Comment:
nit:
```suggestion
/// Nullability/order for this site (None for required fields)
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -267,11 +513,275 @@ impl F32Encoder<'_> {
struct F64Encoder<'a>(&'a arrow_array::Float64Array);
impl F64Encoder<'_> {
- #[inline]
fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
// Avro double: 8 bytes, IEEE-754 little-endian
let bits = self.0.value(idx).to_bits();
out.write_all(&bits.to_le_bytes())
.map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
}
}
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ write_len_prefixed(out, self.0.value(idx).as_bytes())
+ }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+struct StructEncoder<'a> {
+ encoders: Vec<FieldEncoder<'a>>,
+}
+
+impl<'a> StructEncoder<'a> {
+ fn try_new(
+ array: &'a StructArray,
+ field_bindings: &[FieldBinding],
+ ) -> Result<Self, ArrowError> {
+ let fields = match array.data_type() {
+ DataType::Struct(struct_fields) => struct_fields,
+ _ => return Err(ArrowError::SchemaError("Expected Struct".into())),
+ };
+ let mut encoders = Vec::with_capacity(field_bindings.len());
+ for field_binding in field_bindings {
+ let idx = field_binding.arrow_index;
+ let column = array.columns().get(idx).ok_or_else(|| {
+ ArrowError::SchemaError(format!("Struct child index {idx} out
of range"))
+ })?;
+ let field = fields
+ .get(idx)
+ .ok_or_else(|| {
+ ArrowError::SchemaError(format!("Struct child index {idx}
out of range"))
+ })?
+ .as_ref();
Review Comment:
nit:
```suggestion
})?;
```
(just pass `field.as_ref()` to the function below)
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,36 +81,186 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// Branch index is 0-based per Avro unions:
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
-#[inline]
fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- order: Nullability,
+ null_order: Nullability,
) -> Result<(), ArrowError> {
- // For NullFirst: null => 0x00, value => 0x02
- // For NullSecond: value => 0x00, null => 0x02
- let byte = match order {
- Nullability::NullFirst => {
- if is_null {
- 0x00
- } else {
- 0x02
- }
- }
- Nullability::NullSecond => {
- if is_null {
- 0x02
- } else {
- 0x00
- }
- }
- };
+ let byte = union_value_branch_byte(null_order, is_null);
writer
.write_all(&[byte])
.map_err(|e| ArrowError::IoError(format!("write union branch: {e}"),
e))
}
-/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+#[derive(Debug, Clone)]
+enum NullState {
+ NonNullable,
+ NullableNoNulls {
+ byte: u8,
+ },
+ Nullable {
+ nulls: NullBuffer,
+ null_order: Nullability,
+ },
+}
+
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that
enforces invariants
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: &FieldPlan,
+ nullability: Option<Nullability>,
+ ) -> Result<Self, ArrowError> {
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ // Compute the effective null state from writer-declared nullability
and data nulls.
+ let null_state = match (nullability, has_nulls) {
+ (None, false) => NullState::NonNullable,
+ (None, true) => {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Avro site '{}' is non-nullable, but array contains nulls",
+ field.name()
+ )));
+ }
+ (Some(order), false) => {
+ // Optimization: drop any bitmap; emit a constant "value"
branch byte.
+ let byte = union_value_branch_byte(order, false);
+ NullState::NullableNoNulls { byte }
+ }
+ (Some(null_order), true) => {
+ let null_buffer = array.nulls().cloned().ok_or_else(|| {
+ ArrowError::InvalidArgumentError(format!(
+ "Array for Avro site '{}' reports nulls but has no
null buffer",
+ field.name()
+ ))
+ })?;
Review Comment:
aside: I'm not sure `ok_or_else` plus `?` is actually helpful... unless the
thirteen extra chars from `return Err(...);` make the inner part spill to
multiple lines (which it doesn't, in this case)
```suggestion
let Some(nulls) = array.nulls().cloned() else {
return Err(ArrowError::InvalidArgumentError(format!(
"Array for Avro site '{}' reports nulls but has no
null buffer",
field.name()
)));
};
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -267,11 +513,275 @@ impl F32Encoder<'_> {
struct F64Encoder<'a>(&'a arrow_array::Float64Array);
impl F64Encoder<'_> {
- #[inline]
fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
// Avro double: 8 bytes, IEEE-754 little-endian
let bits = self.0.value(idx).to_bits();
out.write_all(&bits.to_le_bytes())
.map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
}
}
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ write_len_prefixed(out, self.0.value(idx).as_bytes())
+ }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+struct StructEncoder<'a> {
+ encoders: Vec<FieldEncoder<'a>>,
+}
+
+impl<'a> StructEncoder<'a> {
+ fn try_new(
+ array: &'a StructArray,
+ field_bindings: &[FieldBinding],
+ ) -> Result<Self, ArrowError> {
+ let fields = match array.data_type() {
+ DataType::Struct(struct_fields) => struct_fields,
+ _ => return Err(ArrowError::SchemaError("Expected Struct".into())),
Review Comment:
```suggestion
let DataType::Struct(fields) = array.data_type() else {
return Err(ArrowError::SchemaError("Expected Struct".into()));
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -83,49 +81,358 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool)
-> Result<(), ArrowErr
/// Branch index is 0-based per Avro unions:
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
-#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
+fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- impala_mode: bool,
+ null_order: Nullability,
) -> Result<(), ArrowError> {
- let branch = if impala_mode == is_null { 1 } else { 0 };
- write_int(writer, branch)
+ let byte = union_value_branch_byte(null_order, is_null);
+ writer
+ .write_all(&[byte])
+ .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"),
e))
}
-/// Encode a `RecordBatch` in Avro binary format using **default options**.
-pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) ->
Result<(), ArrowError> {
- encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+#[derive(Debug, Clone)]
+enum NullState {
+ NonNullable,
+ NullableNoNulls {
+ byte: u8,
+ },
+ Nullable {
+ nulls: NullBuffer,
+ null_order: Nullability,
+ },
}
-/// Encode a `RecordBatch` with explicit `EncoderOptions`.
-pub fn encode_record_batch_with_options<W: Write>(
- batch: &RecordBatch,
- out: &mut W,
- opts: &EncoderOptions,
-) -> Result<(), ArrowError> {
- let mut encoders = batch
- .schema()
- .fields()
- .iter()
- .zip(batch.columns())
- .map(|(field, array)| Ok((field.is_nullable(),
make_encoder(array.as_ref())?)))
- .collect::<Result<Vec<_>, ArrowError>>()?;
- (0..batch.num_rows()).try_for_each(|row| {
- encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
- if *is_nullable {
- let is_null = enc.is_null(row);
- write_optional_branch(out, is_null, opts.impala_mode)?;
- if is_null {
- return Ok(());
+/// Arrow to Avro FieldEncoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Carries the per-site nullability **state** as a single enum that
enforces invariants
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ null_state: NullState,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: &FieldPlan,
+ nullability: Option<Nullability>,
+ ) -> Result<Self, ArrowError> {
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
StructArray".into()))?;
+ Encoder::Struct(Box::new(StructEncoder::try_new(arr,
encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+ .ok_or_else(|| ArrowError::SchemaError("Expected
LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro array site requires Arrow List/LargeList, found:
{other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+ DataType::Boolean =>
Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+ DataType::Utf8 => {
+
Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+ }
+ DataType::LargeUtf8 => {
+
Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+ }
+ DataType::Int32 =>
Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+ DataType::Int64 =>
Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+ DataType::Float32 => {
+
Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+ }
+ DataType::Float64 => {
+
Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+ }
+ DataType::Binary =>
Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+ DataType::LargeBinary => {
+
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+ }
+ DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
+ array.as_primitive::<TimestampMicrosecondType>(),
+ )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+ other => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: {other:?} not yet supported".into(),
+ ));
+ }
+ };
+ // Compute the effective null state from writer-declared nullability
and data nulls.
+ let null_state = match (nullability, has_nulls) {
+ (None, false) => NullState::NonNullable,
+ (None, true) => {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Avro site '{}' is non-nullable, but array contains nulls",
+ field.name()
+ )));
+ }
+ (Some(order), false) => {
+ // Optimization: drop any bitmap; emit a constant "value"
branch byte.
+ let byte = union_value_branch_byte(order, false);
+ NullState::NullableNoNulls { byte }
+ }
+ (Some(null_order), true) => {
+ let null_buffer = array.nulls().cloned().ok_or_else(|| {
+ ArrowError::InvalidArgumentError(format!(
+ "Array for Avro site '{}' reports nulls but has no
null buffer",
+ field.name()
+ ))
+ })?;
+ NullState::Nullable {
+ nulls: null_buffer,
+ null_order,
}
}
- enc.encode(row, out)
+ };
+ Ok(Self {
+ encoder,
+ null_state,
})
- })
+ }
+
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ match &self.null_state {
+ NullState::NonNullable => self.encoder.encode(idx, out),
+ NullState::NullableNoNulls { byte } => {
+ // Constant non-null branch byte, then value.
+ out.write_all(&[*byte]).map_err(|e| {
+ ArrowError::IoError(format!("write union value branch:
{e}"), e)
+ })?;
+ self.encoder.encode(idx, out)
+ }
+ NullState::Nullable { nulls, null_order } => {
+ let is_null = nulls.is_null(idx);
+ write_optional_index(out, is_null, *null_order)?;
+ if is_null {
+ Ok(())
+ } else {
+ self.encoder.encode(idx, out)
+ }
+ }
+ }
+ }
+}
+
+fn union_value_branch_byte(null_order: Nullability, is_null: bool) -> u8 {
+ let nulls_first = null_order == Nullability::default();
+ if nulls_first == is_null {
+ 0x00
+ } else {
+ 0x02
+ }
+}
+
+/// Per‑site encoder plan for a field. This mirrors the Avro structure, so
nested
+/// optional branch order can be honored exactly as declared by the schema.
+#[derive(Debug, Clone)]
+enum FieldPlan {
+ /// Non-nested scalar/logical type
+ Scalar,
+ /// Record/Struct with Avro‑ordered children
+ Struct { encoders: Vec<FieldBinding> },
+ /// Array with item‑site nullability and nested plan
+ List {
+ items_nullability: Option<Nullability>,
+ item_plan: Box<FieldPlan>,
+ },
+}
+
+#[derive(Debug, Clone)]
+struct FieldBinding {
+ /// Index of the Arrow field/column associated with this Avro field site
+ arrow_index: usize,
+ /// Nullability/order for this site (None if required)
+ nullability: Option<Nullability>,
+ /// Nested plan for this site
+ plan: FieldPlan,
+}
+
+/// Builder for `RecordEncoder` write plan
+#[derive(Debug)]
+pub struct RecordEncoderBuilder<'a> {
+ avro_root: &'a AvroField,
+ arrow_schema: &'a ArrowSchema,
+}
+
+impl<'a> RecordEncoderBuilder<'a> {
+ /// Create a new builder from the Avro root and Arrow schema.
+ pub fn new(avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema) ->
Self {
+ Self {
+ avro_root,
+ arrow_schema,
+ }
+ }
+
+ /// Build the `RecordEncoder` by walking the Avro **record** root in Avro
order,
+ /// resolving each field to an Arrow index by name.
+ pub fn build(self) -> Result<RecordEncoder, ArrowError> {
+ let avro_root_dt = self.avro_root.data_type();
+ let root_fields = match avro_root_dt.codec() {
+ Codec::Struct(fields) => fields,
+ _ => {
+ return Err(ArrowError::SchemaError(
+ "Top-level Avro schema must be a record/struct".into(),
+ ))
+ }
Review Comment:
```suggestion
let Codec::Struct(root_fields) = avro_root_dt.codec() else {
return Err(ArrowError::SchemaError(
"Top-level Avro schema must be a record/struct".into(),
));
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -267,11 +513,275 @@ impl F32Encoder<'_> {
struct F64Encoder<'a>(&'a arrow_array::Float64Array);
impl F64Encoder<'_> {
- #[inline]
fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
// Avro double: 8 bytes, IEEE-754 little-endian
let bits = self.0.value(idx).to_bits();
out.write_all(&bits.to_le_bytes())
.map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
}
}
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ write_len_prefixed(out, self.0.value(idx).as_bytes())
+ }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+struct StructEncoder<'a> {
+ encoders: Vec<FieldEncoder<'a>>,
+}
+
+impl<'a> StructEncoder<'a> {
+ fn try_new(
+ array: &'a StructArray,
+ field_bindings: &[FieldBinding],
+ ) -> Result<Self, ArrowError> {
+ let fields = match array.data_type() {
+ DataType::Struct(struct_fields) => struct_fields,
+ _ => return Err(ArrowError::SchemaError("Expected Struct".into())),
+ };
+ let mut encoders = Vec::with_capacity(field_bindings.len());
+ for field_binding in field_bindings {
+ let idx = field_binding.arrow_index;
+ let column = array.columns().get(idx).ok_or_else(|| {
+ ArrowError::SchemaError(format!("Struct child index {idx} out
of range"))
+ })?;
+ let field = fields
+ .get(idx)
+ .ok_or_else(|| {
+ ArrowError::SchemaError(format!("Struct child index {idx}
out of range"))
+ })?
+ .as_ref();
+ let encoder = prepare_value_site_encoder(
+ column.as_ref(),
+ field,
+ field_binding.nullability,
+ &field_binding.plan,
+ )?;
+ encoders.push(encoder);
+ }
+ Ok(Self { encoders })
+ }
+
+ fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) ->
Result<(), ArrowError> {
+ for encoder in self.encoders.iter_mut() {
+ encoder.encode(idx, out)?;
+ }
+ Ok(())
+ }
+}
+
+fn encode_blocked_range<W: Write + ?Sized, F>(
+ out: &mut W,
+ start: usize,
+ end: usize,
+ mut write_item: F,
+) -> Result<(), ArrowError>
+where
+ F: FnMut(usize, &mut W) -> Result<(), ArrowError>,
+{
+ let len = end.saturating_sub(start);
+ if len == 0 {
+ // Zero-length terminator per Avro spec
+ write_long(out, 0)?;
+ return Ok(());
+ }
+ // Emit a single positive block for performance, then the end marker.
+ write_long(out, len as i64)?;
+ for j in start..end {
+ write_item(j, out)?;
Review Comment:
aside: It's a bit disorienting that some methods (like this one and encoder
`encode`) take `out` as the second param, while others (like `write_long`
below) take `out` as the first param. Can we harmonize them? I would suggest
`out` is always the first param, but consistency is the main concern.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]