nathaniel-d-ef commented on code in PR #8274:
URL: https://github.com/apache/arrow-rs/pull/8274#discussion_r2319092823
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -275,3 +384,434 @@ impl F64Encoder<'_> {
.map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
}
}
+
+struct Utf8GenericEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
+
+impl<'a, O: OffsetSizeTrait> Utf8GenericEncoder<'a, O> {
+ #[inline]
+    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
+ write_len_prefixed(out, self.0.value(idx).as_bytes())
+ }
+}
+
+type Utf8Encoder<'a> = Utf8GenericEncoder<'a, i32>;
+type Utf8LargeEncoder<'a> = Utf8GenericEncoder<'a, i64>;
+
+/// Unified field encoder:
+/// - Holds the inner `Encoder` (by value)
+/// - Tracks the column/site null buffer and whether any nulls exist
+/// - Carries per-site Avro `Nullability` and precomputed union branch (fast path)
+pub struct FieldEncoder<'a> {
+ encoder: Encoder<'a>,
+ nulls: Option<NullBuffer>,
+ has_nulls: bool,
+ nullability: Option<Nullability>,
+    /// Precomputed constant branch byte if the site is nullable but contains no nulls
+ pre: Option<u8>,
+}
+
+impl<'a> FieldEncoder<'a> {
+ fn make_encoder(
+ array: &'a dyn Array,
+ field: &Field,
+ plan: PlanRef<'_>,
+ ) -> Result<Self, ArrowError> {
+ let nulls = array.nulls().cloned();
+ let has_nulls = array.null_count() > 0;
+ let encoder = match plan {
+ FieldPlan::Struct { encoders } => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+                    .ok_or_else(|| ArrowError::SchemaError("Expected StructArray".into()))?;
+                Encoder::Struct(Box::new(StructEncoder::try_new(arr, encoders)?))
+ }
+ FieldPlan::List {
+ items_nullability,
+ item_plan,
+ } => match array.data_type() {
+ DataType::List(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<ListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected ListArray".into()))?;
+ Encoder::List(Box::new(ListEncoder32::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ DataType::LargeList(_) => {
+ let arr = array
+ .as_any()
+ .downcast_ref::<LargeListArray>()
+                        .ok_or_else(|| ArrowError::SchemaError("Expected LargeListArray".into()))?;
+ Encoder::LargeList(Box::new(ListEncoder64::try_new(
+ arr,
+ *items_nullability,
+ item_plan.as_ref(),
+ )?))
+ }
+ other => {
+ return Err(ArrowError::SchemaError(format!(
+                        "Avro array site requires Arrow List/LargeList, found: {other:?}"
+ )))
+ }
+ },
+ FieldPlan::Scalar => match array.data_type() {
+                DataType::Boolean => Encoder::Boolean(BooleanEncoder(array.as_boolean())),
+                DataType::Utf8 => {
+                    Encoder::Utf8(Utf8GenericEncoder::<i32>(array.as_string::<i32>()))
+                }
+                DataType::LargeUtf8 => {
+                    Encoder::Utf8Large(Utf8GenericEncoder::<i64>(array.as_string::<i64>()))
+                }
+                DataType::Int32 => Encoder::Int(IntEncoder(array.as_primitive::<Int32Type>())),
+                DataType::Int64 => Encoder::Long(LongEncoder(array.as_primitive::<Int64Type>())),
+                DataType::Float32 => {
+                    Encoder::Float32(F32Encoder(array.as_primitive::<Float32Type>()))
+                }
+                DataType::Float64 => {
+                    Encoder::Float64(F64Encoder(array.as_primitive::<Float64Type>()))
+                }
+                DataType::Binary => Encoder::Binary(BinaryEncoder(array.as_binary::<i32>())),
+                DataType::LargeBinary => {
+                    Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
+                }
+                DataType::Timestamp(TimeUnit::Microsecond, _) => Encoder::Timestamp(LongEncoder(
+                    array.as_primitive::<TimestampMicrosecondType>(),
+                )),
+ other => {
+ return Err(ArrowError::NotYetImplemented(format!(
+ "Avro scalar type not yet supported: {other:?}"
+ )));
+ }
+ },
+            other => {
+                return Err(ArrowError::NotYetImplemented(format!(
+                    "Avro writer: {other:?} not yet supported"
+                )));
+            }
+ };
+ Ok(Self {
+ encoder,
+ nulls,
+ has_nulls,
+ nullability: None,
+ pre: None,
+ })
+ }
+
+ #[inline]
+ fn has_nulls(&self) -> bool {
+ self.has_nulls
+ }
+
+ #[inline]
+ fn is_null(&self, idx: usize) -> bool {
+ self.nulls
+ .as_ref()
+ .is_some_and(|null_buffer| null_buffer.is_null(idx))
+ }
+
+ #[inline]
+    fn with_effective_nullability(mut self, order: Option<Nullability>) -> Self {
+ self.nullability = order;
+ self.pre = self.precomputed_union_value_branch(order);
+ self
+ }
+
+ #[inline]
+    fn precomputed_union_value_branch(&self, order: Option<Nullability>) -> Option<u8> {
+ match (order, self.has_nulls()) {
+            (Some(Nullability::NullFirst), false) => Some(0x02), // value branch index 1
Review Comment:
This is explained elsewhere; I think we can eliminate the comments. Maybe alias the values as constants globally.
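For example, something along these lines (just a sketch, the constant names are illustrative and not from this PR; 0x00/0x02 are the zigzag-varint encodings of branch indexes 0 and 1):

    // Illustrative module-level constants for the precomputed union branch bytes
    const UNION_BRANCH_0: u8 = 0x00; // zigzag varint for branch index 0
    const UNION_BRANCH_1: u8 = 0x02; // zigzag varint for branch index 1

Then `precomputed_union_value_branch` and `write_optional_index` could reference the same constants instead of repeating the magic bytes.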
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -84,48 +85,211 @@ fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) -> Result<(), ArrowErr
/// - Null-first (default): null => 0, value => 1
/// - Null-second (Impala): value => 0, null => 1
#[inline]
-fn write_optional_branch<W: Write + ?Sized>(
+fn write_optional_index<W: Write + ?Sized>(
writer: &mut W,
is_null: bool,
- impala_mode: bool,
+ order: Nullability,
) -> Result<(), ArrowError> {
- let branch = if impala_mode == is_null { 1 } else { 0 };
- write_int(writer, branch)
+ // For NullFirst: null => 0x00, value => 0x02
+ // For NullSecond: value => 0x00, null => 0x02
+ let byte = match order {
+ Nullability::NullFirst => {
+ if is_null {
+ 0x00
+ } else {
+ 0x02
+ }
+ }
+ Nullability::NullSecond => {
+ if is_null {
+ 0x02
+ } else {
+ 0x00
+ }
+ }
+ };
+ writer
+ .write_all(&[byte])
+        .map_err(|e| ArrowError::IoError(format!("write union branch: {e}"), e))
}
-/// Encode a `RecordBatch` in Avro binary format using **default options**.
-pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) -> Result<(), ArrowError> {
- encode_record_batch_with_options(batch, out, &EncoderOptions::default())
+/// Per‑site encoder plan for a field. This mirrors Avro structure so nested
+/// optional branch order can be honored exactly as declared by the schema.
+#[derive(Debug, Clone)]
+enum FieldPlan {
+ /// Non-nested scalar/logical type
+ Scalar,
+ /// Record/Struct with Avro‑ordered children
+ Struct { encoders: Vec<FieldBinding> },
+ /// Array with item‑site nullability and nested plan
+ List {
+ items_nullability: Option<Nullability>,
+ item_plan: Box<FieldPlan>,
+ },
}
-/// Encode a `RecordBatch` with explicit `EncoderOptions`.
-pub fn encode_record_batch_with_options<W: Write>(
- batch: &RecordBatch,
- out: &mut W,
- opts: &EncoderOptions,
-) -> Result<(), ArrowError> {
- let mut encoders = batch
- .schema()
- .fields()
- .iter()
- .zip(batch.columns())
-        .map(|(field, array)| Ok((field.is_nullable(), make_encoder(array.as_ref())?)))
- .collect::<Result<Vec<_>, ArrowError>>()?;
- (0..batch.num_rows()).try_for_each(|row| {
- encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
- if *is_nullable {
- let is_null = enc.is_null(row);
- write_optional_branch(out, is_null, opts.impala_mode)?;
- if is_null {
- return Ok(());
+/// Unified binding used for both top‑level columns and struct children.
+///
+/// This replaces the previous duplication between `StructChildPlan` and `ColumnPlan`.
Review Comment:
I would omit this from the committed code. The context is useful, but only for the purposes of this PR, IMO.
##########
arrow-avro/src/schema.rs:
##########
@@ -909,20 +998,42 @@ fn datatype_to_avro(
if matches!(dt, DataType::LargeList(_)) {
extras.insert("arrowLargeList".into(), Value::Bool(true));
}
- let (items, ie) =
-                datatype_to_avro(child.data_type(), child.name(), child.metadata(), name_gen)?;
+ let (items_inner, ie) = datatype_to_avro_with_order(
+ child.data_type(),
+ child.name(),
+ child.metadata(),
+ name_gen,
+ order,
+ )?;
+ let items_with_meta = merge_extras(items_inner, ie);
+ let items_schema = if child.is_nullable() {
+ wrap_nullable(items_with_meta, order)
+ } else {
+ items_with_meta
+ };
json!({
"type": "array",
- "items": merge_extras(items, ie)
+ "items": items_schema
})
}
DataType::FixedSizeList(child, len) => {
extras.insert("arrowFixedSize".into(), json!(len));
- let (items, ie) =
-                datatype_to_avro(child.data_type(), child.name(), child.metadata(), name_gen)?;
+ let (items_inner, ie) = datatype_to_avro_with_order(
+ child.data_type(),
+ child.name(),
+ child.metadata(),
+ name_gen,
+ order,
+ )?;
+ let items_with_meta = merge_extras(items_inner, ie);
Review Comment:
Perhaps worth moving this nullable-wrapping logic into `datatype_to_avro_with_order`, since it's duplicated here and in `Map`?
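If pushing it into `datatype_to_avro_with_order` turns out to be awkward, even a small free helper would remove the duplication. Rough sketch (the helper name is made up, and I'm assuming `order` is the same `Option<Nullability>` already threaded through here):

    // Illustrative helper, not in the PR: wrap a child schema in a nullable
    // union only when the child field is nullable.
    fn maybe_wrap_nullable(
        schema: serde_json::Value,
        is_nullable: bool,
        order: Option<Nullability>,
    ) -> serde_json::Value {
        if is_nullable {
            wrap_nullable(schema, order)
        } else {
            schema
        }
    }

The `List`, `FixedSizeList`, and `Map` sites could then all call `maybe_wrap_nullable(merge_extras(items_inner, ie), child.is_nullable(), order)`.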
##########
arrow-avro/src/writer/format.rs:
##########
@@ -85,7 +66,7 @@ impl AvroFormat for AvroOcfFormat {
Some(CompressionCodec::Xz) => "xz",
None => "null",
};
- write_long(writer, 2)?;
+ write_long(writer, 2)?; // two entries
Review Comment:
This note seems unnecessary