scovich commented on code in PR #8349:
URL: https://github.com/apache/arrow-rs/pull/8349#discussion_r2370425729


##########
arrow-avro/src/reader/record.rs:
##########
@@ -425,12 +443,34 @@ impl Decoder {
                     Box::new(val_dec),
                 )
             }
-            (Codec::Uuid, _) => 
Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),
-            (&Codec::Union(_, _, _), _) => {
+            (Codec::Union(encodings, fields, mode), _) if *mode == 
UnionMode::Dense => {
+                let decoders = encodings
+                    .iter()
+                    .map(Self::try_new_internal)
+                    .collect::<Result<Vec<_>, _>>()?;
+                if fields.len() != decoders.len() {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Union has {} fields but {} decoders",
+                        fields.len(),
+                        decoders.len()
+                    )));
+                }
+                let mut builder = UnionDecoderBuilder::new()
+                    .with_fields(fields.clone())
+                    .with_branches(decoders);
+                if let Some(ResolutionInfo::Union(info)) = 
data_type.resolution.as_ref() {
+                    if info.reader_is_union {
+                        builder = builder.with_resolved_union(info.clone());
+                    }
+                }
+                Self::Union(builder.build()?)
+            }
+            (Codec::Union(_, _, _), _) => {
                 return Err(ArrowError::NotYetImplemented(
-                    "Union type decoding is not yet supported".to_string(),
-                ))
+                    "Sparse Arrow unions are not yet supported".to_string(),
+                ));
             }
+            (Codec::Uuid, _) => 
Self::Uuid(Vec::with_capacity(DEFAULT_CAPACITY)),

Review Comment:
   Uuid didn't change? Can/should we leave it in its original position to 
reduce diff churn?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -468,10 +508,14 @@ impl Decoder {
             Self::Uuid(v) => {
                 v.extend([0; 16]);
             }
-            Self::Array(_, offsets, e) => {
+            Self::Array(_, offsets, _) => {
                 offsets.push_length(0);
             }
-            Self::Record(_, e, _) => e.iter_mut().for_each(|e| 
e.append_null()),
+            Self::Record(_, e, _) => {
+                for encoding in e.iter_mut() {
+                    encoding.append_null();
+                }
+            }

Review Comment:
   Why changed, out of curiosity? Doesn't seem related to this PR? 
   Did clippy flag it for some reason?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -259,12 +258,31 @@ enum Decoder {
     Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
     Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
     Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
+    Union(UnionDecoder),
     Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
 }
 
 impl Decoder {
     fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
         // Extract just the Promotion (if any) to simplify pattern matching

Review Comment:
   This comment might be a bit stale?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -991,10 +1096,327 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
+            Self::Union(u) => u.flush(nulls)?,
         })
     }
 }
 
+#[derive(Debug)]
+struct DispatchLookupTable {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,

Review Comment:
   Out of curiosity, why boxed slice instead of vec? 
   (they're technically equivalent in read-only context, but I don't see boxed 
slices very often)



##########
arrow-avro/src/reader/record.rs:
##########
@@ -458,12 +443,7 @@ impl Decoder {
                     Box::new(val_dec),
                 )
             }
-            (Codec::Union(encodings, fields, mode), _) => {
-                if *mode != UnionMode::Dense {
-                    return Err(ArrowError::NotYetImplemented(
-                        "Sparse Arrow unions are not yet 
supported".to_string(),
-                    ));
-                }
+            (Codec::Union(encodings, fields, mode), _) if *mode == 
UnionMode::Dense => {

Review Comment:
   ```suggestion
               (Codec::Union(encodings, fields, UnionMode::Dense), _) => {
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);

Review Comment:
   aside: picking on the docs a bit --
   
   > the implementation expects them to be positive
   
   Technically, zero is not mathematically positive... it's the only value that 
is both non-negative and non-positive.
   
   Perhaps Rust officially has some other definition, but the Google search AI 
overview doesn't seem to think so:
   <img width="812" height="794" alt="image" src="https://github.com/user-attachments/assets/5a3a3766-14c7-4646-9687-2d9451525a7c" />
   



##########
arrow-avro/src/reader/record.rs:
##########
@@ -259,12 +258,31 @@ enum Decoder {
     Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
     Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
     Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
+    Union(UnionDecoder),
     Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
 }
 
 impl Decoder {
     fn try_new(data_type: &AvroDataType) -> Result<Self, ArrowError> {
         // Extract just the Promotion (if any) to simplify pattern matching

Review Comment:
   Or maybe it just needs to move down to `try_new_internal`, which it seems to 
have been split off from?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -852,6 +1303,94 @@ impl Decoder {
         Ok(())
     }
 
+    fn decode_with_promotion(
+        &mut self,
+        buf: &mut AvroCursor<'_>,
+        promotion: Promotion,
+    ) -> Result<(), ArrowError> {
+        match promotion {
+            Promotion::Direct => self.decode(buf),
+            Promotion::IntToLong => match self {
+                Self::Int64(v) => {
+                    v.push(buf.get_int()? as i64);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Int->Long target mismatch".into(),
+                )),
+            },
+            Promotion::IntToFloat => match self {
+                Self::Float32(v) => {
+                    v.push(buf.get_int()? as f32);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Int->Float target mismatch".into(),
+                )),
+            },
+            Promotion::IntToDouble => match self {
+                Self::Float64(v) => {
+                    v.push(buf.get_int()? as f64);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Int->Double target mismatch".into(),
+                )),
+            },
+            Promotion::LongToFloat => match self {
+                Self::Float32(v) => {
+                    v.push(buf.get_long()? as f32);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Long->Float target mismatch".into(),
+                )),
+            },
+            Promotion::LongToDouble => match self {
+                Self::Float64(v) => {
+                    v.push(buf.get_long()? as f64);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Long->Double target mismatch".into(),
+                )),
+            },
+            Promotion::FloatToDouble => match self {
+                Self::Float64(v) => {
+                    v.push(buf.get_float()? as f64);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion Float->Double target mismatch".into(),
+                )),
+            },
+            Promotion::StringToBytes => match self {
+                Self::Binary(offsets, values) | Self::StringToBytes(offsets, 
values) => {
+                    let data = buf.get_bytes()?;
+                    offsets.push_length(data.len());
+                    values.extend_from_slice(data);
+                    Ok(())
+                }
+                _ => Err(ArrowError::ParseError(
+                    "Promotion String->Bytes target mismatch".into(),
+                )),
+            },
+            Promotion::BytesToString => match self {

Review Comment:
   No check that the input bytes are valid UTF-8?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -991,10 +1096,327 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
+            Self::Union(u) => u.flush(nulls)?,
         })
     }
 }
 
+#[derive(Debug)]
+struct DispatchLookupTable {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLookupTable {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);

Review Comment:
   Isn't this technically
   ```suggestion
                       debug_assert!(idx <= i8::MAX as usize);
   ```
   (even if we decide to store `i16`, only the `0..=i8::MAX` value range is 
valid, if I understood correctly)?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -991,10 +1096,327 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
+            Self::Union(u) => u.flush(nulls)?,
         })
     }
 }
 
+#[derive(Debug)]
+struct DispatchLookupTable {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLookupTable {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
+                }
+            }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
+        let reader_index = *self.to_reader.get(writer_index)?;
+        (reader_index >= 0).then(|| (reader_index as usize, 
self.promotion[writer_index]))
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Vec<i8>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Vec::new(),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLookupTable,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLookupTable,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes = fields.iter().map(|(tid, _)| 
tid).collect::<Vec<i8>>();
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,

Review Comment:
   Any reason these names need to differ? Can we just choose one and use it 
everywhere?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1421,6 +1849,24 @@ impl Skipper {
                 }
                 Ok(())
             }
+            Self::Union(encodings) => {
+                // Union tag must be ZigZag-decoded
+                let branch = buf.get_long()?;
+                if branch < 0 {
+                    return Err(ArrowError::ParseError(format!(
+                        "Negative union branch index {branch}"
+                    )));
+                }
+                let idx = branch as usize;
+                if let Some(encoding) = encodings.get_mut(idx) {
+                    encoding.skip(buf)
+                } else {
+                    Err(ArrowError::ParseError(format!(
+                        "Union branch index {idx} out of range for skipper ({} 
branches)",
+                        encodings.len()
+                    )))
+                }

Review Comment:
   nit: Why not just call it `index` or `idx` from the start, and then:
   ```suggestion
                   let Some(encoding) = encodings.get_mut(idx as usize) else {
                       return Err(ArrowError::ParseError(format!(
                           "Union branch index {idx} out of range for skipper 
({} branches)",
                           encodings.len()
                       )));
                   };
                   encoding.skip(buf)
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -991,10 +1096,327 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
+            Self::Union(u) => u.flush(nulls)?,
         })
     }
 }
 
+#[derive(Debug)]
+struct DispatchLookupTable {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLookupTable {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
+                }
+            }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
+        let reader_index = *self.to_reader.get(writer_index)?;
+        (reader_index >= 0).then(|| (reader_index as usize, 
self.promotion[writer_index]))
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Vec<i8>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Vec::new(),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLookupTable,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLookupTable,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes = fields.iter().map(|(tid, _)| 
tid).collect::<Vec<i8>>();
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan: Self::plan_from_resolved(resolved)?,
+        })
+    }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        let Some(info) = resolved else {
+            return Ok(UnionReadPlan::Passthrough);
+        };
+        match (info.writer_is_union, info.reader_is_union) {
+            (true, true) => {
+                let lookup_table =
+                    
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader);
+                Ok(UnionReadPlan::ReaderUnion { lookup_table })
+            }
+            (false, true) => {
+                let Some(&(reader_idx, promotion)) =
+                    info.writer_to_reader.first().and_then(Option::as_ref)
+                else {
+                    return Err(ArrowError::SchemaError(
+                        "Writer type does not match any reader union 
branch".to_string(),
+                    ));
+                };
+                Ok(UnionReadPlan::FromSingle {
+                    reader_idx,
+                    promotion,
+                })
+            }
+            (true, false) => Err(ArrowError::InvalidArgumentError(
+                "UnionDecoder::try_new cannot build writer-union to single; 
use UnionDecoderBuilder with a target"
+                    .to_string(),
+            )),
+            (false, false) => Ok(UnionReadPlan::Passthrough),
+        }
+    }
+
+    #[inline]
+    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
+        let tag = buf.get_long()?;
+        if tag < 0 {
+            return Err(ArrowError::ParseError(format!(
+                "Negative union branch index {tag}"
+            )));
+        }
+        Ok(tag as usize)
+    }
+
+    #[inline]
+    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, 
ArrowError> {
+        if reader_idx >= self.branches.len() {
+            return Err(ArrowError::ParseError(format!(
+                "Union branch index {reader_idx} out of range ({} branches)",
+                self.branches.len()
+            )));
+        }
+        self.type_ids.push(self.type_id_by_reader_idx[reader_idx]);
+        self.offsets.push(self.counts[reader_idx]);
+        self.counts[reader_idx] += 1;
+        Ok(&mut self.branches[reader_idx])
+    }
+
+    #[inline]
+    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), 
ArrowError>
+    where
+        F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
+    {
+        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
+            return action(target);
+        }
+        let reader_idx = match &self.plan {
+            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
+            _ => fallback_idx,
+        };
+        self.emit_to(reader_idx).and_then(action)
+    }
+
+    fn append_null(&mut self) -> Result<(), ArrowError> {
+        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
+    }
+
+    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
+        self.on_decoder(self.default_emit_idx, |decoder| 
decoder.append_default(lit))
+    }
+
+    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
+        let (reader_idx, promotion) = match &mut self.plan {
+            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, 
Promotion::Direct),
+            UnionReadPlan::ReaderUnion { lookup_table } => {
+                let idx = Self::read_tag(buf)?;
+                lookup_table.resolve(idx).ok_or_else(|| {
+                    ArrowError::ParseError(format!(
+                        "Union branch index {idx} not resolvable by reader 
schema"
+                    ))
+                })?
+            }
+            UnionReadPlan::FromSingle {
+                reader_idx,
+                promotion,
+            } => (*reader_idx, *promotion),
+            UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            } => {
+                let idx = Self::read_tag(buf)?;
+                return match lookup_table.resolve(idx) {
+                    Some((_, promotion)) => target.decode_with_promotion(buf, 
promotion),
+                    None => Err(ArrowError::ParseError(format!(
+                        "Writer union branch {idx} does not resolve to reader 
type"
+                    ))),
+                };
+            }
+        };
+        let decoder = self.emit_to(reader_idx)?;
+        decoder.decode_with_promotion(buf, promotion)
+    }
+
+    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, 
ArrowError> {
+        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
+            return target.flush(nulls);
+        }
+        debug_assert!(
+            nulls.is_none(),
+            "UnionArray does not accept a validity bitmap; \
+                     nulls should have been materialized as a Null child 
during decode"
+        );
+        let children = self
+            .branches
+            .iter_mut()
+            .map(|d| d.flush(None))
+            .collect::<Result<Vec<_>, _>>()?;
+        let type_ids_buf = flush_values(&mut 
self.type_ids).into_iter().collect();
+        let offsets_buf = flush_values(&mut 
self.offsets).into_iter().collect();

Review Comment:
   nit: Do these stay one-liners if folded into their respective single-use 
sites (below)? 
   Or does that hurt readability?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -991,10 +1096,327 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
+            Self::Union(u) => u.flush(nulls)?,
         })
     }
 }
 
+#[derive(Debug)]
+struct DispatchLookupTable {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLookupTable {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
+                }
+            }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
+        let reader_index = *self.to_reader.get(writer_index)?;
+        (reader_index >= 0).then(|| (reader_index as usize, 
self.promotion[writer_index]))
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Vec<i8>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Vec::new(),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLookupTable,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLookupTable,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes = fields.iter().map(|(tid, _)| 
tid).collect::<Vec<i8>>();
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan: Self::plan_from_resolved(resolved)?,
+        })
+    }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        let Some(info) = resolved else {
+            return Ok(UnionReadPlan::Passthrough);
+        };
+        match (info.writer_is_union, info.reader_is_union) {
+            (true, true) => {
+                let lookup_table =
+                    
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader);
+                Ok(UnionReadPlan::ReaderUnion { lookup_table })
+            }
+            (false, true) => {
+                let Some(&(reader_idx, promotion)) =
+                    info.writer_to_reader.first().and_then(Option::as_ref)
+                else {
+                    return Err(ArrowError::SchemaError(
+                        "Writer type does not match any reader union 
branch".to_string(),
+                    ));
+                };
+                Ok(UnionReadPlan::FromSingle {
+                    reader_idx,
+                    promotion,
+                })
+            }
+            (true, false) => Err(ArrowError::InvalidArgumentError(
+                "UnionDecoder::try_new cannot build writer-union to single; 
use UnionDecoderBuilder with a target"
+                    .to_string(),
+            )),
+            (false, false) => Ok(UnionReadPlan::Passthrough),

Review Comment:
   Is there a way to systematically eliminate this false/false case? 
   Does any legitimate (transitive) call site actually need it? 
   Or can we replace the pair of booleans with a three-variant enum?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -991,10 +1096,327 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
+            Self::Union(u) => u.flush(nulls)?,
         })
     }
 }
 
+#[derive(Debug)]
+struct DispatchLookupTable {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLookupTable {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
+                }
+            }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_index: usize) -> Option<(usize, Promotion)> {
+        let reader_index = *self.to_reader.get(writer_index)?;
+        (reader_index >= 0).then(|| (reader_index as usize, 
self.promotion[writer_index]))
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Vec<i8>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Vec::new(),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLookupTable,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLookupTable,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes = fields.iter().map(|(tid, _)| 
tid).collect::<Vec<i8>>();
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan: Self::plan_from_resolved(resolved)?,
+        })
+    }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        let Some(info) = resolved else {
+            return Ok(UnionReadPlan::Passthrough);
+        };
+        match (info.writer_is_union, info.reader_is_union) {
+            (true, true) => {
+                let lookup_table =
+                    
DispatchLookupTable::from_writer_to_reader(&info.writer_to_reader);
+                Ok(UnionReadPlan::ReaderUnion { lookup_table })
+            }
+            (false, true) => {
+                let Some(&(reader_idx, promotion)) =
+                    info.writer_to_reader.first().and_then(Option::as_ref)
+                else {
+                    return Err(ArrowError::SchemaError(
+                        "Writer type does not match any reader union 
branch".to_string(),
+                    ));
+                };
+                Ok(UnionReadPlan::FromSingle {
+                    reader_idx,
+                    promotion,
+                })
+            }
+            (true, false) => Err(ArrowError::InvalidArgumentError(
+                "UnionDecoder::try_new cannot build writer-union to single; 
use UnionDecoderBuilder with a target"
+                    .to_string(),
+            )),
+            (false, false) => Ok(UnionReadPlan::Passthrough),
+        }
+    }
+
+    #[inline]
+    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
+        let tag = buf.get_long()?;
+        if tag < 0 {
+            return Err(ArrowError::ParseError(format!(
+                "Negative union branch index {tag}"
+            )));
+        }
+        Ok(tag as usize)
+    }
+
+    #[inline]
+    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, 
ArrowError> {
+        if reader_idx >= self.branches.len() {
+            return Err(ArrowError::ParseError(format!(
+                "Union branch index {reader_idx} out of range ({} branches)",
+                self.branches.len()
+            )));
+        }
+        self.type_ids.push(self.type_id_by_reader_idx[reader_idx]);
+        self.offsets.push(self.counts[reader_idx]);
+        self.counts[reader_idx] += 1;
+        Ok(&mut self.branches[reader_idx])

Review Comment:
   nit:
   ```suggestion
           let Some(reader_branch) = self.branches.get_mut(reader_idx) else {
               return Err(ArrowError::ParseError(format!(
                   "Union branch index {reader_idx} out of range ({} branches)",
                   self.branches.len()
               )));
           };
           self.type_ids.push(self.type_id_by_reader_idx[reader_idx]);
           self.offsets.push(self.counts[reader_idx]);
           self.counts[reader_idx] += 1;
           Ok(reader_branch)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to