klion26 commented on code in PR #8299:
URL: https://github.com/apache/arrow-rs/pull/8299#discussion_r2347785006


##########
parquet-variant-compute/src/arrow_to_variant.rs:
##########
@@ -0,0 +1,1995 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use crate::type_conversion::{decimal_to_variant_decimal, CastOptions};
+use arrow::array::{
+    Array, AsArray, GenericBinaryArray, GenericStringArray, OffsetSizeTrait, 
PrimitiveArray,
+};
+use arrow::compute::kernels::cast;
+use arrow::datatypes::{
+    ArrowNativeType, ArrowPrimitiveType, ArrowTemporalType, 
ArrowTimestampType, Date32Type,
+    Date64Type, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, 
Int64Type, Int8Type,
+    RunEndIndexType, Time32MillisecondType, Time32SecondType, 
Time64MicrosecondType,
+    Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType,
+    TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, 
UInt64Type, UInt8Type,
+};
+use arrow::temporal_conversions::{as_date, as_datetime, as_time};
+use arrow_schema::{ArrowError, DataType, TimeUnit};
+use chrono::{DateTime, TimeZone, Utc};
+use parquet_variant::{
+    ObjectFieldBuilder, Variant, VariantBuilderExt, VariantDecimal16, 
VariantDecimal4,
+    VariantDecimal8,
+};
+
+// ============================================================================
+// Row-oriented builders for efficient Arrow-to-Variant conversion
+// ============================================================================
+
+/// Row builder for converting Arrow arrays to VariantArray row by row
+///
+/// Each variant wraps the concrete builder for one Arrow data type (or for one type
+/// parameterization of a generic builder). `append_row` matches on the variant and forwards
+/// the call; `make_arrow_to_variant_row_builder` picks the variant from a `DataType`.
+pub(crate) enum ArrowToVariantRowBuilder<'a> {
+    // Null and boolean
+    Null(NullArrowToVariantBuilder),
+    Boolean(BooleanArrowToVariantBuilder<'a>),
+    // Integers and floats: one generic builder, parameterized by the Arrow primitive type
+    PrimitiveInt8(PrimitiveArrowToVariantBuilder<'a, Int8Type>),
+    PrimitiveInt16(PrimitiveArrowToVariantBuilder<'a, Int16Type>),
+    PrimitiveInt32(PrimitiveArrowToVariantBuilder<'a, Int32Type>),
+    PrimitiveInt64(PrimitiveArrowToVariantBuilder<'a, Int64Type>),
+    PrimitiveUInt8(PrimitiveArrowToVariantBuilder<'a, UInt8Type>),
+    PrimitiveUInt16(PrimitiveArrowToVariantBuilder<'a, UInt16Type>),
+    PrimitiveUInt32(PrimitiveArrowToVariantBuilder<'a, UInt32Type>),
+    PrimitiveUInt64(PrimitiveArrowToVariantBuilder<'a, UInt64Type>),
+    PrimitiveFloat16(PrimitiveArrowToVariantBuilder<'a, Float16Type>),
+    PrimitiveFloat32(PrimitiveArrowToVariantBuilder<'a, Float32Type>),
+    PrimitiveFloat64(PrimitiveArrowToVariantBuilder<'a, Float64Type>),
+    // Decimals: one dedicated builder per Arrow decimal type
+    Decimal32(Decimal32ArrowToVariantBuilder<'a>),
+    Decimal64(Decimal64ArrowToVariantBuilder<'a>),
+    Decimal128(Decimal128ArrowToVariantBuilder<'a>),
+    Decimal256(Decimal256ArrowToVariantBuilder<'a>),
+    // Timestamps: one variant per time unit, all backed by the same generic builder
+    TimestampSecond(TimestampArrowToVariantBuilder<'a, TimestampSecondType>),
+    TimestampMillisecond(TimestampArrowToVariantBuilder<'a, 
TimestampMillisecondType>),
+    TimestampMicrosecond(TimestampArrowToVariantBuilder<'a, 
TimestampMicrosecondType>),
+    TimestampNanosecond(TimestampArrowToVariantBuilder<'a, 
TimestampNanosecondType>),
+    // Dates and times of day
+    Date32(DateArrowToVariantBuilder<'a, Date32Type>),
+    Date64(DateArrowToVariantBuilder<'a, Date64Type>),
+    Time32Second(TimeArrowToVariantBuilder<'a, Time32SecondType>),
+    Time32Millisecond(TimeArrowToVariantBuilder<'a, Time32MillisecondType>),
+    Time64Microsecond(TimeArrowToVariantBuilder<'a, Time64MicrosecondType>),
+    Time64Nanosecond(TimeArrowToVariantBuilder<'a, Time64NanosecondType>),
+    // Binary and string layouts (i32 offsets, i64 offsets, views, fixed size)
+    Binary(BinaryArrowToVariantBuilder<'a, i32>),
+    LargeBinary(BinaryArrowToVariantBuilder<'a, i64>),
+    BinaryView(BinaryViewArrowToVariantBuilder<'a>),
+    FixedSizeBinary(FixedSizeBinaryArrowToVariantBuilder<'a>),
+    Utf8(StringArrowToVariantBuilder<'a, i32>),
+    LargeUtf8(StringArrowToVariantBuilder<'a, i64>),
+    Utf8View(StringViewArrowToVariantBuilder<'a>),
+    // Nested types; their builders hold further row builders for the children
+    List(ListArrowToVariantBuilder<'a, i32>),
+    LargeList(ListArrowToVariantBuilder<'a, i64>),
+    Struct(StructArrowToVariantBuilder<'a>),
+    Map(MapArrowToVariantBuilder<'a>),
+    Union(UnionArrowToVariantBuilder<'a>),
+    // Encoded types; run-end encoding gets one variant per run-end index type
+    Dictionary(DictionaryArrowToVariantBuilder<'a>),
+    RunEndEncodedInt16(RunEndEncodedArrowToVariantBuilder<'a, Int16Type>),
+    RunEndEncodedInt32(RunEndEncodedArrowToVariantBuilder<'a, Int32Type>),
+    RunEndEncodedInt64(RunEndEncodedArrowToVariantBuilder<'a, Int64Type>),
+}
+
+// Dispatch layer: forwards row-level calls to whichever concrete builder the variant wraps.
+impl<'a> ArrowToVariantRowBuilder<'a> {
+    /// Appends a single row at the given index to the supplied builder.
+    ///
+    /// `builder` and `index` are passed through unchanged to the wrapped builder's own
+    /// `append_row`, and that call's result is returned as-is. The match is exhaustive, so
+    /// adding an enum variant without an arm here is a compile error.
+    pub fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        use ArrowToVariantRowBuilder::*;
+        match self {
+            Null(b) => b.append_row(builder, index),
+            Boolean(b) => b.append_row(builder, index),
+            PrimitiveInt8(b) => b.append_row(builder, index),
+            PrimitiveInt16(b) => b.append_row(builder, index),
+            PrimitiveInt32(b) => b.append_row(builder, index),
+            PrimitiveInt64(b) => b.append_row(builder, index),
+            PrimitiveUInt8(b) => b.append_row(builder, index),
+            PrimitiveUInt16(b) => b.append_row(builder, index),
+            PrimitiveUInt32(b) => b.append_row(builder, index),
+            PrimitiveUInt64(b) => b.append_row(builder, index),
+            PrimitiveFloat16(b) => b.append_row(builder, index),
+            PrimitiveFloat32(b) => b.append_row(builder, index),
+            PrimitiveFloat64(b) => b.append_row(builder, index),
+            Decimal32(b) => b.append_row(builder, index),
+            Decimal64(b) => b.append_row(builder, index),
+            Decimal128(b) => b.append_row(builder, index),
+            Decimal256(b) => b.append_row(builder, index),
+            TimestampSecond(b) => b.append_row(builder, index),
+            TimestampMillisecond(b) => b.append_row(builder, index),
+            TimestampMicrosecond(b) => b.append_row(builder, index),
+            TimestampNanosecond(b) => b.append_row(builder, index),
+            Date32(b) => b.append_row(builder, index),
+            Date64(b) => b.append_row(builder, index),
+            Time32Second(b) => b.append_row(builder, index),
+            Time32Millisecond(b) => b.append_row(builder, index),
+            Time64Microsecond(b) => b.append_row(builder, index),
+            Time64Nanosecond(b) => b.append_row(builder, index),
+            Binary(b) => b.append_row(builder, index),
+            LargeBinary(b) => b.append_row(builder, index),
+            BinaryView(b) => b.append_row(builder, index),
+            FixedSizeBinary(b) => b.append_row(builder, index),
+            Utf8(b) => b.append_row(builder, index),
+            LargeUtf8(b) => b.append_row(builder, index),
+            Utf8View(b) => b.append_row(builder, index),
+            List(b) => b.append_row(builder, index),
+            LargeList(b) => b.append_row(builder, index),
+            Struct(b) => b.append_row(builder, index),
+            Map(b) => b.append_row(builder, index),
+            Union(b) => b.append_row(builder, index),
+            Dictionary(b) => b.append_row(builder, index),
+            RunEndEncodedInt16(b) => b.append_row(builder, index),
+            RunEndEncodedInt32(b) => b.append_row(builder, index),
+            RunEndEncodedInt64(b) => b.append_row(builder, index),
+        }
+    }
+}
+
+/// Factory function to create the appropriate row builder for a given DataType
+///
+/// Matches on `data_type` and hands `array` (plus `options` or type parameters such as the
+/// decimal scale, where the builder takes them) to the constructor of the matching builder.
+/// The recursive callers visible in this file always pass `array.data_type()` as `data_type`.
+///
+/// # Errors
+///
+/// * `ArrowError::CastError` for a `Time32`/`Time64` unit that has no arm, for a run-end
+///   index type other than `Int16`/`Int32`/`Int64`, and for any data type not matched at all.
+/// * `ArrowError::InvalidArgumentError` for `Duration` and `Interval`.
+/// * Any error returned by the constructors of the nested/encoded builders (list, struct,
+///   map, union, dictionary, run-end encoded), which are propagated with `?`.
+pub(crate) fn make_arrow_to_variant_row_builder<'a>(
+    data_type: &'a DataType,
+    array: &'a dyn Array,
+    options: &'a CastOptions,
+) -> Result<ArrowToVariantRowBuilder<'a>, ArrowError> {
+    use ArrowToVariantRowBuilder::*;
+    let builder =
+        match data_type {
+            // Builders that need nothing beyond the array itself
+            DataType::Null => Null(NullArrowToVariantBuilder),
+            DataType::Boolean => 
Boolean(BooleanArrowToVariantBuilder::new(array)),
+            DataType::Int8 => 
PrimitiveInt8(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Int16 => 
PrimitiveInt16(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Int32 => 
PrimitiveInt32(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Int64 => 
PrimitiveInt64(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::UInt8 => 
PrimitiveUInt8(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::UInt16 => 
PrimitiveUInt16(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::UInt32 => 
PrimitiveUInt32(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::UInt64 => 
PrimitiveUInt64(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Float16 => 
PrimitiveFloat16(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Float32 => 
PrimitiveFloat32(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Float64 => 
PrimitiveFloat64(PrimitiveArrowToVariantBuilder::new(array)),
+            // Decimals forward the scale declared on the data type; the precision is ignored
+            DataType::Decimal32(_, scale) => {
+                Decimal32(Decimal32ArrowToVariantBuilder::new(array, *scale))
+            }
+            DataType::Decimal64(_, scale) => {
+                Decimal64(Decimal64ArrowToVariantBuilder::new(array, *scale))
+            }
+            DataType::Decimal128(_, scale) => {
+                Decimal128(Decimal128ArrowToVariantBuilder::new(array, *scale))
+            }
+            DataType::Decimal256(_, scale) => {
+                Decimal256(Decimal256ArrowToVariantBuilder::new(array, *scale))
+            }
+            // Timestamps: only whether a time zone is present is forwarded, not its value
+            DataType::Timestamp(time_unit, time_zone) => {
+                match time_unit {
+                    TimeUnit::Second => 
TimestampSecond(TimestampArrowToVariantBuilder::new(
+                        array,
+                        options,
+                        time_zone.is_some(),
+                    )),
+                    TimeUnit::Millisecond => TimestampMillisecond(
+                        TimestampArrowToVariantBuilder::new(array, options, 
time_zone.is_some()),
+                    ),
+                    TimeUnit::Microsecond => TimestampMicrosecond(
+                        TimestampArrowToVariantBuilder::new(array, options, 
time_zone.is_some()),
+                    ),
+                    TimeUnit::Nanosecond => TimestampNanosecond(
+                        TimestampArrowToVariantBuilder::new(array, options, 
time_zone.is_some()),
+                    ),
+                }
+            }
+            DataType::Date32 => Date32(DateArrowToVariantBuilder::new(array, 
options)),
+            DataType::Date64 => Date64(DateArrowToVariantBuilder::new(array, 
options)),
+            // Time32 is handled for second/millisecond units only; anything else is an error
+            DataType::Time32(time_unit) => match time_unit {
+                TimeUnit::Second => 
Time32Second(TimeArrowToVariantBuilder::new(array, options)),
+                TimeUnit::Millisecond => {
+                    Time32Millisecond(TimeArrowToVariantBuilder::new(array, 
options))
+                }
+                _ => {
+                    return Err(ArrowError::CastError(format!(
+                        "Unsupported Time32 unit: {time_unit:?}"
+                    )))
+                }
+            },
+            // Time64 is handled for microsecond/nanosecond units only; anything else is an error
+            DataType::Time64(time_unit) => match time_unit {
+                TimeUnit::Microsecond => {
+                    Time64Microsecond(TimeArrowToVariantBuilder::new(array, 
options))
+                }
+                TimeUnit::Nanosecond => {
+                    Time64Nanosecond(TimeArrowToVariantBuilder::new(array, 
options))
+                }
+                _ => {
+                    return Err(ArrowError::CastError(format!(
+                        "Unsupported Time64 unit: {time_unit:?}"
+                    )))
+                }
+            },
+            // Rejected explicitly, with a different error kind than the catch-all arm below
+            DataType::Duration(_) | DataType::Interval(_) => {
+                return Err(ArrowError::InvalidArgumentError(
+                    "Casting duration/interval types to Variant is not 
supported. \
+                 The Variant format does not define duration/interval types."
+                        .to_string(),
+                ))
+            }
+            DataType::Binary => 
Binary(BinaryArrowToVariantBuilder::new(array)),
+            DataType::LargeBinary => 
LargeBinary(BinaryArrowToVariantBuilder::new(array)),
+            DataType::BinaryView => 
BinaryView(BinaryViewArrowToVariantBuilder::new(array)),
+            DataType::FixedSizeBinary(_) => {
+                
FixedSizeBinary(FixedSizeBinaryArrowToVariantBuilder::new(array))
+            }
+            DataType::Utf8 => Utf8(StringArrowToVariantBuilder::new(array)),
+            DataType::LargeUtf8 => 
LargeUtf8(StringArrowToVariantBuilder::new(array)),
+            DataType::Utf8View => 
Utf8View(StringViewArrowToVariantBuilder::new(array)),
+            // Nested and encoded types have fallible constructors (note the `?`)
+            DataType::List(_) => List(ListArrowToVariantBuilder::new(array, 
options)?),
+            DataType::LargeList(_) => 
LargeList(ListArrowToVariantBuilder::new(array, options)?),
+            DataType::Struct(_) => Struct(StructArrowToVariantBuilder::new(
+                array.as_struct(),
+                options,
+            )?),
+            DataType::Map(_, _) => Map(MapArrowToVariantBuilder::new(array, 
options)?),
+            DataType::Union(_, _) => 
Union(UnionArrowToVariantBuilder::new(array, options)?),
+            DataType::Dictionary(_, _) => {
+                Dictionary(DictionaryArrowToVariantBuilder::new(array, 
options)?)
+            }
+            // The enum variant is chosen by the data type of the run-ends field
+            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() 
{
+                DataType::Int16 => {
+                    
RunEndEncodedInt16(RunEndEncodedArrowToVariantBuilder::new(array, options)?)
+                }
+                DataType::Int32 => {
+                    
RunEndEncodedInt32(RunEndEncodedArrowToVariantBuilder::new(array, options)?)
+                }
+                DataType::Int64 => {
+                    
RunEndEncodedInt64(RunEndEncodedArrowToVariantBuilder::new(array, options)?)
+                }
+                _ => {
+                    return Err(ArrowError::CastError(format!(
+                        "Unsupported run ends type: {:?}",
+                        run_ends.data_type()
+                    )));
+                }
+            },
+            // Every data type without an arm above is reported as unsupported
+            dt => {
+                return Err(ArrowError::CastError(format!(
+                    "Unsupported data type for casting to Variant: {dt:?}",
+                )));
+            }
+        };
+    Ok(builder)
+}
+
+/// Macro to define (possibly generic) row builders with consistent structure 
and behavior.
+///
+/// The macro optionally allows to define a transform for values read from the 
underlying
+/// array. Transforms of the form `|value| { ... }` are infallible (and should 
produce something
+/// that implements `Into<Variant>`), while transforms of the form `|value| -> 
Option<_> { ... }`
+/// are fallible (and should produce `Option<impl Into<Variant>>`); a failed transform will either
+/// append null to the builder or return an error, depending on cast options.
+///
+/// Also supports optional extra fields that are passed to the constructor and 
which are available
+/// by reference in the value transform. Providing a fallible value transform 
requires also
+/// providing the extra field `options: &'a CastOptions`.
+// TODO: If/when the macro_metavar_expr feature stabilizes, the `ignore` 
meta-function would allow
+// us to "use" captured tokens without emitting them:
+//
+// ```
+// $(
+//     ${ignore($value)}
+//     $(
+//         ${ignore($option_ty)}
+//         options: &$lifetime CastOptions,
+//     )?
+// )?
+// ```
+//
+// That, in turn, would allow us to inject the `options` field whenever the 
user specifies a
+// fallible value transform, instead of requiring them to manually define it. 
This might not be
+// worth the trouble, tho, because it makes for some pretty bulky and unwieldy 
macro expansions.
+macro_rules! define_row_builder {
+    (
+        // Header: struct name, one lifetime, and at most one generic parameter with one bound
+        struct $name:ident<$lifetime:lifetime $(, $generic:ident: $bound:path 
)?>
+        // Optional where clause with a single predicate
+        $( where $where_path:path: $where_bound:path $(,)? )?
+        // Optional extra fields in braces; the comma after this group is mandatory
+        $({ $($field:ident: $field_type:ty),+ $(,)? })?,
+        // Array initializer: expression producing `&$array_type` from the `dyn Array` given to `new`
+        |$array_param:ident| -> $array_type:ty { $init_expr:expr }
+        // Optional value transform; the `-> Option<...>` form marks it as fallible
+        $(, |$value:ident| $(-> Option<$option_ty:ty>)? $value_transform:expr)?
+    ) => {
+        // Generated struct: the borrowed, typed array plus any extra fields
+        pub(crate) struct $name<$lifetime $(, $generic: $bound )?>
+        $( where $where_path: $where_bound )?
+        {
+            array: &$lifetime $array_type,
+            $( $( $field: $field_type, )+ )?
+        }
+
+        // NOTE(review): `$bound+` below expands to a bound list with a trailing `+`, unlike the
+        // struct definition above; it looks harmless, but confirm it is intentional.
+        impl<$lifetime $(, $generic: $bound+ )?> $name<$lifetime $(, 
$generic)?>
+        $( where $where_path: $where_bound )?
+        {
+            // Constructor: takes the array first, then the extra fields in declaration order
+            pub(crate) fn new($array_param: &$lifetime dyn Array $(, $( 
$field: $field_type ),+ )?) -> Self {
+                Self {
+                    array: $init_expr,
+                    $( $( $field, )+ )?
+                }
+            }
+
+            // Appends row `index`: null rows become a Variant null without reading the value;
+            // otherwise the (optionally transformed) value is appended.
+            fn append_row(&self, builder: &mut impl VariantBuilderExt, index: 
usize) -> Result<(), ArrowError> {
+                if self.array.is_null(index) {
+                    builder.append_null();
+                } else {
+                    // Macro hygiene: Give any extra fields names the value 
transform can access.
+                    //
+                    // The value transform doesn't normally reference cast 
options, but the macro's
+                    // caller still has to declare the field because stable 
rust has no way to "use"
+                    // a captured token without emitting it. So, silence 
unused variable warnings,
+                    // assuming that's the `options` field. Unfortunately, 
that also silences
+                    // legitimate compiler warnings if an infallible value 
transform fails to use
+                    // its first extra field.
+                    $(
+                        #[allow(unused)]
+                        $( let $field = &self.$field; )+
+                    )?
+
+                    // Apply the value transform, if any (with name swapping 
for hygiene)
+                    let value = self.array.value(index);
+                    $(
+                        let $value = value;
+                        let value = $value_transform;
+                        $(
+                            // Fallible transforms only: `None` is an error when `options.strict`
+                            // is set and a Variant null otherwise. This reads `self.options`,
+                            // so the invocation must declare an `options` field.
+                            // NOTE: The `?` macro expansion fails without the 
type annotation.
+                            let Some(value): Option<$option_ty> = value else {
+                                if self.options.strict {
+                                    return 
Err(ArrowError::ComputeError(format!(
+                                        "Failed to convert value at index 
{index}: conversion failed",
+                                    )));
+                                } else {
+                                    builder.append_null();
+                                    return Ok(());
+                                }
+                            };
+                        )?
+                    )?
+                    builder.append_value(value);
+                }
+                Ok(())
+            }
+        }
+    };
+}
+
+// Boolean rows: the array is viewed as a `BooleanArray` and values are appended without a
+// transform.
+define_row_builder!(
+    struct BooleanArrowToVariantBuilder<'a>,
+    |array| -> arrow::array::BooleanArray { array.as_boolean() }
+);
+
+// Generic builder for Arrow primitive types whose native value converts directly into a
+// `Variant` (the `where` clause); no value transform is applied.
+define_row_builder!(
+    struct PrimitiveArrowToVariantBuilder<'a, T: ArrowPrimitiveType>
+    where T::Native: Into<Variant<'a, 'a>>,
+    |array| -> PrimitiveArray<T> { array.as_primitive() }
+);
+
+// Decimal builders carry the `scale` taken from the Arrow data type and pass every value
+// through `decimal_to_variant_decimal!` (imported from `crate::type_conversion`; its exact
+// semantics are not visible in this file). The transforms are the infallible form, so these
+// builders have no `options` field.
+define_row_builder!(
+    struct Decimal32ArrowToVariantBuilder<'a> {
+        scale: i8,
+    },
+    |array| -> arrow::array::Decimal32Array { array.as_primitive() },
+    |value| decimal_to_variant_decimal!(value, scale, i32, VariantDecimal4)
+);
+
+define_row_builder!(
+    struct Decimal64ArrowToVariantBuilder<'a> {
+        scale: i8,
+    },
+    |array| -> arrow::array::Decimal64Array { array.as_primitive() },
+    |value| decimal_to_variant_decimal!(value, scale, i64, VariantDecimal8)
+);
+
+define_row_builder!(
+    struct Decimal128ArrowToVariantBuilder<'a> {
+        scale: i8,
+    },
+    |array| -> arrow::array::Decimal128Array { array.as_primitive() },
+    |value| decimal_to_variant_decimal!(value, scale, i128, VariantDecimal16)
+);
+
+// Decimal256 values are first narrowed to i128 and then handled like Decimal128.
+// NOTE(review): a value that does not fit in i128 becomes `Variant::Null` unconditionally,
+// because this builder has no `options` field and so cannot honor `options.strict` the way
+// fallible transforms do - confirm that silently producing null is intended in strict mode.
+define_row_builder!(
+    struct Decimal256ArrowToVariantBuilder<'a> {
+        scale: i8,
+    },
+    |array| -> arrow::array::Decimal256Array { array.as_primitive() },
+    |value| {
+        // Decimal256 needs special handling - convert to i128 if possible
+        match value.to_i128() {
+            Some(i128_val) => decimal_to_variant_decimal!(i128_val, scale, 
i128, VariantDecimal16),
+            None => Variant::Null, // Value too large for i128
+        }
+    }
+);
+
+// Timestamp builder, generic over the Arrow timestamp type. The transform is fallible:
+// `as_datetime` returns an `Option`, and a `None` is handled by the macro according to
+// `options.strict`. `has_time_zone` selects between a UTC `DateTime` and a naive datetime
+// as the value converted into a `Variant`.
+define_row_builder!(
+    struct TimestampArrowToVariantBuilder<'a, T: ArrowTimestampType> {
+        options: &'a CastOptions,
+        has_time_zone: bool,
+    },
+    |array| -> arrow::array::PrimitiveArray<T> { array.as_primitive() },
+    |value| -> Option<_> {
+        // Convert using Arrow's temporal conversion functions
+        as_datetime::<T>(value).map(|naive_datetime| {
+            if *has_time_zone {
+                // Has timezone -> DateTime<Utc> -> 
TimestampMicros/TimestampNanos
+                let utc_dt: DateTime<Utc> = 
Utc.from_utc_datetime(&naive_datetime);
+                Variant::from(utc_dt) // Uses From<DateTime<Utc>> for Variant
+            } else {
+                // No timezone -> NaiveDateTime -> 
TimestampNtzMicros/TimestampNtzNanos
+                Variant::from(naive_datetime) // Uses From<NaiveDateTime> for 
Variant
+            }
+        })
+    }
+);
+
+// Date builder: widens the native value to i64 and converts it with `as_date`; a `None`
+// result is handled by the macro according to `options.strict`.
+define_row_builder!(
+    struct DateArrowToVariantBuilder<'a, T: ArrowTemporalType>
+    where
+        i64: From<T::Native>,
+    {
+        options: &'a CastOptions,
+    },
+    |array| -> PrimitiveArray<T> { array.as_primitive() },
+    |value| -> Option<_> {
+        let date_value = i64::from(value);
+        as_date::<T>(date_value)
+    }
+);
+
+// Time-of-day builder: widens the native value to i64 and converts it with `as_time`; a
+// `None` result is handled by the macro according to `options.strict`.
+define_row_builder!(
+    struct TimeArrowToVariantBuilder<'a, T: ArrowTemporalType>
+    where
+        i64: From<T::Native>,
+    {
+        options: &'a CastOptions,
+    },
+    |array| -> PrimitiveArray<T> { array.as_primitive() },
+    |value| -> Option<_> {
+        let time_value = i64::from(value);
+        as_time::<T>(time_value)
+    }
+);
+
+// Binary and string builders. None of them declares extra fields or a value transform, so
+// each row's value is appended exactly as the typed array returns it.
+
+// Variable-length binary, generic over the offset width (i32 for Binary, i64 for LargeBinary)
+define_row_builder!(
+    struct BinaryArrowToVariantBuilder<'a, O: OffsetSizeTrait>,
+    |array| -> GenericBinaryArray<O> { array.as_binary() }
+);
+
+// Binary view layout
+define_row_builder!(
+    struct BinaryViewArrowToVariantBuilder<'a>,
+    |array| -> arrow::array::BinaryViewArray { array.as_byte_view() }
+);
+
+// Fixed-size binary
+define_row_builder!(
+    struct FixedSizeBinaryArrowToVariantBuilder<'a>,
+    |array| -> arrow::array::FixedSizeBinaryArray { 
array.as_fixed_size_binary() }
+);
+
+// Variable-length strings, generic over the offset width (i32 for Utf8, i64 for LargeUtf8)
+define_row_builder!(
+    struct StringArrowToVariantBuilder<'a, O: OffsetSizeTrait>,
+    |array| -> GenericStringArray<O> { array.as_string() }
+);
+
+// String view layout
+define_row_builder!(
+    struct StringViewArrowToVariantBuilder<'a>,
+    |array| -> arrow::array::StringViewArray { array.as_string_view() }
+);
+
+/// Null builder that always appends null
+pub(crate) struct NullArrowToVariantBuilder;
+
+impl NullArrowToVariantBuilder {
+    /// Appends a Variant null to `builder`; the row index is ignored.
+    ///
+    /// Takes `&self` because the builder is stateless, matching the leaf builders generated
+    /// by `define_row_builder!`. Callers holding a `&mut` reference can still call it.
+    /// This never returns an error.
+    fn append_row(
+        &self,
+        builder: &mut impl VariantBuilderExt,
+        _index: usize,
+    ) -> Result<(), ArrowError> {
+        builder.append_null();
+        Ok(())
+    }
+}
+
+/// Row builder shared by `List` and `LargeList` arrays, generic over the offset width `O`.
+pub(crate) struct ListArrowToVariantBuilder<'a, O: OffsetSizeTrait> {
+    list_array: &'a arrow::array::GenericListArray<O>,
+    values_builder: Box<ArrowToVariantRowBuilder<'a>>,
+}
+
+impl<'a, O: OffsetSizeTrait> ListArrowToVariantBuilder<'a, O> {
+    /// Views `array` as a list array and prepares a row builder for its child values.
+    ///
+    /// Fails if no row builder can be created for the child data type.
+    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> Result<Self, ArrowError> {
+        let list_array = array.as_list();
+        let child = list_array.values();
+        let element_builder =
+            make_arrow_to_variant_row_builder(child.data_type(), child.as_ref(), options)?;
+
+        Ok(Self {
+            list_array,
+            values_builder: Box::new(element_builder),
+        })
+    }
+
+    /// Appends list row `index`: a null for a null row, otherwise a Variant list holding
+    /// every child row in the offset range of that list.
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        if self.list_array.is_null(index) {
+            builder.append_null();
+        } else {
+            // Child rows of list `index` live in `offsets[index]..offsets[index + 1]`
+            let offsets = self.list_array.offsets();
+            let mut child_rows = offsets[index].as_usize()..offsets[index + 1].as_usize();
+
+            let mut list_builder = builder.try_new_list()?;
+            child_rows
+                .try_for_each(|row| self.values_builder.append_row(&mut list_builder, row))?;
+            list_builder.finish();
+        }
+        Ok(())
+    }
+}
+
+/// Row builder for `StructArray`: each struct row becomes a Variant object with one field
+/// per column.
+pub(crate) struct StructArrowToVariantBuilder<'a> {
+    struct_array: &'a arrow::array::StructArray,
+    field_builders: Vec<(&'a str, ArrowToVariantRowBuilder<'a>)>,
+}
+
+impl<'a> StructArrowToVariantBuilder<'a> {
+    /// Pairs every column name with a row builder for that column.
+    ///
+    /// Stops at, and returns, the first error raised while creating a column's builder.
+    pub(crate) fn new(
+        struct_array: &'a arrow::array::StructArray,
+        options: &'a CastOptions,
+    ) -> Result<Self, ArrowError> {
+        let field_builders = struct_array
+            .column_names()
+            .into_iter()
+            .zip(struct_array.columns())
+            .map(|(name, column)| {
+                make_arrow_to_variant_row_builder(column.data_type(), column.as_ref(), options)
+                    .map(|row_builder| (name, row_builder))
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        Ok(Self {
+            struct_array,
+            field_builders,
+        })
+    }
+
+    /// Appends struct row `index`: a null for a null row, otherwise an object whose fields
+    /// are written by the per-column builders, all reading the same row index.
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        if self.struct_array.is_null(index) {
+            builder.append_null();
+            return Ok(());
+        }
+
+        let mut obj_builder = builder.try_new_object()?;
+        for (field_name, row_builder) in self.field_builders.iter_mut() {
+            // Scope the column's value under its field name inside the object
+            let mut field_builder = ObjectFieldBuilder::new(field_name, &mut obj_builder);
+            row_builder.append_row(&mut field_builder, index)?;
+        }
+        obj_builder.finish();
+        Ok(())
+    }
+}
+
+/// Map builder for MapArray types
+pub(crate) struct MapArrowToVariantBuilder<'a> {
+    map_array: &'a arrow::array::MapArray,
+    key_strings: arrow::array::StringArray,
+    values_builder: Box<ArrowToVariantRowBuilder<'a>>,
+}
+
+impl<'a> MapArrowToVariantBuilder<'a> {
+    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> 
Result<Self, ArrowError> {
+        let map_array = array.as_map();
+
+        // Pre-cast keys to strings once
+        let keys = cast(map_array.keys(), &DataType::Utf8)?;
+        let key_strings = keys.as_string::<i32>().clone();
+
+        // Create recursive builder for values
+        let values = map_array.values();
+        let values_builder =
+            make_arrow_to_variant_row_builder(values.data_type(), 
values.as_ref(), options)?;
+
+        Ok(Self {
+            map_array,
+            key_strings,
+            values_builder: Box::new(values_builder),
+        })
+    }
+
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        // Check for NULL map first (via null bitmap)
+        if self.map_array.is_null(index) {
+            builder.append_null();
+            return Ok(());
+        }
+
+        let offsets = self.map_array.offsets();
+        let start = offsets[index].as_usize();
+        let end = offsets[index + 1].as_usize();
+
+        // Create object builder for this map
+        let mut object_builder = builder.try_new_object()?;
+
+        // Add each key-value pair (loop does nothing for empty maps - 
correct!)
+        for kv_index in start..end {
+            let key = self.key_strings.value(kv_index);
+            let mut field_builder = ObjectFieldBuilder::new(key, &mut 
object_builder);
+            self.values_builder
+                .append_row(&mut field_builder, kv_index)?;
+        }
+
+        object_builder.finish();
+        Ok(())
+    }
+}
+
+/// Union builder for both sparse and dense union arrays
+///
+/// NOTE: Union type ids are _not_ required to be dense, hence the hash map for child builders.
+pub(crate) struct UnionArrowToVariantBuilder<'a> {
+    union_array: &'a arrow::array::UnionArray,
+    child_builders: HashMap<i8, Box<ArrowToVariantRowBuilder<'a>>>,
+}
+
+impl<'a> UnionArrowToVariantBuilder<'a> {
+    /// Views `array` as a union and creates one child row builder per type id that occurs
+    /// in the array.
+    ///
+    /// Fails if no row builder can be created for one of those children.
+    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> Result<Self, ArrowError> {
+        let union_array = array.as_union();
+        let type_ids = union_array.type_ids();
+
+        // `type_ids()` is the per-row type id buffer, so the same id usually shows up many
+        // times. Build each child's row builder only on the first occurrence instead of
+        // rebuilding (and overwriting) it for every row.
+        let mut child_builders = HashMap::new();
+        for &type_id in type_ids {
+            if let std::collections::hash_map::Entry::Vacant(slot) = child_builders.entry(type_id)
+            {
+                let child_array = union_array.child(type_id);
+                let child_builder = make_arrow_to_variant_row_builder(
+                    child_array.data_type(),
+                    child_array.as_ref(),
+                    options,
+                )?;
+                slot.insert(Box::new(child_builder));
+            }
+        }
+
+        Ok(Self {
+            union_array,
+            child_builders,
+        })
+    }
+
+    /// Appends union row `index` by delegating to the child builder selected by the row's
+    /// type id, reading the child at the row's value offset.
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        let type_id = self.union_array.type_id(index);
+        let value_offset = self.union_array.value_offset(index);
+
+        // Delegate to the appropriate child builder, or append null to handle an invalid type_id
+        match self.child_builders.get_mut(&type_id) {
+            Some(child_builder) => child_builder.append_row(builder, value_offset)?,
+            None => builder.append_null(),
+        }
+
+        Ok(())
+    }
+}
+
+/// Row builder for dictionary-encoded arrays: each row is resolved to an index into the
+/// dictionary values, which a nested row builder then appends.
+pub(crate) struct DictionaryArrowToVariantBuilder<'a> {
+    keys: &'a dyn Array, // consulted only for null checks
+    normalized_keys: Vec<usize>,
+    values_builder: Box<ArrowToVariantRowBuilder<'a>>,
+}
+
+impl<'a> DictionaryArrowToVariantBuilder<'a> {
+    /// Views `array` as a dictionary, prepares a row builder for its values, and
+    /// precomputes the normalized key of every row.
+    ///
+    /// Fails if no row builder can be created for the value data type.
+    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> Result<Self, ArrowError> {
+        let dict_array = array.as_any_dictionary();
+        let dict_values = dict_array.values();
+        let values_builder = Box::new(make_arrow_to_variant_row_builder(
+            dict_values.data_type(),
+            dict_values.as_ref(),
+            options,
+        )?);
+
+        // WARNING: normalized_keys panics if values is empty
+        let normalized_keys = if dict_values.is_empty() {
+            Vec::new()
+        } else {
+            dict_array.normalized_keys()
+        };
+
+        Ok(Self {
+            keys: dict_array.keys(),
+            normalized_keys,
+            values_builder,
+        })
+    }
+
+    /// Appends dictionary row `index`: a null when the key is null, otherwise the
+    /// dictionary value the row's normalized key points at.
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        if self.keys.is_null(index) {
+            builder.append_null();
+            return Ok(());
+        }
+
+        let value_row = self.normalized_keys[index];
+        self.values_builder.append_row(builder, value_row)
+    }
+}
+
+/// Run-end encoded array builder with efficient sequential access
+///
+/// Tracks the run that contained the most recently requested row so that rows requested
+/// in increasing order can reuse that run or step to the next one.
+pub(crate) struct RunEndEncodedArrowToVariantBuilder<'a, R: RunEndIndexType> {
+    // The run array being converted; its values are consulted for null checks
+    run_array: &'a arrow::array::RunArray<R>,
+    // Row builder over the run array's values, addressed by run number
+    values_builder: Box<ArrowToVariantRowBuilder<'a>>,
+
+    // Run end values taken from `run_array.run_ends().values()`
+    run_ends: &'a [R::Native],
+    run_number: usize, // Physical index into run_ends and values
+    run_start: usize,  // Logical start index of current run
+}
+
+impl<'a, R: RunEndIndexType> RunEndEncodedArrowToVariantBuilder<'a, R> {
+    /// Creates a builder for the run-end encoded `array`.
+    ///
+    /// The cursor starts on the first run (`run_number == 0`, `run_start == 0`).
+    ///
+    /// # Errors
+    ///
+    /// Returns a `CastError` if `array` cannot be viewed as a run array with run-end type
+    /// `R`, and propagates any error from creating the row builder for the run values.
+    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> Result<Self, ArrowError> {
+        let Some(run_array) = array.as_run_opt() else {
+            return Err(ArrowError::CastError("Expected RunArray".to_string()));
+        };
+
+        let values = run_array.values();
+        let values_builder =
+            make_arrow_to_variant_row_builder(values.data_type(), values.as_ref(), options)?;
+
+        Ok(Self {
+            run_array,
+            values_builder: Box::new(values_builder),
+            run_ends: run_array.run_ends().values(),
+            run_number: 0,
+            run_start: 0,
+        })
+    }
+
+    /// Positions `run_number` / `run_start` on the run that contains logical row `index`.
+    ///
+    /// Staying in the current run, or stepping to the run right after it, is handled
+    /// directly. Every other request falls back to a binary search over the run ends.
+    ///
+    /// # Errors
+    ///
+    /// Returns a `CastError` if `index` is not covered by any run. The cursor is left
+    /// unchanged in that case.
+    fn set_run_for_index(&mut self, index: usize) -> Result<(), ArrowError> {
+        let beyond_run_array =
+            || ArrowError::CastError(format!("Index {index} beyond run array"));
+
+        // NOTE(review): `index` is compared directly against the stored run ends; presumably
+        // a sliced RunArray would need its offset applied first -- confirm against callers.
+        if index >= self.run_start {
+            let Some(run_end) = self.run_ends.get(self.run_number) else {
+                return Err(beyond_run_array());
+            };
+            let run_end = run_end.as_usize();
+
+            // Still inside the current run
+            if index < run_end {
+                return Ok(());
+            }
+
+            // First row of the following run
+            if index == run_end {
+                // Only step forward when a following run exists. Without this check the
+                // cursor would move past the last run and `append_row` would index the
+                // values array out of bounds instead of reporting an error.
+                if self.run_number + 1 >= self.run_ends.len() {
+                    return Err(beyond_run_array());
+                }
+                self.run_number += 1;
+                self.run_start = run_end;
+                return Ok(());
+            }
+        }
+
+        // Use partition_point for all non-sequential cases
+        let run_number = self
+            .run_ends
+            .partition_point(|&run_end| run_end.as_usize() <= index);
+        if run_number >= self.run_ends.len() {
+            return Err(beyond_run_array());
+        }
+        self.run_number = run_number;
+        self.run_start = match run_number {
+            0 => 0,
+            _ => self.run_ends[run_number - 1].as_usize(),
+        };
+        Ok(())
+    }
+
+    /// Appends the value of logical row `index` to `builder`.
+    ///
+    /// The cursor is first moved to the run containing `index`; a null run value is
+    /// written as a null, and any other value is appended through the values builder.
+    ///
+    /// # Errors
+    ///
+    /// Returns a `CastError` if `index` is beyond the run array, and propagates any error
+    /// from the values builder.
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        self.set_run_for_index(index)?;
+
+        // Handle null values
+        if self.run_array.values().is_null(self.run_number) {
+            builder.append_null();
+            return Ok(());
+        }
+
+        // Re-encode the value
+        self.values_builder.append_row(builder, self.run_number)?;
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{VariantArray, VariantArrayBuilder};
+    use arrow::array::{ArrayRef, BooleanArray, Int32Array, StringArray};
+    use std::sync::Arc;
+
+    /// Converts every row of `array` to a variant through the row builder and returns the
+    /// resulting VariantArray, which is asserted to have the same length as `array`.
+    fn execute_row_builder_test(array: &dyn Array) -> VariantArray {
+        let num_rows = array.len();
+        let cast_options = CastOptions::default();
+        let mut row_builder =
+            make_arrow_to_variant_row_builder(array.data_type(), array, &cast_options).unwrap();
+        let mut output = VariantArrayBuilder::new(num_rows);
+
+        // One variant builder per input row, finished before moving on to the next row
+        for row in 0..num_rows {
+            let mut variant_builder = output.variant_builder();
+            row_builder.append_row(&mut variant_builder, row).unwrap();
+            variant_builder.finish();
+        }
+
+        let variant_array = output.build();
+        assert_eq!(variant_array.len(), num_rows);
+        variant_array
+    }
+
+    /// Shared helper for the basic row builder tests.
+    ///
+    /// Runs `execute_row_builder_test` on `array`, then checks each entry of
+    /// `expected_values` against the row at the same position: `Some(variant)` must equal
+    /// the row's value and `None` requires the row to be null.
+    fn test_row_builder_basic(array: &dyn Array, expected_values: Vec<Option<Variant>>) {
+        let variant_array = execute_row_builder_test(array);
+
+        for (row, want) in expected_values.iter().enumerate() {
+            if let Some(variant) = want {
+                assert_eq!(variant_array.value(row), *variant, "Mismatch at index {}", row);
+            } else {
+                assert!(variant_array.is_null(row), "Expected null at index {}", row);
+            }
+        }
+    }
+
+    #[test]
+    fn test_primitive_row_builder() {

Review Comment:
   Very nice tests! They are very helpful for understanding the variant logic and the 
respective Arrow types.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to