scovich commented on code in PR #8349:
URL: https://github.com/apache/arrow-rs/pull/8349#discussion_r2368085959


##########
arrow-avro/src/reader/record.rs:
##########
@@ -708,10 +528,14 @@ impl Decoder {
             Self::Uuid(v) => {
                 v.extend([0; 16]);
             }
-            Self::Array(_, offsets, _e) => {
+            Self::Array(_, offsets, e) => {

Review Comment:
   unused?
   ```suggestion
               Self::Array(_, offsets, _) => {
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1298,70 +926,28 @@ impl Decoder {
     ) -> Result<(), ArrowError> {
         match promotion {
             Promotion::Direct => self.decode(buf),
-            Promotion::IntToLong => match self {
-                Self::Int64(v) => {
-                    v.push(buf.get_int()?.into());
-                    Ok(())
-                }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion Int->Long target mismatch".into(),
-                )),
-            },
-            Promotion::IntToFloat => match self {
-                Self::Float32(v) => {
-                    v.push(buf.get_int()? as f32);
-                    Ok(())
-                }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion Int->Float target mismatch".into(),
-                )),
-            },
-            Promotion::IntToDouble => match self {
-                Self::Float64(v) => {
-                    v.push(buf.get_int()? as f64);
-                    Ok(())
-                }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion Int->Double target mismatch".into(),
-                )),
-            },
-            Promotion::LongToFloat => match self {
-                Self::Float32(v) => {
-                    v.push(buf.get_long()? as f32);
-                    Ok(())
-                }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion Long->Float target mismatch".into(),
-                )),
-            },
-            Promotion::LongToDouble => match self {
-                Self::Float64(v) => {
-                    v.push(buf.get_long()? as f64);
-                    Ok(())
-                }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion Long->Double target mismatch".into(),
-                )),
-            },
-            Promotion::FloatToDouble => match self {
-                Self::Float64(v) => {
-                    v.push(buf.get_float()? as f64);
-                    Ok(())
-                }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion Float->Double target mismatch".into(),
-                )),
-            },
+            Promotion::IntToLong => promote_numeric!(self, buf, Int64, 
get_int, i64, promotion),
+            Promotion::IntToFloat => promote_numeric!(self, buf, Float32, 
get_int, f32, promotion),
+            Promotion::IntToDouble => promote_numeric!(self, buf, Float64, 
get_int, f64, promotion),
+            Promotion::LongToFloat => {
+                promote_numeric!(self, buf, Float32, get_long, f32, promotion)
+            }
+            Promotion::LongToDouble => {
+                promote_numeric!(self, buf, Float64, get_long, f64, promotion)

Review Comment:
   If you define the macro inside this method, it will have access to `self`, 
`buf`, and `promotion` without needing to pass them as args. Which also fits 
the "define near first use" principle. Or do you expect to need that macro 
elsewhere as well?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);

Review Comment:
   why is i16 (vs e.g. usize or isize) meaningful?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {

Review Comment:
   What is "lut"?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1193,88 +900,9 @@ impl Decoder {
                 let nanos = (millis as i64) * 1_000_000;
                 builder.append_value(IntervalMonthDayNano::new(months as i32, 
days as i32, nanos));
             }
-            Self::Union(fields, type_ids, offsets, encodings, encoding_counts, 
None) => {
-                let branch = buf.get_long()?;
-                if branch < 0 {
-                    return Err(ArrowError::ParseError(format!(
-                        "Negative union branch index {branch}"
-                    )));
-                }
-                let idx = branch as usize;
-                if idx >= encodings.len() {
-                    return Err(ArrowError::ParseError(format!(
-                        "Union branch index {idx} out of range ({} branches)",
-                        encodings.len()
-                    )));
-                }
-                let type_id = fields
-                    .iter()
-                    .nth(idx)
-                    .map(|(type_id, _)| type_id)
-                    .unwrap_or_else(|| i8::try_from(idx).unwrap_or(0));
-                type_ids.push(type_id);
-                offsets.push(encoding_counts[idx]);
-                encodings[idx].decode(buf)?;
-                encoding_counts[idx] += 1;
-            }
-            Self::Union(
-                _,
-                type_ids,
-                offsets,
-                encodings,
-                encoding_counts,
-                Some(union_resolution),
-            ) => match &mut union_resolution.kind {
-                UnionResolvedKind::Both {
-                    reader_type_codes, ..
-                } => {
-                    let (idx, action) = get_writer_union_action!(buf, 
union_resolution);
-                    match action {
-                        BranchDispatch::NoMatch => {
-                            return Err(ArrowError::ParseError(format!(
-                                "Union branch index {idx} not resolvable by 
reader schema"
-                            )));
-                        }
-                        BranchDispatch::ToReader {
-                            reader_idx,
-                            promotion,
-                        } => {
-                            let type_id = reader_type_codes[reader_idx];
-                            type_ids.push(type_id);
-                            offsets.push(encoding_counts[reader_idx]);
-                            encodings[reader_idx].decode_with_promotion(buf, 
promotion)?;
-                            encoding_counts[reader_idx] += 1;
-                        }
-                    }
-                }
-                UnionResolvedKind::ToSingle { target } => {
-                    let (idx, action) = get_writer_union_action!(buf, 
union_resolution);
-                    match action {
-                        BranchDispatch::NoMatch => {
-                            return Err(ArrowError::ParseError(format!(
-                                "Writer union branch {idx} does not resolve to 
reader type"
-                            )));
-                        }
-                        BranchDispatch::ToReader { promotion, .. } => {
-                            target.decode_with_promotion(buf, promotion)?;
-                        }
-                    }
-                }
-                UnionResolvedKind::FromSingle {
-                    reader_type_codes,
-                    target_reader_index,
-                    promotion,
-                    ..
-                } => {
-                    let type_id = reader_type_codes[*target_reader_index];
-                    type_ids.push(type_id);
-                    offsets.push(encoding_counts[*target_reader_index]);
-                    encodings[*target_reader_index].decode_with_promotion(buf, 
*promotion)?;
-                    encoding_counts[*target_reader_index] += 1;
-                }
-            },
+            Self::Union(u) => u.decode(buf)?,
             Self::Nullable(order, nb, encoding) => {
-                let branch = buf.get_long()?;
+                let branch = buf.read_vlq()?;

Review Comment:
   was the previous call to `get_long` somehow a bug?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);

Review Comment:
   what does -1 mean?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }

Review Comment:
   ```suggestion
           (reader_index >= 0).then(|| {
               (reader_index as usize, self.promotion[writer_idx])
           })
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let plan = Self::plan_from_resolved(resolved)?;
+        let branch_len = branches.len().max(reader_type_codes.len());

Review Comment:
   What happens if there are more reader type codes than branches? 



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];

Review Comment:
   ```suggestion
           let reader_index = *self.to_reader.get(writer_idx)?;
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,

Review Comment:
   Can you just `#[derive(Default)]` for `UnionReadPlan` and mark `Passthrough` 
as `#[default]`?
   Then all this code could go away...



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let plan = Self::plan_from_resolved(resolved)?;
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan,

Review Comment:
   I _think_ `plan` is only used here and can be folded in?
   ```suggestion
               plan: Self::plan_from_resolved(resolved)?,
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let plan = Self::plan_from_resolved(resolved)?;
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan,
         })
     }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        match resolved {
+            None => Ok(UnionReadPlan::Passthrough),
+            Some(info) => match (info.writer_is_union, info.reader_is_union) {
+                (true, true) => {
+                    let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+                    Ok(UnionReadPlan::ReaderUnion { lookup_table })
+                }
+                (false, true) => {
+                    let (reader_idx, promotion) =
+                        info.writer_to_reader.first().and_then(|x| 
*x).ok_or_else(|| {
+                            ArrowError::SchemaError(
+                                "Writer type does not match any reader union 
branch".to_string(),
+                            )
+                        })?;

Review Comment:
   Is this equivalent?
   ```suggestion
                       let Some(&Some((reader_idx, promotion))) = 
info.writer_to_reader.first() else {
                           return Err(ArrowError::SchemaError(
                               "Writer type does not match any reader union 
branch".to_string(),
                           ));
                       };
   
   ```
   (not shorter, but possibly easier to read)



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let plan = Self::plan_from_resolved(resolved)?;
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan,
         })
     }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        match resolved {
+            None => Ok(UnionReadPlan::Passthrough),
+            Some(info) => match (info.writer_is_union, info.reader_is_union) {

Review Comment:
   nit: Seems like the indentation would be cleaner with:
   ```rust
   let Some(info) = resolved else {
       return Ok(UnionReadPlan::Passthrough);
   };
   
   match (info.writer_is_union, info.reader_is_union) {
     ...
   }
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let plan = Self::plan_from_resolved(resolved)?;
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan,
         })
     }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        match resolved {
+            None => Ok(UnionReadPlan::Passthrough),
+            Some(info) => match (info.writer_is_union, info.reader_is_union) {
+                (true, true) => {
+                    let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+                    Ok(UnionReadPlan::ReaderUnion { lookup_table })
+                }
+                (false, true) => {
+                    let (reader_idx, promotion) =
+                        info.writer_to_reader.first().and_then(|x| 
*x).ok_or_else(|| {
+                            ArrowError::SchemaError(
+                                "Writer type does not match any reader union 
branch".to_string(),
+                            )
+                        })?;
+                    Ok(UnionReadPlan::FromSingle {
+                        reader_idx,
+                        promotion,
+                    })
+                }
+                (true, false) => Err(ArrowError::InvalidArgumentError(
+                    "UnionDecoder::try_new cannot build writer-union to 
single; use UnionDecoderBuilder with a target"
+                        .to_string(),
+                )),
+                (false, false) => Ok(UnionReadPlan::Passthrough),
+            },
+        }
+    }
+
+    #[inline]
+    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
+        let tag = buf.get_long()?;
+        if tag < 0 {
+            return Err(ArrowError::ParseError(format!(
+                "Negative union branch index {tag}"
+            )));
+        }
+        Ok(tag as usize)
+    }
+
+    #[inline]
+    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, 
ArrowError> {
+        if reader_idx >= self.branches.len() {
+            return Err(ArrowError::ParseError(format!(
+                "Union branch index {reader_idx} out of range ({} branches)",
+                self.branches.len()
+            )));
+        }
+        self.type_ids.push(self.type_id_by_reader_idx[reader_idx]);
+        self.offsets.push(self.counts[reader_idx]);
+        self.counts[reader_idx] += 1;
+        Ok(&mut self.branches[reader_idx])
+    }
+
+    #[inline]
+    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), 
ArrowError>
+    where
+        F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
+    {
+        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
+            return action(target);
+        }
+        let reader_idx = match &self.plan {
+            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
+            _ => fallback_idx,
+        };
+        self.emit_to(reader_idx).and_then(action)
+    }
+
+    fn append_null(&mut self) -> Result<(), ArrowError> {
+        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
+    }
+
+    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
+        self.on_decoder(self.default_emit_idx, |decoder| 
decoder.append_default(lit))

Review Comment:
   Nice, that's really slick!



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());

Review Comment:
   nit:
   ```suggestion
           let reader_type_codes = fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>();
   ```
   (and then do `type_id_by_reader_idx: Arc::from(reader_type_codes),` below)



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let plan = Self::plan_from_resolved(resolved)?;
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan,
         })
     }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        match resolved {
+            None => Ok(UnionReadPlan::Passthrough),
+            Some(info) => match (info.writer_is_union, info.reader_is_union) {
+                (true, true) => {
+                    let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+                    Ok(UnionReadPlan::ReaderUnion { lookup_table })
+                }
+                (false, true) => {
+                    let (reader_idx, promotion) =
+                        info.writer_to_reader.first().and_then(|x| 
*x).ok_or_else(|| {
+                            ArrowError::SchemaError(
+                                "Writer type does not match any reader union 
branch".to_string(),
+                            )
+                        })?;
+                    Ok(UnionReadPlan::FromSingle {
+                        reader_idx,
+                        promotion,
+                    })
+                }
+                (true, false) => Err(ArrowError::InvalidArgumentError(
+                    "UnionDecoder::try_new cannot build writer-union to 
single; use UnionDecoderBuilder with a target"
+                        .to_string(),
+                )),
+                (false, false) => Ok(UnionReadPlan::Passthrough),
+            },
+        }
+    }
+
+    #[inline]
+    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
+        let tag = buf.get_long()?;
+        if tag < 0 {
+            return Err(ArrowError::ParseError(format!(
+                "Negative union branch index {tag}"
+            )));
+        }
+        Ok(tag as usize)
+    }
+
+    #[inline]
+    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, 
ArrowError> {
+        if reader_idx >= self.branches.len() {
+            return Err(ArrowError::ParseError(format!(
+                "Union branch index {reader_idx} out of range ({} branches)",
+                self.branches.len()
+            )));
+        }
+        self.type_ids.push(self.type_id_by_reader_idx[reader_idx]);
+        self.offsets.push(self.counts[reader_idx]);
+        self.counts[reader_idx] += 1;
+        Ok(&mut self.branches[reader_idx])
+    }
+
+    #[inline]
+    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), 
ArrowError>
+    where
+        F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
+    {
+        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
+            return action(target);
+        }
+        let reader_idx = match &self.plan {
+            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
+            _ => fallback_idx,
+        };
+        self.emit_to(reader_idx).and_then(action)
+    }
+
+    fn append_null(&mut self) -> Result<(), ArrowError> {
+        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
+    }
+
+    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
+        self.on_decoder(self.default_emit_idx, |decoder| 
decoder.append_default(lit))
+    }
+
+    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
+        let (reader_idx, promotion) = match &mut self.plan {
+            UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            } => {
+                let idx = Self::read_tag(buf)?;
+                return match lookup_table.resolve(idx) {
+                    Some((_, promotion)) => target.decode_with_promotion(buf, 
promotion),
+                    None => Err(ArrowError::ParseError(format!(
+                        "Writer union branch {idx} does not resolve to reader 
type"
+                    ))),
+                };
+            }
+            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, 
Promotion::Direct),
+            UnionReadPlan::ReaderUnion { lookup_table } => {
+                let idx = Self::read_tag(buf)?;
+                lookup_table.resolve(idx).ok_or_else(|| {
+                    ArrowError::ParseError(format!(
+                        "Union branch index {idx} not resolvable by reader 
schema"
+                    ))
+                })?
+            }
+            UnionReadPlan::FromSingle {
+                reader_idx,
+                promotion,
+            } => (*reader_idx, *promotion),
+            UnionReadPlan::ToSingle { .. } => {
+                return Err(ArrowError::ParseError(
+                    "Invalid union read plan state".to_string(),
+                ));
+            }
+        };
+        let decoder = self.emit_to(reader_idx)?;
+        decoder.decode_with_promotion(buf, promotion)
+    }
+
+    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, 
ArrowError> {
+        match &mut self.plan {
+            UnionReadPlan::ToSingle { target, .. } => target.flush(nulls),
+            _ => {
+                debug_assert!(
+                    nulls.is_none(),
+                    "UnionArray does not accept a validity bitmap; \
+                     nulls should have been materialized as a Null child 
during decode"
+                );

Review Comment:
   What could cause this debug assert to fail: something the user did, or incorrect internal choreography?
   (If the user could cause it, we probably need proper error checking instead -- if not here, then somewhere earlier.)



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let plan = Self::plan_from_resolved(resolved)?;
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan,
         })
     }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        match resolved {
+            None => Ok(UnionReadPlan::Passthrough),
+            Some(info) => match (info.writer_is_union, info.reader_is_union) {
+                (true, true) => {
+                    let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+                    Ok(UnionReadPlan::ReaderUnion { lookup_table })
+                }
+                (false, true) => {
+                    let (reader_idx, promotion) =
+                        info.writer_to_reader.first().and_then(|x| 
*x).ok_or_else(|| {
+                            ArrowError::SchemaError(
+                                "Writer type does not match any reader union 
branch".to_string(),
+                            )
+                        })?;
+                    Ok(UnionReadPlan::FromSingle {
+                        reader_idx,
+                        promotion,
+                    })
+                }
+                (true, false) => Err(ArrowError::InvalidArgumentError(
+                    "UnionDecoder::try_new cannot build writer-union to 
single; use UnionDecoderBuilder with a target"
+                        .to_string(),
+                )),
+                (false, false) => Ok(UnionReadPlan::Passthrough),
+            },
+        }
+    }
+
+    #[inline]
+    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
+        let tag = buf.get_long()?;
+        if tag < 0 {
+            return Err(ArrowError::ParseError(format!(
+                "Negative union branch index {tag}"
+            )));
+        }
+        Ok(tag as usize)
+    }
+
+    #[inline]
+    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, 
ArrowError> {
+        if reader_idx >= self.branches.len() {
+            return Err(ArrowError::ParseError(format!(
+                "Union branch index {reader_idx} out of range ({} branches)",
+                self.branches.len()
+            )));
+        }
+        self.type_ids.push(self.type_id_by_reader_idx[reader_idx]);
+        self.offsets.push(self.counts[reader_idx]);
+        self.counts[reader_idx] += 1;
+        Ok(&mut self.branches[reader_idx])
+    }
+
+    #[inline]
+    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), 
ArrowError>
+    where
+        F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
+    {
+        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
+            return action(target);
+        }
+        let reader_idx = match &self.plan {
+            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
+            _ => fallback_idx,
+        };
+        self.emit_to(reader_idx).and_then(action)
+    }
+
+    fn append_null(&mut self) -> Result<(), ArrowError> {
+        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
+    }
+
+    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
+        self.on_decoder(self.default_emit_idx, |decoder| 
decoder.append_default(lit))
+    }
+
+    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
+        let (reader_idx, promotion) = match &mut self.plan {
+            UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            } => {
+                let idx = Self::read_tag(buf)?;
+                return match lookup_table.resolve(idx) {
+                    Some((_, promotion)) => target.decode_with_promotion(buf, 
promotion),
+                    None => Err(ArrowError::ParseError(format!(
+                        "Writer union branch {idx} does not resolve to reader 
type"
+                    ))),
+                };
+            }
+            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, 
Promotion::Direct),
+            UnionReadPlan::ReaderUnion { lookup_table } => {
+                let idx = Self::read_tag(buf)?;
+                lookup_table.resolve(idx).ok_or_else(|| {
+                    ArrowError::ParseError(format!(
+                        "Union branch index {idx} not resolvable by reader 
schema"
+                    ))
+                })?
+            }
+            UnionReadPlan::FromSingle {
+                reader_idx,
+                promotion,
+            } => (*reader_idx, *promotion),
+            UnionReadPlan::ToSingle { .. } => {
+                return Err(ArrowError::ParseError(
+                    "Invalid union read plan state".to_string(),
+                ));
+            }
+        };
+        let decoder = self.emit_to(reader_idx)?;
+        decoder.decode_with_promotion(buf, promotion)
+    }
+
+    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, 
ArrowError> {
+        match &mut self.plan {
+            UnionReadPlan::ToSingle { target, .. } => target.flush(nulls),
+            _ => {

Review Comment:
   This looks like a good spot for `if let`:
   ```rust
   if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
       return target.flush(nulls);
   }
   
   debug_assert!(...)
     ...
   Ok(Arc::new(arr))
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {

Review Comment:
   Oh... `Lut` stands for "lookup table"? That's dredging up lingo from hardware 
design classes I took ages ago... I'm not sure it's a well-known acronym.
   
   Can we just call it `DispatchLookupTable`? I don't see any use sites where 
that would cause inordinate line wrapping.



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let plan = Self::plan_from_resolved(resolved)?;
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan,
         })
     }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        match resolved {
+            None => Ok(UnionReadPlan::Passthrough),
+            Some(info) => match (info.writer_is_union, info.reader_is_union) {
+                (true, true) => {
+                    let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+                    Ok(UnionReadPlan::ReaderUnion { lookup_table })
+                }
+                (false, true) => {
+                    let (reader_idx, promotion) =
+                        info.writer_to_reader.first().and_then(|x| 
*x).ok_or_else(|| {
+                            ArrowError::SchemaError(
+                                "Writer type does not match any reader union 
branch".to_string(),
+                            )
+                        })?;
+                    Ok(UnionReadPlan::FromSingle {
+                        reader_idx,
+                        promotion,
+                    })
+                }
+                (true, false) => Err(ArrowError::InvalidArgumentError(
+                    "UnionDecoder::try_new cannot build writer-union to 
single; use UnionDecoderBuilder with a target"
+                        .to_string(),
+                )),
+                (false, false) => Ok(UnionReadPlan::Passthrough),
+            },
+        }
+    }
+
+    #[inline]
+    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
+        let tag = buf.get_long()?;
+        if tag < 0 {
+            return Err(ArrowError::ParseError(format!(
+                "Negative union branch index {tag}"
+            )));
+        }
+        Ok(tag as usize)
+    }
+
+    #[inline]
+    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, 
ArrowError> {
+        if reader_idx >= self.branches.len() {
+            return Err(ArrowError::ParseError(format!(
+                "Union branch index {reader_idx} out of range ({} branches)",
+                self.branches.len()
+            )));
+        }
+        self.type_ids.push(self.type_id_by_reader_idx[reader_idx]);
+        self.offsets.push(self.counts[reader_idx]);
+        self.counts[reader_idx] += 1;
+        Ok(&mut self.branches[reader_idx])
+    }
+
+    #[inline]
+    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), 
ArrowError>
+    where
+        F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
+    {
+        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
+            return action(target);
+        }
+        let reader_idx = match &self.plan {
+            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
+            _ => fallback_idx,
+        };
+        self.emit_to(reader_idx).and_then(action)
+    }
+
+    fn append_null(&mut self) -> Result<(), ArrowError> {
+        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
+    }
+
+    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
+        self.on_decoder(self.default_emit_idx, |decoder| 
decoder.append_default(lit))
+    }
+
+    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
+        let (reader_idx, promotion) = match &mut self.plan {
+            UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            } => {
+                let idx = Self::read_tag(buf)?;
+                return match lookup_table.resolve(idx) {
+                    Some((_, promotion)) => target.decode_with_promotion(buf, 
promotion),
+                    None => Err(ArrowError::ParseError(format!(
+                        "Writer union branch {idx} does not resolve to reader 
type"
+                    ))),
+                };
+            }
+            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, 
Promotion::Direct),
+            UnionReadPlan::ReaderUnion { lookup_table } => {
+                let idx = Self::read_tag(buf)?;
+                lookup_table.resolve(idx).ok_or_else(|| {
+                    ArrowError::ParseError(format!(
+                        "Union branch index {idx} not resolvable by reader 
schema"
+                    ))
+                })?
+            }
+            UnionReadPlan::FromSingle {
+                reader_idx,
+                promotion,
+            } => (*reader_idx, *promotion),
+            UnionReadPlan::ToSingle { .. } => {
+                return Err(ArrowError::ParseError(
+                    "Invalid union read plan state".to_string(),
+                ));
+            }
+        };
+        let decoder = self.emit_to(reader_idx)?;
+        decoder.decode_with_promotion(buf, promotion)
+    }
+
+    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, 
ArrowError> {
+        match &mut self.plan {
+            UnionReadPlan::ToSingle { target, .. } => target.flush(nulls),
+            _ => {
+                debug_assert!(
+                    nulls.is_none(),
+                    "UnionArray does not accept a validity bitmap; \
+                     nulls should have been materialized as a Null child 
during decode"
+                );
+                let children = self
+                    .branches
+                    .iter_mut()
+                    .map(|d| d.flush(None))
+                    .collect::<Result<Vec<_>, _>>()?;
+                let type_ids_buf: ScalarBuffer<i8> =
+                    flush_values(&mut self.type_ids).into_iter().collect();
+                let offsets_buf: ScalarBuffer<i32> =
+                    flush_values(&mut self.offsets).into_iter().collect();
+                let arr = UnionArray::try_new(
+                    self.fields.clone(),
+                    type_ids_buf,
+                    Some(offsets_buf),
+                    children,
+                )
+                .map_err(|e| ArrowError::ParseError(e.to_string()))?;
+                Ok(Arc::new(arr))
+            }
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+struct UnionDecoderBuilder {
+    fields: Option<UnionFields>,
+    branches: Option<Vec<Decoder>>,
+    resolved: Option<ResolvedUnion>,
+    target: Option<Box<Decoder>>,
+}
+
+impl UnionDecoderBuilder {
+    fn new() -> Self {
+        Self::default()
+    }
+
+    fn with_fields(mut self, fields: UnionFields) -> Self {
+        self.fields = Some(fields);
+        self
+    }
+
+    fn with_branches(mut self, branches: Vec<Decoder>) -> Self {
+        self.branches = Some(branches);
+        self
+    }
+
+    fn with_resolved_union(mut self, resolved_union: ResolvedUnion) -> Self {
+        self.resolved = Some(resolved_union);
+        self
+    }
+
+    fn with_target(mut self, target: Box<Decoder>) -> Self {
+        self.target = Some(target);
+        self
+    }
+
+    fn build(self) -> Result<UnionDecoder, ArrowError> {
+        match (self.resolved, self.fields, self.branches, self.target) {
+            (resolved, Some(fields), Some(branches), _) => {

Review Comment:
   What if the caller also supplied `Some(target)`? That seems like it should be 
an error, so we should hard-wire `None` (instead of `_`) in the target position 
here so that case falls through to the catch-all below.



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());

Review Comment:
   ... but why does this specific one need to be an Arc-slice instead of a Vec 
like all the others?



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1298,70 +926,28 @@ impl Decoder {
     ) -> Result<(), ArrowError> {
         match promotion {
             Promotion::Direct => self.decode(buf),
-            Promotion::IntToLong => match self {
-                Self::Int64(v) => {
-                    v.push(buf.get_int()?.into());
-                    Ok(())
-                }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion Int->Long target mismatch".into(),
-                )),
-            },
-            Promotion::IntToFloat => match self {
-                Self::Float32(v) => {
-                    v.push(buf.get_int()? as f32);
-                    Ok(())
-                }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion Int->Float target mismatch".into(),
-                )),
-            },
-            Promotion::IntToDouble => match self {
-                Self::Float64(v) => {
-                    v.push(buf.get_int()? as f64);
-                    Ok(())
-                }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion Int->Double target mismatch".into(),
-                )),
-            },
-            Promotion::LongToFloat => match self {
-                Self::Float32(v) => {
-                    v.push(buf.get_long()? as f32);
-                    Ok(())
-                }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion Long->Float target mismatch".into(),
-                )),
-            },
-            Promotion::LongToDouble => match self {
-                Self::Float64(v) => {
-                    v.push(buf.get_long()? as f64);
-                    Ok(())
-                }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion Long->Double target mismatch".into(),
-                )),
-            },
-            Promotion::FloatToDouble => match self {
-                Self::Float64(v) => {
-                    v.push(buf.get_float()? as f64);
-                    Ok(())
-                }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion Float->Double target mismatch".into(),
-                )),
-            },
+            Promotion::IntToLong => promote_numeric!(self, buf, Int64, 
get_int, i64, promotion),
+            Promotion::IntToFloat => promote_numeric!(self, buf, Float32, 
get_int, f32, promotion),
+            Promotion::IntToDouble => promote_numeric!(self, buf, Float64, 
get_int, f64, promotion),
+            Promotion::LongToFloat => {
+                promote_numeric!(self, buf, Float32, get_long, f32, promotion)
+            }
+            Promotion::LongToDouble => {
+                promote_numeric!(self, buf, Float64, get_long, f64, promotion)
+            }
+            Promotion::FloatToDouble => {
+                promote_numeric!(self, buf, Float64, get_float, f64, promotion)
+            }
             Promotion::StringToBytes => match self {
                 Self::Binary(offsets, values) | Self::StringToBytes(offsets, 
values) => {
                     let data = buf.get_bytes()?;
                     offsets.push_length(data.len());
                     values.extend_from_slice(data);
                     Ok(())
                 }
-                _ => Err(ArrowError::ParseError(
-                    "Promotion String->Bytes target mismatch".into(),
-                )),
+                _ => Err(ArrowError::ParseError(format!(
+                    "Promotion {promotion} target mismatch",

Review Comment:
   Should the error message also include the offending data type that couldn't 
be promoted?
   (again below)



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let plan = Self::plan_from_resolved(resolved)?;
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan,
         })
     }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        match resolved {
+            None => Ok(UnionReadPlan::Passthrough),
+            Some(info) => match (info.writer_is_union, info.reader_is_union) {
+                (true, true) => {
+                    let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+                    Ok(UnionReadPlan::ReaderUnion { lookup_table })
+                }
+                (false, true) => {
+                    let (reader_idx, promotion) =
+                        info.writer_to_reader.first().and_then(|x| 
*x).ok_or_else(|| {
+                            ArrowError::SchemaError(
+                                "Writer type does not match any reader union 
branch".to_string(),
+                            )
+                        })?;
+                    Ok(UnionReadPlan::FromSingle {
+                        reader_idx,
+                        promotion,
+                    })
+                }
+                (true, false) => Err(ArrowError::InvalidArgumentError(
+                    "UnionDecoder::try_new cannot build writer-union to 
single; use UnionDecoderBuilder with a target"
+                        .to_string(),
+                )),
+                (false, false) => Ok(UnionReadPlan::Passthrough),
+            },
+        }
+    }
+
+    #[inline]
+    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
+        let tag = buf.get_long()?;
+        if tag < 0 {
+            return Err(ArrowError::ParseError(format!(
+                "Negative union branch index {tag}"
+            )));
+        }
+        Ok(tag as usize)
+    }
+
+    #[inline]
+    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, 
ArrowError> {
+        if reader_idx >= self.branches.len() {
+            return Err(ArrowError::ParseError(format!(
+                "Union branch index {reader_idx} out of range ({} branches)",
+                self.branches.len()
+            )));
+        }
+        self.type_ids.push(self.type_id_by_reader_idx[reader_idx]);
+        self.offsets.push(self.counts[reader_idx]);
+        self.counts[reader_idx] += 1;
+        Ok(&mut self.branches[reader_idx])
+    }
+
+    #[inline]
+    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), 
ArrowError>
+    where
+        F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
+    {
+        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
+            return action(target);
+        }
+        let reader_idx = match &self.plan {
+            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
+            _ => fallback_idx,
+        };
+        self.emit_to(reader_idx).and_then(action)
+    }
+
+    fn append_null(&mut self) -> Result<(), ArrowError> {
+        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
+    }
+
+    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
+        self.on_decoder(self.default_emit_idx, |decoder| 
decoder.append_default(lit))
+    }
+
+    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
+        let (reader_idx, promotion) = match &mut self.plan {
+            UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            } => {
+                let idx = Self::read_tag(buf)?;
+                return match lookup_table.resolve(idx) {
+                    Some((_, promotion)) => target.decode_with_promotion(buf, 
promotion),
+                    None => Err(ArrowError::ParseError(format!(
+                        "Writer union branch {idx} does not resolve to reader 
type"
+                    ))),
+                };
+            }
+            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, 
Promotion::Direct),
+            UnionReadPlan::ReaderUnion { lookup_table } => {
+                let idx = Self::read_tag(buf)?;
+                lookup_table.resolve(idx).ok_or_else(|| {
+                    ArrowError::ParseError(format!(
+                        "Union branch index {idx} not resolvable by reader 
schema"
+                    ))
+                })?
+            }
+            UnionReadPlan::FromSingle {
+                reader_idx,
+                promotion,
+            } => (*reader_idx, *promotion),
+            UnionReadPlan::ToSingle { .. } => {
+                return Err(ArrowError::ParseError(
+                    "Invalid union read plan state".to_string(),
+                ));
+            }
+        };
+        let decoder = self.emit_to(reader_idx)?;
+        decoder.decode_with_promotion(buf, promotion)
+    }
+
+    fn flush(&mut self, nulls: Option<NullBuffer>) -> Result<ArrayRef, 
ArrowError> {
+        match &mut self.plan {
+            UnionReadPlan::ToSingle { target, .. } => target.flush(nulls),
+            _ => {
+                debug_assert!(
+                    nulls.is_none(),
+                    "UnionArray does not accept a validity bitmap; \
+                     nulls should have been materialized as a Null child 
during decode"
+                );
+                let children = self
+                    .branches
+                    .iter_mut()
+                    .map(|d| d.flush(None))
+                    .collect::<Result<Vec<_>, _>>()?;
+                let type_ids_buf: ScalarBuffer<i8> =
+                    flush_values(&mut self.type_ids).into_iter().collect();
+                let offsets_buf: ScalarBuffer<i32> =
+                    flush_values(&mut self.offsets).into_iter().collect();

Review Comment:
   The type annotations should be unnecessary (unless they're deliberate as 
some kind of documentation):
   ```suggestion
                   let type_ids_buf = flush_values(&mut 
self.type_ids).into_iter().collect();
                   let offsets_buf = flush_values(&mut 
self.offsets).into_iter().collect();
   ```



##########
arrow-avro/src/reader/record.rs:
##########
@@ -1518,19 +1104,340 @@ impl Decoder {
                     .map_err(|e| ArrowError::ParseError(e.to_string()))?;
                 Arc::new(vals)
             }
-            Self::Union(fields, type_ids, offsets, encodings, _, None) => {
-                flush_union!(fields, type_ids, offsets, encodings)
-            }
-            Self::Union(fields, type_ids, offsets, encodings, _, 
Some(union_resolution)) => {
-                match &mut union_resolution.kind {
-                    UnionResolvedKind::Both { .. } | 
UnionResolvedKind::FromSingle { .. } => {
-                        flush_union!(fields, type_ids, offsets, encodings)
-                    }
-                    UnionResolvedKind::ToSingle { target } => 
target.flush(nulls)?,
+            Self::Union(u) => u.flush(nulls)?,
+        })
+    }
+}
+
+#[derive(Debug)]
+struct DispatchLut {
+    to_reader: Box<[i16]>,
+    promotion: Box<[Promotion]>,
+}
+
+impl DispatchLut {
+    fn from_writer_to_reader(promotion_map: &[Option<(usize, Promotion)>]) -> 
Self {
+        let mut to_reader = Vec::with_capacity(promotion_map.len());
+        let mut promotion = Vec::with_capacity(promotion_map.len());
+        for map in promotion_map {
+            match *map {
+                Some((idx, promo)) => {
+                    debug_assert!(idx <= i16::MAX as usize);
+                    to_reader.push(idx as i16);
+                    promotion.push(promo);
+                }
+                None => {
+                    to_reader.push(-1);
+                    promotion.push(Promotion::Direct);
                 }
             }
+        }
+        Self {
+            to_reader: to_reader.into_boxed_slice(),
+            promotion: promotion.into_boxed_slice(),
+        }
+    }
+
+    // Resolve a writer branch index to (reader_idx, promotion)
+    #[inline]
+    fn resolve(&self, writer_idx: usize) -> Option<(usize, Promotion)> {
+        if writer_idx >= self.to_reader.len() {
+            return None;
+        }
+        let reader_index = self.to_reader[writer_idx];
+        if reader_index < 0 {
+            None
+        } else {
+            Some((reader_index as usize, self.promotion[writer_idx]))
+        }
+    }
+}
+
+#[derive(Debug)]
+struct UnionDecoder {
+    fields: UnionFields,
+    type_ids: Vec<i8>,
+    offsets: Vec<i32>,
+    branches: Vec<Decoder>,
+    counts: Vec<i32>,
+    type_id_by_reader_idx: Arc<[i8]>,
+    null_branch: Option<usize>,
+    default_emit_idx: usize,
+    null_emit_idx: usize,
+    plan: UnionReadPlan,
+}
+
+impl Default for UnionDecoder {
+    fn default() -> Self {
+        Self {
+            fields: UnionFields::empty(),
+            type_ids: Vec::new(),
+            offsets: Vec::new(),
+            branches: Vec::new(),
+            counts: Vec::new(),
+            type_id_by_reader_idx: Arc::from([]),
+            null_branch: None,
+            default_emit_idx: 0,
+            null_emit_idx: 0,
+            plan: UnionReadPlan::Passthrough,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum UnionReadPlan {
+    ReaderUnion {
+        lookup_table: DispatchLut,
+    },
+    FromSingle {
+        reader_idx: usize,
+        promotion: Promotion,
+    },
+    ToSingle {
+        target: Box<Decoder>,
+        lookup_table: DispatchLut,
+    },
+    Passthrough,
+}
+
+impl UnionDecoder {
+    fn try_new(
+        fields: UnionFields,
+        branches: Vec<Decoder>,
+        resolved: Option<ResolvedUnion>,
+    ) -> Result<Self, ArrowError> {
+        let reader_type_codes: Arc<[i8]> =
+            Arc::from(fields.iter().map(|(tid, _)| tid).collect::<Vec<i8>>());
+        let null_branch = branches.iter().position(|b| matches!(b, 
Decoder::Null(_)));
+        let default_emit_idx = 0;
+        let null_emit_idx = null_branch.unwrap_or(default_emit_idx);
+        let plan = Self::plan_from_resolved(resolved)?;
+        let branch_len = branches.len().max(reader_type_codes.len());
+        Ok(Self {
+            fields,
+            type_ids: Vec::with_capacity(DEFAULT_CAPACITY),
+            offsets: Vec::with_capacity(DEFAULT_CAPACITY),
+            branches,
+            counts: vec![0; branch_len],
+            type_id_by_reader_idx: reader_type_codes,
+            null_branch,
+            default_emit_idx,
+            null_emit_idx,
+            plan,
         })
     }
+
+    fn try_new_from_writer_union(
+        info: ResolvedUnion,
+        target: Box<Decoder>,
+    ) -> Result<Self, ArrowError> {
+        // This constructor is only for writer-union to single-type resolution
+        debug_assert!(info.writer_is_union && !info.reader_is_union);
+        let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+        Ok(Self {
+            plan: UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            },
+            ..Self::default()
+        })
+    }
+
+    fn plan_from_resolved(resolved: Option<ResolvedUnion>) -> 
Result<UnionReadPlan, ArrowError> {
+        match resolved {
+            None => Ok(UnionReadPlan::Passthrough),
+            Some(info) => match (info.writer_is_union, info.reader_is_union) {
+                (true, true) => {
+                    let lookup_table = 
DispatchLut::from_writer_to_reader(&info.writer_to_reader);
+                    Ok(UnionReadPlan::ReaderUnion { lookup_table })
+                }
+                (false, true) => {
+                    let (reader_idx, promotion) =
+                        info.writer_to_reader.first().and_then(|x| 
*x).ok_or_else(|| {
+                            ArrowError::SchemaError(
+                                "Writer type does not match any reader union 
branch".to_string(),
+                            )
+                        })?;
+                    Ok(UnionReadPlan::FromSingle {
+                        reader_idx,
+                        promotion,
+                    })
+                }
+                (true, false) => Err(ArrowError::InvalidArgumentError(
+                    "UnionDecoder::try_new cannot build writer-union to 
single; use UnionDecoderBuilder with a target"
+                        .to_string(),
+                )),
+                (false, false) => Ok(UnionReadPlan::Passthrough),
+            },
+        }
+    }
+
+    #[inline]
+    fn read_tag(buf: &mut AvroCursor<'_>) -> Result<usize, ArrowError> {
+        let tag = buf.get_long()?;
+        if tag < 0 {
+            return Err(ArrowError::ParseError(format!(
+                "Negative union branch index {tag}"
+            )));
+        }
+        Ok(tag as usize)
+    }
+
+    #[inline]
+    fn emit_to(&mut self, reader_idx: usize) -> Result<&mut Decoder, 
ArrowError> {
+        if reader_idx >= self.branches.len() {
+            return Err(ArrowError::ParseError(format!(
+                "Union branch index {reader_idx} out of range ({} branches)",
+                self.branches.len()
+            )));
+        }
+        self.type_ids.push(self.type_id_by_reader_idx[reader_idx]);
+        self.offsets.push(self.counts[reader_idx]);
+        self.counts[reader_idx] += 1;
+        Ok(&mut self.branches[reader_idx])
+    }
+
+    #[inline]
+    fn on_decoder<F>(&mut self, fallback_idx: usize, action: F) -> Result<(), 
ArrowError>
+    where
+        F: FnOnce(&mut Decoder) -> Result<(), ArrowError>,
+    {
+        if let UnionReadPlan::ToSingle { target, .. } = &mut self.plan {
+            return action(target);
+        }
+        let reader_idx = match &self.plan {
+            UnionReadPlan::FromSingle { reader_idx, .. } => *reader_idx,
+            _ => fallback_idx,
+        };
+        self.emit_to(reader_idx).and_then(action)
+    }
+
+    fn append_null(&mut self) -> Result<(), ArrowError> {
+        self.on_decoder(self.null_emit_idx, |decoder| decoder.append_null())
+    }
+
+    fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
+        self.on_decoder(self.default_emit_idx, |decoder| 
decoder.append_default(lit))
+    }
+
+    fn decode(&mut self, buf: &mut AvroCursor<'_>) -> Result<(), ArrowError> {
+        let (reader_idx, promotion) = match &mut self.plan {
+            UnionReadPlan::ToSingle {
+                target,
+                lookup_table,
+            } => {
+                let idx = Self::read_tag(buf)?;
+                return match lookup_table.resolve(idx) {
+                    Some((_, promotion)) => target.decode_with_promotion(buf, 
promotion),
+                    None => Err(ArrowError::ParseError(format!(
+                        "Writer union branch {idx} does not resolve to reader 
type"
+                    ))),
+                };
+            }
+            UnionReadPlan::Passthrough => (Self::read_tag(buf)?, 
Promotion::Direct),
+            UnionReadPlan::ReaderUnion { lookup_table } => {
+                let idx = Self::read_tag(buf)?;
+                lookup_table.resolve(idx).ok_or_else(|| {
+                    ArrowError::ParseError(format!(
+                        "Union branch index {idx} not resolvable by reader 
schema"
+                    ))
+                })?
+            }
+            UnionReadPlan::FromSingle {
+                reader_idx,
+                promotion,
+            } => (*reader_idx, *promotion),
+            UnionReadPlan::ToSingle { .. } => {
+                return Err(ArrowError::ParseError(
+                    "Invalid union read plan state".to_string(),
+                ));
+            }

Review Comment:
   I don't think this match arm is reachable? There's no `if` guard on the 
first match arm at L1325 above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to