scovich commented on code in PR #8349:
URL: https://github.com/apache/arrow-rs/pull/8349#discussion_r2362528166


##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
     default_index: i32,
 }
 
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+    NoMatch,
+    ToReader {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+    dispatch: Option<Arc<[BranchDispatch]>>,
+    kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+    Both {
+        reader_type_codes: Arc<[i8]>,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+    },
+    FromSingle {
+        reader_type_codes: Arc<[i8]>,
+        target_reader_index: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug, Default)]
+struct UnionResolutionBuilder {
+    fields: Option<UnionFields>,
+    resolved: Option<ResolvedUnion>,
+}
+
+impl UnionResolutionBuilder {
+    #[inline]

Review Comment:
   It seems highly unlikely that building union resolvers would be on the 
critical path of any workload that involves any actual data? Is `#[inline]` 
really needed on these methods?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
     default_index: i32,
 }
 
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+    NoMatch,
+    ToReader {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+    dispatch: Option<Arc<[BranchDispatch]>>,
+    kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+    Both {
+        reader_type_codes: Arc<[i8]>,
+    },
+    ToSingle {
+        target: Box<Decoder>,

Review Comment:
   `Decoder` seems to be a concrete type, so why boxed? Is there somehow 
infinite type recursion?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -486,6 +734,70 @@ impl Decoder {
             Self::Decimal256(_, _, _, builder) => 
builder.append_value(i256::ZERO),
             Self::Enum(indices, _, _) => indices.push(0),
             Self::Duration(builder) => builder.append_null(),
+            Self::Union(fields, type_ids, offsets, encodings, encoding_counts, 
None) => {
+                let mut chosen = None;
+                for (i, ch) in encodings.iter().enumerate() {
+                    if matches!(ch, Decoder::Null(_)) {
+                        chosen = Some(i);
+                        break;
+                    }
+                }
+                let idx = chosen.unwrap_or(0);
+                let type_id = fields
+                    .iter()
+                    .nth(idx)
+                    .map(|(type_id, _)| type_id)
+                    .unwrap_or_else(|| i8::try_from(idx).unwrap_or(0));

Review Comment:
   When could the iter-nth call come up empty? Are unions allowed to be empty, 
and our choice of defaulting to idx=0 could produce an out of bounds access? Or 
is it possible that `encodings.len() != fields.len()`?
   
   If the former, recommend to keep `idx` as an `Option`, so the control flow 
is more direct (= easier to follow). 
   
   If the latter... I don't think I understand what this code is doing (which 
is probably true either way, if I'm being honest)



##########
arrow-avro/src/reader/record.rs:
##########
@@ -425,6 +645,34 @@ impl Decoder {
                     Box::new(val_dec),
                 )
             }
+            (Codec::Union(encodings, fields, mode), _) => {
+                if *mode != UnionMode::Dense {
+                    return Err(ArrowError::NotYetImplemented(
+                        "Sparse Arrow unions are not yet 
supported".to_string(),
+                    ));
+                }
+                let mut decoders = Vec::with_capacity(encodings.len());
+                for c in encodings.iter() {
+                    decoders.push(Self::try_new_internal(c)?);
+                }

Review Comment:
   ```suggestion
                   let decoders = encodings.iter().map(Self::try_new_internal);
   ```
   and then below, pass
   ```rust
   decoders.collect::<Result<Vec<_>, _>>()?,
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -700,6 +1012,65 @@ impl Decoder {
                     "Default for enum must be a symbol".to_string(),
                 )),
             },
+            Self::Union(fields, type_ids, offsets, encodings, encoding_counts, 
None) => {
+                if encodings.is_empty() {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Union default cannot be applied to empty 
union".to_string(),
+                    ));
+                }
+                let type_id = fields
+                    .iter()
+                    .nth(0)
+                    .map(|(type_id, _)| type_id)
+                    .unwrap_or(0_i8);
+                type_ids.push(type_id);
+                offsets.push(encoding_counts[0]);
+                encodings[0].append_default(lit)?;
+                encoding_counts[0] += 1;
+                Ok(())
+            }
+            Self::Union(
+                fields,
+                type_ids,
+                offsets,
+                encodings,
+                encoding_counts,
+                Some(union_resolution),
+            ) => match &mut union_resolution.kind {
+                UnionResolvedKind::Both { .. } => {
+                    if encodings.is_empty() {
+                        return Err(ArrowError::InvalidArgumentError(
+                            "Union default cannot be applied to empty 
union".to_string(),
+                        ));
+                    }
+                    let type_id = fields
+                        .iter()
+                        .nth(0)
+                        .map(|(type_id, _)| type_id)
+                        .unwrap_or(0_i8);
+                    type_ids.push(type_id);
+                    offsets.push(encoding_counts[0]);
+                    encodings[0].append_default(lit)?;
+                    encoding_counts[0] += 1;
+                    Ok(())
+                }
+                UnionResolvedKind::ToSingle { target } => 
target.append_default(lit),
+                UnionResolvedKind::FromSingle {
+                    target_reader_index,
+                    ..
+                } => {
+                    let type_id = fields

Review Comment:
   Seems like a lot of redundancy between `Both` and `FromSingle` cases? 
   I _think_ the former just hardwires `target_reader_index=0`?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -852,6 +1303,94 @@ impl Decoder {
         Ok(())
     }
 
+    fn decode_with_promotion(
+        &mut self,
+        buf: &mut AvroCursor<'_>,
+        promotion: Promotion,
+    ) -> Result<(), ArrowError> {
+        match promotion {
+            Promotion::Direct => self.decode(buf),
+            Promotion::IntToLong => match self {
+                Self::Int64(v) => {
+                    v.push(buf.get_int()? as i64);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Int->Long target mismatch".into(),
+                )),
+            },
+            Promotion::IntToFloat => match self {
+                Self::Float32(v) => {
+                    v.push(buf.get_int()? as f32);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Int->Float target mismatch".into(),
+                )),
+            },
+            Promotion::IntToDouble => match self {
+                Self::Float64(v) => {
+                    v.push(buf.get_int()? as f64);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Int->Double target mismatch".into(),
+                )),
+            },
+            Promotion::LongToFloat => match self {
+                Self::Float32(v) => {
+                    v.push(buf.get_long()? as f32);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Long->Float target mismatch".into(),
+                )),
+            },
+            Promotion::LongToDouble => match self {
+                Self::Float64(v) => {
+                    v.push(buf.get_long()? as f64);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Long->Double target mismatch".into(),
+                )),
+            },
+            Promotion::FloatToDouble => match self {
+                Self::Float64(v) => {
+                    v.push(buf.get_float()? as f64);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Float->Double target mismatch".into(),
+                )),
+            },
+            Promotion::StringToBytes => match self {
+                Self::Binary(offsets, values) | Self::StringToBytes(offsets, 
values) => {
+                    let data = buf.get_bytes()?;
+                    offsets.push_length(data.len());
+                    values.extend_from_slice(data);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion String->Bytes target mismatch".into(),
+                )),
+            },
+            Promotion::BytesToString => match self {

Review Comment:
   This one isn't the same as the others... it's technically a narrowing cast 
which could fail if the input bytes are not valid utf-8. At least the int -> 
float conversions are converting casts (= infallible). 
   
   Does the spec require this? And if so, should we validate the data here?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -852,6 +1303,94 @@ impl Decoder {
         Ok(())
     }
 
+    fn decode_with_promotion(
+        &mut self,
+        buf: &mut AvroCursor<'_>,
+        promotion: Promotion,
+    ) -> Result<(), ArrowError> {
+        match promotion {
+            Promotion::Direct => self.decode(buf),
+            Promotion::IntToLong => match self {
+                Self::Int64(v) => {
+                    v.push(buf.get_int()? as i64);

Review Comment:
   Seems better to document the lossless conversions by e.g.
   ```suggestion
                       v.push(i64::from(buf.get_int()?));
   ```
   or even
   ```suggestion
                       v.push(buf.get_int()?.into());
   ```
   ?
   
   (I think most of these promotions are lossless and could benefit from the 
same)



##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
     default_index: i32,
 }
 
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+    NoMatch,
+    ToReader {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+    dispatch: Option<Arc<[BranchDispatch]>>,
+    kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+    Both {
+        reader_type_codes: Arc<[i8]>,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+    },
+    FromSingle {
+        reader_type_codes: Arc<[i8]>,
+        target_reader_index: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug, Default)]
+struct UnionResolutionBuilder {
+    fields: Option<UnionFields>,
+    resolved: Option<ResolvedUnion>,
+}
+
+impl UnionResolutionBuilder {
+    #[inline]
+    fn new() -> Self {
+        Self {
+            fields: None,
+            resolved: None,
+        }
+    }
+
+    #[inline]
+    fn with_fields(mut self, fields: UnionFields) -> Self {
+        self.fields = Some(fields);
+        self
+    }
+
+    #[inline]
+    fn with_resolved_union(mut self, resolved_union: &ResolvedUnion) -> Self {
+        self.resolved = Some(resolved_union.clone());
+        self
+    }
+
+    fn build(self) -> Result<UnionResolution, ArrowError> {
+        let info = self.resolved.ok_or_else(|| {
+            ArrowError::InvalidArgumentError(
+                "UnionResolutionBuilder requires resolved_union to be 
provided".to_string(),
+            )
+        })?;
+        match (info.writer_is_union, info.reader_is_union) {
+            (true, true) => {
+                let fields = self.fields.ok_or_else(|| {
+                    ArrowError::InvalidArgumentError(
+                        "UnionResolutionBuilder for reader union requires 
fields".to_string(),
+                    )
+                })?;
+                let reader_type_codes: Vec<i8> =
+                    fields.iter().map(|(tid, _)| tid).collect::<Vec<_>>();

Review Comment:
   nit
   ```suggestion
                   let reader_type_codes: Vec<i8> = fields.iter().map(|(tid, _)| tid).collect();
   ```
   (tho fmt will probably split the monadic chain across several lines because 
"too complex")
   
   (again below)



##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
     default_index: i32,
 }
 
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+    NoMatch,
+    ToReader {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+    dispatch: Option<Arc<[BranchDispatch]>>,
+    kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+    Both {
+        reader_type_codes: Arc<[i8]>,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+    },
+    FromSingle {
+        reader_type_codes: Arc<[i8]>,
+        target_reader_index: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug, Default)]
+struct UnionResolutionBuilder {
+    fields: Option<UnionFields>,
+    resolved: Option<ResolvedUnion>,
+}
+
+impl UnionResolutionBuilder {
+    #[inline]
+    fn new() -> Self {
+        Self {
+            fields: None,
+            resolved: None,
+        }
+    }
+
+    #[inline]
+    fn with_fields(mut self, fields: UnionFields) -> Self {
+        self.fields = Some(fields);
+        self
+    }
+
+    #[inline]
+    fn with_resolved_union(mut self, resolved_union: &ResolvedUnion) -> Self {
+        self.resolved = Some(resolved_union.clone());

Review Comment:
   I believe it's recommended to take an owned arg, if owned is needed. 
Otherwise, a caller who _could_ relinquish ownership of a value cannot do so, 
and we end up needlessly cloning. 
   
   ... but it looks like all callers of this (private) method only have a 
reference, so maybe it's fine to leave as-is?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
     default_index: i32,
 }
 
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+    NoMatch,
+    ToReader {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+    dispatch: Option<Arc<[BranchDispatch]>>,
+    kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+    Both {
+        reader_type_codes: Arc<[i8]>,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+    },
+    FromSingle {
+        reader_type_codes: Arc<[i8]>,
+        target_reader_index: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug, Default)]
+struct UnionResolutionBuilder {
+    fields: Option<UnionFields>,
+    resolved: Option<ResolvedUnion>,
+}
+
+impl UnionResolutionBuilder {
+    #[inline]
+    fn new() -> Self {
+        Self {
+            fields: None,
+            resolved: None,
+        }
+    }
+
+    #[inline]
+    fn with_fields(mut self, fields: UnionFields) -> Self {
+        self.fields = Some(fields);
+        self
+    }
+
+    #[inline]
+    fn with_resolved_union(mut self, resolved_union: &ResolvedUnion) -> Self {
+        self.resolved = Some(resolved_union.clone());
+        self
+    }
+
+    fn build(self) -> Result<UnionResolution, ArrowError> {
+        let info = self.resolved.ok_or_else(|| {
+            ArrowError::InvalidArgumentError(
+                "UnionResolutionBuilder requires resolved_union to be 
provided".to_string(),
+            )
+        })?;
+        match (info.writer_is_union, info.reader_is_union) {

Review Comment:
   Lots of redundancy in the various match arms... can we pre-compute the main 
bits to reduce the bug surface?
   ```rust
   let writer_dispatch = info.writer_is_union.then(|| {
       info.writer_to_reader
           .iter()
           .map(...)
           .collect::<Vec<_>>()
   });
   
   let reader_type_codes = if info.reader_is_union {
       let Some(fields) = self.fields else {
           return Err(...);
       };
       let reader_type_codes: Vec<i8> = fields.iter().map(...).collect();
       Some(reader_type_codes)
   } else {
       None
   };
   
   match (writer_dispatch, reader_type_codes) {
       (Some(dispatch), Some(reader_type_codes)) => Ok(UnionResolution { ... Both ... }),
       (Some(dispatch), None) => Ok(UnionResolution { ... ToSingle ... }),
       (None, Some(reader_type_codes)) => Ok(UnionResolution { ... FromSingle ... }),
       (None, None) => Err(...),
   }
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -83,6 +84,46 @@ macro_rules! append_decimal_default {
     }};
 }
 
+macro_rules! flush_union {
+    ($fields:expr, $type_ids:expr, $offsets:expr, $encodings:expr) => {{
+        let encoding_arrays = $encodings
+            .iter_mut()
+            .map(|d| d.flush(None))
+            .collect::<Result<Vec<_>, _>>()?;
+        let type_ids_buf: ScalarBuffer<i8> = 
flush_values($type_ids).into_iter().collect();
+        let offsets_buf: ScalarBuffer<i32> = 
flush_values($offsets).into_iter().collect();
+        let arr = UnionArray::try_new(
+            $fields.clone(),
+            type_ids_buf,
+            Some(offsets_buf),
+            encoding_arrays,
+        )
+        .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+        Arc::new(arr)
+    }};
+}
+
+macro_rules! get_writer_union_action {
+    ($buf:expr, $union_resolution:expr) => {{
+        let branch = $buf.get_long()?;
+        if branch < 0 {
+            return Err(ArrowError::ParseError(format!(
+                "Negative union branch index {branch}"
+            )));
+        }
+        let idx = branch as usize;
+        let dispatch = match $union_resolution.dispatch.as_deref() {
+            Some(d) => d,
+            None => {
+                return Err(ArrowError::SchemaError(
+                    "dispatch table missing for writer=union".to_string(),
+                ));
+            }
+        };

Review Comment:
   ```suggestion
           let Some(dispatch) = $union_resolution.dispatch.as_deref() else {
               return Err(ArrowError::SchemaError(
                   "dispatch table missing for writer=union".to_string(),
               ));
           };
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -259,12 +441,50 @@ enum Decoder {
     Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
     Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
     Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
+    Union(
+        UnionFields,
+        Vec<i8>,
+        Vec<i32>,
+        Vec<Decoder>,
+        Vec<i32>,
+        Option<UnionResolution>,
+    ),
     Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
 }
 
 impl Decoder {
     fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
-        // Extract just the Promotion (if any) to simplify pattern matching
+        if let Some(ResolutionInfo::Union(info)) = 
data_type.resolution.as_ref() {
+            if info.writer_is_union && !info.reader_is_union {
+                let mut clone = data_type.clone();
+                clone.resolution = None;
+                let target = Box::new(Self::try_new_internal(&clone)?);
+                let mut union_resolution = UnionResolutionBuilder::new()
+                    .with_resolved_union(info)
+                    .build()?;
+                if let UnionResolvedKind::ToSingle { target: t } = &mut 
union_resolution.kind {
+                    *t = target;
+                }
+                let base = Self::Union(
+                    UnionFields::empty(),
+                    Vec::new(),
+                    Vec::new(),
+                    Vec::new(),
+                    Vec::new(),
+                    Some(union_resolution),
+                );
+                return Ok(match data_type.nullability() {
+                    Some(n) => {
+                        Self::Nullable(n, 
NullBufferBuilder::new(DEFAULT_CAPACITY), Box::new(base))
+                    }
+                    None => base,
+                });

Review Comment:
   With a `mut base` could do:
   ```suggestion
                   if let Some(n) = data_type.nullability() {
                       base = Self::Nullable(n, NullBufferBuilder::new(DEFAULT_CAPACITY), Box::new(base));
                   }
                   }
                   return Ok(base);
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
     default_index: i32,
 }
 
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+    NoMatch,
+    ToReader {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+    dispatch: Option<Arc<[BranchDispatch]>>,
+    kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+    Both {
+        reader_type_codes: Arc<[i8]>,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+    },
+    FromSingle {
+        reader_type_codes: Arc<[i8]>,
+        target_reader_index: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug, Default)]
+struct UnionResolutionBuilder {
+    fields: Option<UnionFields>,
+    resolved: Option<ResolvedUnion>,
+}
+
+impl UnionResolutionBuilder {
+    #[inline]
+    fn new() -> Self {
+        Self {
+            fields: None,
+            resolved: None,
+        }
+    }
+
+    #[inline]
+    fn with_fields(mut self, fields: UnionFields) -> Self {
+        self.fields = Some(fields);
+        self
+    }
+
+    #[inline]
+    fn with_resolved_union(mut self, resolved_union: &ResolvedUnion) -> Self {
+        self.resolved = Some(resolved_union.clone());
+        self
+    }
+
+    fn build(self) -> Result<UnionResolution, ArrowError> {
+        let info = self.resolved.ok_or_else(|| {
+            ArrowError::InvalidArgumentError(
+                "UnionResolutionBuilder requires resolved_union to be 
provided".to_string(),
+            )
+        })?;
+        match (info.writer_is_union, info.reader_is_union) {
+            (true, true) => {
+                let fields = self.fields.ok_or_else(|| {
+                    ArrowError::InvalidArgumentError(
+                        "UnionResolutionBuilder for reader union requires 
fields".to_string(),
+                    )
+                })?;
+                let reader_type_codes: Vec<i8> =
+                    fields.iter().map(|(tid, _)| tid).collect::<Vec<_>>();
+                let dispatch: Vec<BranchDispatch> = info
+                    .writer_to_reader
+                    .iter()
+                    .map(|m| match m {
+                        Some((reader_index, promotion)) => 
BranchDispatch::ToReader {
+                            reader_idx: *reader_index,
+                            promotion: *promotion,
+                        },
+                        None => BranchDispatch::NoMatch,
+                    })
+                    .collect();
+                Ok(UnionResolution {
+                    dispatch: Some(Arc::from(dispatch)),
+                    kind: UnionResolvedKind::Both {
+                        reader_type_codes: Arc::from(reader_type_codes),
+                    },
+                })
+            }
+            (false, true) => {
+                let fields = self.fields.ok_or_else(|| {
+                    ArrowError::InvalidArgumentError(
+                        "UnionResolutionBuilder for reader union requires 
fields".to_string(),
+                    )
+                })?;
+                let reader_type_codes: Vec<i8> =
+                    fields.iter().map(|(tid, _)| tid).collect::<Vec<_>>();
+                let (target_reader_index, promotion) =
+                    match info.writer_to_reader.first().and_then(|x| *x) {
+                        Some(pair) => pair,
+                        None => {
+                            return Err(ArrowError::SchemaError(
+                                "Writer schema does not match any reader union 
branch".to_string(),

Review Comment:
   The message says "any", but the code only looks at the first entry (`writer_to_reader.first()`)?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -214,6 +254,148 @@ struct EnumResolution {
     default_index: i32,
 }
 
+#[derive(Debug, Clone, Copy)]
+enum BranchDispatch {
+    NoMatch,
+    ToReader {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug)]
+struct UnionResolution {
+    dispatch: Option<Arc<[BranchDispatch]>>,
+    kind: UnionResolvedKind,
+}
+
+#[derive(Debug)]
+enum UnionResolvedKind {
+    Both {
+        reader_type_codes: Arc<[i8]>,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+    },
+    FromSingle {
+        reader_type_codes: Arc<[i8]>,
+        target_reader_index: usize,
+        promotion: Promotion,
+    },
+}
+
+#[derive(Debug, Default)]
+struct UnionResolutionBuilder {
+    fields: Option<UnionFields>,
+    resolved: Option<ResolvedUnion>,
+}
+
+impl UnionResolutionBuilder {
+    #[inline]
+    fn new() -> Self {
+        Self {
+            fields: None,
+            resolved: None,
+        }
+    }
+
+    #[inline]
+    fn with_fields(mut self, fields: UnionFields) -> Self {
+        self.fields = Some(fields);
+        self
+    }
+
+    #[inline]
+    fn with_resolved_union(mut self, resolved_union: &ResolvedUnion) -> Self {
+        self.resolved = Some(resolved_union.clone());
+        self
+    }
+
+    fn build(self) -> Result<UnionResolution, ArrowError> {
+        let info = self.resolved.ok_or_else(|| {
+            ArrowError::InvalidArgumentError(
+                "UnionResolutionBuilder requires resolved_union to be 
provided".to_string(),
+            )
+        })?;
+        match (info.writer_is_union, info.reader_is_union) {
+            (true, true) => {
+                let fields = self.fields.ok_or_else(|| {
+                    ArrowError::InvalidArgumentError(
+                        "UnionResolutionBuilder for reader union requires 
fields".to_string(),
+                    )
+                })?;
+                let reader_type_codes: Vec<i8> =
+                    fields.iter().map(|(tid, _)| tid).collect::<Vec<_>>();
+                let dispatch: Vec<BranchDispatch> = info
+                    .writer_to_reader
+                    .iter()
+                    .map(|m| match m {
+                        Some((reader_index, promotion)) => 
BranchDispatch::ToReader {
+                            reader_idx: *reader_index,
+                            promotion: *promotion,
+                        },
+                        None => BranchDispatch::NoMatch,
+                    })
+                    .collect();
+                Ok(UnionResolution {
+                    dispatch: Some(Arc::from(dispatch)),
+                    kind: UnionResolvedKind::Both {
+                        reader_type_codes: Arc::from(reader_type_codes),
+                    },
+                })
+            }
+            (false, true) => {
+                let fields = self.fields.ok_or_else(|| {
+                    ArrowError::InvalidArgumentError(
+                        "UnionResolutionBuilder for reader union requires 
fields".to_string(),
+                    )
+                })?;
+                let reader_type_codes: Vec<i8> =
+                    fields.iter().map(|(tid, _)| tid).collect::<Vec<_>>();
+                let (target_reader_index, promotion) =
+                    match info.writer_to_reader.first().and_then(|x| *x) {
+                        Some(pair) => pair,
+                        None => {
+                            return Err(ArrowError::SchemaError(

Review Comment:
   Would this be simpler?
   ```suggestion
                   let Some(Some((target_reader_index, promotion))) = info.writer_to_reader.first() else {
                       return Err(ArrowError::SchemaError(
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -486,6 +734,70 @@ impl Decoder {
             Self::Decimal256(_, _, _, builder) => 
builder.append_value(i256::ZERO),
             Self::Enum(indices, _, _) => indices.push(0),
             Self::Duration(builder) => builder.append_null(),
+            Self::Union(fields, type_ids, offsets, encodings, encoding_counts, 
None) => {
+                let mut chosen = None;
+                for (i, ch) in encodings.iter().enumerate() {
+                    if matches!(ch, Decoder::Null(_)) {
+                        chosen = Some(i);
+                        break;
+                    }
+                }
+                let idx = chosen.unwrap_or(0);

Review Comment:
   ```suggestion
                   let idx = encodings
                       .iter()
                       .position(|ch| matches!(ch, Decoder::Null(_)))
                       .unwrap_or(0);
   ```
   (but why is defaulting to 0 correct?)



##########
arrow-avro/src/reader/record.rs:
##########
@@ -834,8 +1205,88 @@ impl Decoder {
                 let nanos = (millis as i64) * 1_000_000;
                 builder.append_value(IntervalMonthDayNano::new(months as i32, 
days as i32, nanos));
             }
+            Self::Union(fields, type_ids, offsets, encodings, encoding_counts, 
None) => {
+                let branch = buf.get_long()?;
+                if branch < 0 {
+                    return Err(ArrowError::ParseError(format!(
+                        "Negative union branch index {branch}"
+                    )));
+                }
+                let idx = branch as usize;
+                if idx >= encodings.len() {
+                    return Err(ArrowError::ParseError(format!(
+                        "Union branch index {idx} out of range ({} branches)",
+                        encodings.len()
+                    )));
+                }
+                let type_id = fields
+                    .iter()
+                    .nth(idx)
+                    .map(|(type_id, _)| type_id)
+                    .unwrap_or_else(|| i8::try_from(idx).unwrap_or(0));
+                type_ids.push(type_id);
+                offsets.push(encoding_counts[idx]);
+                encodings[idx].decode(buf)?;
+                encoding_counts[idx] += 1;

Review Comment:
   More redundancy with the previous block of code?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -486,6 +734,70 @@ impl Decoder {
             Self::Decimal256(_, _, _, builder) => 
builder.append_value(i256::ZERO),
             Self::Enum(indices, _, _) => indices.push(0),
             Self::Duration(builder) => builder.append_null(),
+            Self::Union(fields, type_ids, offsets, encodings, encoding_counts, 
None) => {
+                let mut chosen = None;
+                for (i, ch) in encodings.iter().enumerate() {
+                    if matches!(ch, Decoder::Null(_)) {
+                        chosen = Some(i);
+                        break;
+                    }
+                }
+                let idx = chosen.unwrap_or(0);
+                let type_id = fields
+                    .iter()
+                    .nth(idx)
+                    .map(|(type_id, _)| type_id)
+                    .unwrap_or_else(|| i8::try_from(idx).unwrap_or(0));
+                type_ids.push(type_id);
+                offsets.push(encoding_counts[idx]);
+                encodings[idx].append_null();
+                encoding_counts[idx] += 1;
+            }
+            Self::Union(
+                fields,
+                type_ids,
+                offsets,
+                encodings,
+                encoding_counts,
+                Some(union_resolution),
+            ) => match &mut union_resolution.kind {
+                UnionResolvedKind::Both { .. } => {
+                    let mut chosen = None;
+                    for (i, ch) in encodings.iter().enumerate() {
+                        if matches!(ch, Decoder::Null(_)) {
+                            chosen = Some(i);
+                            break;
+                        }
+                    }
+                    let idx = chosen.unwrap_or(0);
+                    let type_id = fields
+                        .iter()
+                        .nth(idx)
+                        .map(|(type_id, _)| type_id)
+                        .unwrap_or_else(|| i8::try_from(idx).unwrap_or(0));

Review Comment:
   Why is it safe to interpret a field index as a type id?
   AFAIK type ids are required to be neither contiguous nor ordered.



##########
arrow-avro/src/reader/record.rs:
##########
@@ -852,6 +1303,94 @@ impl Decoder {
         Ok(())
     }
 
+    fn decode_with_promotion(
+        &mut self,
+        buf: &mut AvroCursor<'_>,
+        promotion: Promotion,
+    ) -> Result<(), ArrowError> {
+        match promotion {
+            Promotion::Direct => self.decode(buf),
+            Promotion::IntToLong => match self {
+                Self::Int64(v) => {
+                    v.push(buf.get_int()? as i64);

Review Comment:
   Actually, a macro might really help here
   ```rust
   Promotion::IntToLong => handle_promotion!(Int64, get_int),
   Promotion::IntToFloat => handle_promotion!(Float32, get_int),
   ```
   etc
   (might need to either `impl Display for Promotion`, or pass the
   `"Int -> Long"` type strings for error messages)



##########
arrow-avro/src/reader/record.rs:
##########
@@ -486,6 +734,70 @@ impl Decoder {
             Self::Decimal256(_, _, _, builder) => 
builder.append_value(i256::ZERO),
             Self::Enum(indices, _, _) => indices.push(0),
             Self::Duration(builder) => builder.append_null(),
+            Self::Union(fields, type_ids, offsets, encodings, encoding_counts, 
None) => {
+                let mut chosen = None;
+                for (i, ch) in encodings.iter().enumerate() {
+                    if matches!(ch, Decoder::Null(_)) {
+                        chosen = Some(i);
+                        break;
+                    }
+                }
+                let idx = chosen.unwrap_or(0);
+                let type_id = fields
+                    .iter()
+                    .nth(idx)
+                    .map(|(type_id, _)| type_id)
+                    .unwrap_or_else(|| i8::try_from(idx).unwrap_or(0));

Review Comment:
   Actually, maybe I do understand!
   * If NULL is one of the union branches, emit that directly
   * Otherwise, default to the first branch (whatever that is), and emit a NULL 
value for it
   
   Is that correct?
   
   (but if so, then I don't understand why `idx` could ever be out of bounds?)



##########
arrow-avro/src/reader/record.rs:
##########
@@ -852,6 +1303,94 @@ impl Decoder {
         Ok(())
     }
 
+    fn decode_with_promotion(
+        &mut self,
+        buf: &mut AvroCursor<'_>,
+        promotion: Promotion,
+    ) -> Result<(), ArrowError> {
+        match promotion {
+            Promotion::Direct => self.decode(buf),
+            Promotion::IntToLong => match self {
+                Self::Int64(v) => {
+                    v.push(buf.get_int()? as i64);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Int->Long target mismatch".into(),
+                )),
+            },
+            Promotion::IntToFloat => match self {
+                Self::Float32(v) => {
+                    v.push(buf.get_int()? as f32);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Int->Float target mismatch".into(),
+                )),
+            },
+            Promotion::IntToDouble => match self {
+                Self::Float64(v) => {
+                    v.push(buf.get_int()? as f64);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Int->Double target mismatch".into(),
+                )),
+            },
+            Promotion::LongToFloat => match self {
+                Self::Float32(v) => {
+                    v.push(buf.get_long()? as f32);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Long->Float target mismatch".into(),
+                )),
+            },
+            Promotion::LongToDouble => match self {

Review Comment:
   Technically these are narrowing/converting casts, because f32 and f64 have
   only 24 and 53 bits of precision, respectively, so not every i64 value can
   be represented exactly (the same problem applies when casting i32 to f32).
   
   Does the spec require treating this as a "promotion" even though it's lossy?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to