github-actions[bot] commented on code in PR #63192:
URL: https://github.com/apache/doris/pull/63192#discussion_r3254362903


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java:
##########
@@ -1058,13 +1064,66 @@ public TFileFormatType getFileFormatType() throws 
UserException {
         if (icebergFormat.equalsIgnoreCase("parquet")) {
             type = TFileFormatType.FORMAT_PARQUET;
         } else if (icebergFormat.equalsIgnoreCase("orc")) {
+            validateVariantReadSupported(icebergFormat);
             type = TFileFormatType.FORMAT_ORC;
         } else {
             throw new DdlException(String.format("Unsupported format name: %s 
for iceberg table.", icebergFormat));
         }
         return type;
     }
 
+    private void validateVariantReadSupported(String icebergFormat) throws 
DdlException {
+        String variantColumnName = findVariantReadColumnName();
+        if (variantColumnName != null) {
+            throw new DdlException("Reading Iceberg VARIANT columns is only 
supported for Parquet files, "
+                    + "but table file format is " + icebergFormat + ": " + 
variantColumnName);
+        }
+    }
+
+    @VisibleForTesting
+    void validateVariantDataFileFormat(FileFormat dataFileFormat, String path) 
{
+        if (dataFileFormat == FileFormat.PARQUET) {
+            return;
+        }
+        String variantColumnName = findVariantReadColumnName();
+        if (variantColumnName != null) {
+            throw new NotSupportedException("Reading Iceberg VARIANT columns 
is only supported for Parquet files, "
+                    + "but data file format is " + dataFileFormat.name() + ": 
" + variantColumnName
+                    + " (" + path + ")");
+        }
+    }
+
+    private String findVariantReadColumnName() {
+        for (SlotDescriptor slot : desc.getSlots()) {

Review Comment:
   `findVariantReadColumnName()` now checks the full catalog column type, so
   non-Parquet Iceberg scans are rejected even when pruning/materialization
   reads only non-VARIANT subfields. For example, for an ORC table with
   `s STRUCT<a INT, v VARIANT>`, the query `SELECT s.a FROM t` can materialize
   only `s.a`, but `slot.getColumn().getType()` still contains `s.v`, so
   `findVariantReadColumnName()` returns a column name and its callers
   (`validateVariantReadSupported()` / `validateVariantDataFileFormat()`) throw
   the Parquet-only VARIANT error. The validation needs to inspect the
   selected/pruned access paths (or the effective materialized type) and reject
   only when a VARIANT subfield is actually read. This is distinct from the
   earlier recursive-validation threads: the recursive check was needed for
   selected nested VARIANTs, but applying it to the whole catalog column
   over-rejects legal non-VARIANT reads.



##########
be/src/format/parquet/parquet_variant_reader.cpp:
##########
@@ -0,0 +1,1161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/parquet/parquet_variant_reader.h"
+
+#include <algorithm>
+#include <cstring>
+#include <deque>
+#include <iomanip>
+#include <limits>
+#include <sstream>
+#include <string_view>
+#include <vector>
+
+#include "core/column/column_variant.h"
+#include "core/data_type/data_type_decimal.h"
+#include "core/value/jsonb_value.h"
+#include "exec/common/variant_util.h"
+
+namespace doris::parquet {
+
+std::string format_variant_uuid(const uint8_t* ptr) {
+    static constexpr char hex[] = "0123456789abcdef";
+    std::string uuid;
+    uuid.reserve(36);
+    for (int i = 0; i < 16; ++i) {
+        if (i == 4 || i == 6 || i == 8 || i == 10) {
+            uuid.push_back('-');
+        }
+        uuid.push_back(hex[ptr[i] >> 4]);
+        uuid.push_back(hex[ptr[i] & 0x0f]);
+    }
+    return uuid;
+}
+
+namespace {
+
+struct VariantMetadata {
+    std::vector<std::string> dictionary;
+};
+
+struct VariantObjectLayout {
+    std::vector<uint64_t> field_ids;
+    std::vector<uint64_t> field_offsets;
+    std::vector<uint64_t> field_ends;
+    const uint8_t* fields = nullptr;
+    uint64_t total_size = 0;
+};
+
+struct VariantArrayLayout {
+    std::vector<uint64_t> field_offsets;
+    const uint8_t* fields = nullptr;
+    uint64_t total_size = 0;
+};
+
+uint64_t read_unsigned_le(const uint8_t* ptr, int size) {
+    uint64_t value = 0;
+    for (int i = 0; i < size; ++i) {
+        value |= static_cast<uint64_t>(ptr[i]) << (i * 8);
+    }
+    return value;
+}
+
+int64_t read_signed_le(const uint8_t* ptr, int size) {
+    uint64_t value = read_unsigned_le(ptr, size);
+    if (size < 8) {
+        uint64_t sign_bit = uint64_t {1} << (size * 8 - 1);
+        if ((value & sign_bit) != 0) {
+            uint64_t mask = ~((uint64_t {1} << (size * 8)) - 1);
+            value |= mask;
+        }
+    }
+    return static_cast<int64_t>(value);
+}
+
+__int128 read_signed_int128_le(const uint8_t* ptr) {
+    unsigned __int128 unsigned_value = 0;
+    for (int i = 15; i >= 0; --i) {
+        unsigned_value <<= 8;
+        unsigned_value |= ptr[i];
+    }
+    static constexpr unsigned __int128 sign_bit = static_cast<unsigned 
__int128>(1) << 127;
+    if ((unsigned_value & sign_bit) == 0) {
+        return static_cast<__int128>(unsigned_value);
+    }
+    static constexpr __int128 signed_half_range = static_cast<__int128>(1) << 
126;
+    return (static_cast<__int128>(unsigned_value & (sign_bit - 1)) - 
signed_half_range) -
+           signed_half_range;
+}
+
+Status require_available(const uint8_t* ptr, const uint8_t* end, size_t size,
+                         std::string_view context) {
+    if (ptr > end) {
+        return Status::Corruption("Invalid Parquet VARIANT {} encoding", 
context);
+    }
+    if (size > static_cast<size_t>(end - ptr)) {
+        return Status::Corruption("Invalid Parquet VARIANT {} encoding", 
context);
+    }
+    return Status::OK();
+}
+
+Status require_available_entries(const uint8_t* ptr, const uint8_t* end, 
uint64_t entries,
+                                 size_t entry_size, std::string_view context) {
+    if (entries > std::numeric_limits<size_t>::max() / entry_size) {
+        return Status::Corruption("Invalid Parquet VARIANT {} encoding", 
context);
+    }
+    return require_available(ptr, end, static_cast<size_t>(entries) * 
entry_size, context);
+}
+
+bool variant_string_less(std::string_view lhs, std::string_view rhs) {
+    return std::lexicographical_compare(
+            lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), [](char left, char 
right) {
+                return static_cast<unsigned char>(left) < static_cast<unsigned 
char>(right);
+            });
+}
+
+bool is_valid_utf8(std::string_view value) {
+    const auto* data = reinterpret_cast<const uint8_t*>(value.data());
+    const auto* end = data + value.size();
+    while (data < end) {
+        const uint8_t first = *data++;
+        if (first <= 0x7f) {
+            continue;
+        }
+
+        uint32_t code_point = 0;
+        size_t continuation_bytes = 0;
+        if (first >= 0xc2 && first <= 0xdf) {
+            code_point = first & 0x1f;
+            continuation_bytes = 1;
+        } else if (first >= 0xe0 && first <= 0xef) {
+            code_point = first & 0x0f;
+            continuation_bytes = 2;
+        } else if (first >= 0xf0 && first <= 0xf4) {
+            code_point = first & 0x07;
+            continuation_bytes = 3;
+        } else {
+            return false;
+        }
+
+        if (static_cast<size_t>(end - data) < continuation_bytes) {
+            return false;
+        }
+        for (size_t i = 0; i < continuation_bytes; ++i) {
+            const uint8_t byte = *data++;
+            if ((byte & 0xc0) != 0x80) {
+                return false;
+            }
+            code_point = (code_point << 6) | (byte & 0x3f);
+        }
+
+        if ((continuation_bytes == 2 && code_point < 0x800) ||
+            (continuation_bytes == 3 && code_point < 0x10000) ||
+            (code_point >= 0xd800 && code_point <= 0xdfff) || code_point > 
0x10ffff) {
+            return false;
+        }
+    }
+    return true;
+}
+
+Status require_valid_utf8(std::string_view value, std::string_view context) {
+    if (!is_valid_utf8(value)) {
+        return Status::Corruption("Invalid Parquet VARIANT {} UTF-8 string", 
context);
+    }
+    return Status::OK();
+}
+
+Status validate_array_field_offsets(const std::vector<uint64_t>& 
field_offsets, uint64_t total_size,
+                                    std::string_view context) {
+    if (field_offsets.empty() || field_offsets.front() != 0) {
+        return Status::Corruption("Invalid Parquet VARIANT {} field offsets", 
context);
+    }
+    for (size_t i = 0; i < field_offsets.size(); ++i) {
+        if (field_offsets[i] > total_size) {
+            return Status::Corruption("Invalid Parquet VARIANT {} field offset 
{}", context,
+                                      field_offsets[i]);
+        }
+        if (i > 0 && field_offsets[i] < field_offsets[i - 1]) {
+            return Status::Corruption("Invalid Parquet VARIANT {} field 
offsets", context);
+        }
+    }
+    return Status::OK();
+}
+
+Status compute_object_field_ends(const std::vector<uint64_t>& field_offsets, 
uint64_t total_size,
+                                 std::vector<uint64_t>* field_ends) {
+    if (field_offsets.empty()) {
+        return Status::Corruption("Invalid Parquet VARIANT object field 
offsets");
+    }
+    size_t num_elements = field_offsets.size() - 1;
+    if (num_elements == 0) {
+        if (total_size != 0) {
+            return Status::Corruption("Invalid Parquet VARIANT object field 
offsets");
+        }
+        return Status::OK();
+    }
+
+    std::vector<std::pair<uint64_t, size_t>> physical_offsets;
+    physical_offsets.reserve(num_elements);
+    for (size_t i = 0; i < num_elements; ++i) {
+        if (field_offsets[i] >= total_size) {
+            return Status::Corruption("Invalid Parquet VARIANT object field 
offset {}",
+                                      field_offsets[i]);
+        }
+        physical_offsets.emplace_back(field_offsets[i], i);
+    }
+    std::sort(physical_offsets.begin(), physical_offsets.end());
+    if (physical_offsets.front().first != 0) {
+        return Status::Corruption("Invalid Parquet VARIANT object field 
offsets");
+    }
+
+    field_ends->assign(num_elements, 0);
+    for (size_t i = 0; i < physical_offsets.size(); ++i) {
+        if (i > 0 && physical_offsets[i].first == physical_offsets[i - 
1].first) {
+            return Status::Corruption("Invalid Parquet VARIANT object field 
offsets");
+        }
+        uint64_t child_end =
+                i + 1 < physical_offsets.size() ? physical_offsets[i + 
1].first : total_size;
+        (*field_ends)[physical_offsets[i].second] = child_end;
+    }
+    return Status::OK();
+}
+
+void append_json_string(std::string_view value, std::string* json, bool 
escape_non_ascii = false) {
+    json->push_back('"');
+    static constexpr char hex[] = "0123456789abcdef";
+    for (unsigned char c : value) {
+        switch (c) {
+        case '"':
+            json->append("\\\"");
+            break;
+        case '\\':
+            json->append("\\\\");
+            break;
+        case '\b':
+            json->append("\\b");
+            break;
+        case '\f':
+            json->append("\\f");
+            break;
+        case '\n':
+            json->append("\\n");
+            break;
+        case '\r':
+            json->append("\\r");
+            break;
+        case '\t':
+            json->append("\\t");
+            break;
+        default:
+            if (c < 0x20 || (escape_non_ascii && c >= 0x80)) {
+                json->append("\\u00");
+                json->push_back(hex[c >> 4]);
+                json->push_back(hex[c & 0x0f]);
+            } else {
+                json->push_back(static_cast<char>(c));
+            }
+            break;
+        }
+    }
+    json->push_back('"');
+}
+
+template <typename T>
+Status append_floating_json(T value, std::string* json) {
+    std::ostringstream oss;
+    oss << std::setprecision(std::numeric_limits<T>::max_digits10) << value;
+    json->append(oss.str());
+    return Status::OK();
+}
+
+std::string int128_to_string(__int128 value) {
+    if (value == 0) {
+        return "0";
+    }
+    bool negative = value < 0;
+    unsigned __int128 unsigned_value = negative ? static_cast<unsigned 
__int128>(-(value + 1)) + 1
+                                                : static_cast<unsigned 
__int128>(value);
+    std::string digits;
+    while (unsigned_value > 0) {
+        digits.push_back(static_cast<char>('0' + unsigned_value % 10));
+        unsigned_value /= 10;
+    }
+    if (negative) {
+        digits.push_back('-');
+    }
+    std::reverse(digits.begin(), digits.end());
+    return digits;
+}
+
+void append_decimal_json(__int128 unscaled, int scale, std::string* json) {
+    std::string value = int128_to_string(unscaled);
+    bool negative = !value.empty() && value[0] == '-';
+    std::string digits = negative ? value.substr(1) : value;
+    if (scale == 0) {
+        json->append(value);
+        return;
+    }
+    if (scale > 0) {
+        if (digits.size() <= static_cast<size_t>(scale)) {
+            digits.insert(0, static_cast<size_t>(scale) + 1 - digits.size(), 
'0');
+        }
+        digits.insert(digits.end() - scale, '.');
+        if (negative) {
+            json->push_back('-');
+        }
+        json->append(digits);
+        return;
+    }
+    if (negative) {
+        json->push_back('-');
+    }
+    json->append(digits);
+    json->append(static_cast<size_t>(-scale), '0');
+}
+
+Status decode_primitive(uint8_t primitive_header, const uint8_t* ptr, const 
uint8_t* end,
+                        std::string* json, const uint8_t** next);
+Status decode_value(const uint8_t* ptr, const uint8_t* end, const 
VariantMetadata& metadata,
+                    std::string* json, const uint8_t** next);
+
+void append_uuid_json(const uint8_t* ptr, std::string* json) {
+    json->push_back('"');
+    json->append(format_variant_uuid(ptr));
+    json->push_back('"');
+}
+
+Status make_jsonb_field(std::string_view json, FieldWithDataType* value) {
+    JsonBinaryValue jsonb_value;
+    RETURN_IF_ERROR(jsonb_value.from_json_string(json.data(), json.size()));
+    value->field =
+            Field::create_field<TYPE_JSONB>(JsonbField(jsonb_value.value(), 
jsonb_value.size()));
+    value->base_scalar_type_id = TYPE_JSONB;
+    value->num_dimensions = 0;
+    value->precision = 0;
+    value->scale = 0;
+    return Status::OK();
+}
+
+std::string make_null_array_json(size_t elements) {
+    std::string json = "[";
+    for (size_t i = 0; i < elements; ++i) {
+        if (i != 0) {
+            json.push_back(',');
+        }
+        json.append("null");
+    }
+    json.push_back(']');
+    return json;
+}
+
+Status insert_empty_object_marker(const PathInData& path, VariantMap* values) {
+    FieldWithDataType value;
+    RETURN_IF_ERROR(make_jsonb_field("{}", &value));
+    (*values)[path] = std::move(value);
+    return Status::OK();
+}
+
+Status parse_json_to_variant_map(std::string_view json, const PathInData& 
prefix,
+                                 VariantMap* values) {
+    auto parsed_column = ColumnVariant::create(0, false);
+    ParseConfig parse_config;
+    StringRef json_ref(json.data(), json.size());
+    RETURN_IF_CATCH_EXCEPTION(
+            variant_util::parse_json_to_variant(*parsed_column, json_ref, 
nullptr, parse_config));
+    Field parsed = (*parsed_column)[0];
+    if (parsed.is_null()) {
+        (*values)[prefix] = FieldWithDataType {.field = Field()};
+        return Status::OK();
+    }
+
+    PathInDataBuilder path;
+    path.append(prefix.get_parts(), false);
+    for (auto& [parsed_path, value] : parsed.get<TYPE_VARIANT>()) {
+        path.append(parsed_path.get_parts(), false);
+        (*values)[path.build()] = std::move(value);
+        for (size_t i = 0; i < parsed_path.get_parts().size(); ++i) {
+            path.pop_back();
+        }
+    }
+    return Status::OK();
+}
+
+void fill_field_type_info(FieldWithDataType* value) {
+    FieldInfo info;
+    variant_util::get_field_info(value->field, &info);
+    value->base_scalar_type_id = info.scalar_type_id;
+    value->num_dimensions = static_cast<uint8_t>(info.num_dimensions);
+    value->precision = info.precision;
+    value->scale = info.scale;
+}
+
+template <PrimitiveType Primitive>
+void set_primitive_variant_field(const typename 
PrimitiveTypeTraits<Primitive>::CppType& data,
+                                 FieldWithDataType* value) {
+    value->field = Field::create_field<Primitive>(data);
+    fill_field_type_info(value);
+}
+
+Status read_decimal_primitive_field(uint8_t primitive_header, const uint8_t* 
ptr,
+                                    const uint8_t* end, FieldWithDataType* 
value,
+                                    const uint8_t** next) {
+    int value_size = 16;
+    if (primitive_header == 8) {
+        value_size = 4;
+    } else if (primitive_header == 9) {
+        value_size = 8;
+    }
+    RETURN_IF_ERROR(require_available(ptr, end, 1 + value_size, "decimal 
value"));
+    int scale = static_cast<int8_t>(*ptr++);
+    if (scale < 0 || scale > BeConsts::MAX_DECIMAL128_PRECISION) {
+        return Status::Corruption("Invalid Parquet VARIANT decimal scale {}", 
scale);
+    }
+
+    if (primitive_header == 8) {
+        set_primitive_variant_field<TYPE_DECIMAL32>(
+                Decimal32(static_cast<Int32>(read_signed_le(ptr, 
value_size))), value);
+        value->precision = BeConsts::MAX_DECIMAL32_PRECISION;
+    } else if (primitive_header == 9) {
+        set_primitive_variant_field<TYPE_DECIMAL64>(
+                Decimal64(static_cast<Int64>(read_signed_le(ptr, 
value_size))), value);
+        value->precision = BeConsts::MAX_DECIMAL64_PRECISION;
+    } else {
+        
set_primitive_variant_field<TYPE_DECIMAL128I>(Decimal128V3(read_signed_int128_le(ptr)),
+                                                      value);
+        value->precision = BeConsts::MAX_DECIMAL128_PRECISION;
+    }
+    value->scale = scale;
+    *next = ptr + value_size;
+    return Status::OK();
+}
+
+Status read_integral_primitive_field(uint8_t primitive_header, const uint8_t* 
ptr,
+                                     const uint8_t* end, FieldWithDataType* 
value,
+                                     const uint8_t** next) {
+    int value_size = 8;
+    if (primitive_header == 3) {
+        value_size = 1;
+    } else if (primitive_header == 4) {
+        value_size = 2;
+    } else if (primitive_header == 5 || primitive_header == 11) {
+        value_size = 4;
+    }
+    RETURN_IF_ERROR(require_available(ptr, end, value_size, "integer value"));
+    const auto data = static_cast<Int64>(read_signed_le(ptr, value_size));
+
+    switch (primitive_header) {
+    case 3:
+        set_primitive_variant_field<TYPE_TINYINT>(static_cast<Int8>(data), 
value);
+        break;
+    case 4:
+        set_primitive_variant_field<TYPE_SMALLINT>(static_cast<Int16>(data), 
value);
+        break;
+    case 5:
+        set_primitive_variant_field<TYPE_INT>(static_cast<Int32>(data), value);
+        break;
+    case 6:
+    case 11:
+    case 12:
+    case 13:
+    case 17:
+        set_primitive_variant_field<TYPE_BIGINT>(data, value);
+        break;
+    case 18:
+    case 19:
+        set_primitive_variant_field<TYPE_BIGINT>(data / 1000, value);
+        break;
+    default:
+        return Status::Corruption("Unsupported Parquet VARIANT primitive 
header {}",
+                                  primitive_header);
+    }
+    *next = ptr + value_size;
+    return Status::OK();
+}
+
+Status read_floating_primitive_field(uint8_t primitive_header, const uint8_t* 
ptr,
+                                     const uint8_t* end, FieldWithDataType* 
value,
+                                     const uint8_t** next) {
+    if (primitive_header == 14) {
+        RETURN_IF_ERROR(require_available(ptr, end, 4, "float value"));
+        auto bits = static_cast<uint32_t>(read_unsigned_le(ptr, 4));
+        float data;
+        std::memcpy(&data, &bits, sizeof(data));
+        set_primitive_variant_field<TYPE_FLOAT>(data, value);
+        *next = ptr + 4;
+        return Status::OK();
+    }
+
+    DCHECK_EQ(primitive_header, 7);
+    RETURN_IF_ERROR(require_available(ptr, end, 8, "double value"));
+    uint64_t bits = read_unsigned_le(ptr, 8);
+    double data;
+    std::memcpy(&data, &bits, sizeof(data));
+    set_primitive_variant_field<TYPE_DOUBLE>(data, value);
+    *next = ptr + 8;
+    return Status::OK();
+}
+
+Status read_binary_primitive_field(const uint8_t* ptr, const uint8_t* end, 
FieldWithDataType* value,
+                                   std::deque<std::string>* string_values, 
const uint8_t** next) {
+    RETURN_IF_ERROR(require_available(ptr, end, 4, "binary length"));
+    uint64_t size = read_unsigned_le(ptr, 4);
+    ptr += 4;
+    RETURN_IF_ERROR(require_available(ptr, end, size, "binary value"));
+    string_values->emplace_back(reinterpret_cast<const char*>(ptr), 
static_cast<size_t>(size));
+    value->field = 
Field::create_field<TYPE_VARBINARY>(StringView(string_values->back()));
+    fill_field_type_info(value);
+    *next = ptr + size;
+    return Status::OK();
+}
+
+Status read_string_primitive_field(const uint8_t* ptr, const uint8_t* end, 
FieldWithDataType* value,
+                                   const uint8_t** next) {
+    RETURN_IF_ERROR(require_available(ptr, end, 4, "binary or string length"));
+    uint64_t size = read_unsigned_le(ptr, 4);
+    ptr += 4;
+    RETURN_IF_ERROR(require_available(ptr, end, size, "string value"));
+    std::string_view data(reinterpret_cast<const char*>(ptr), 
static_cast<size_t>(size));
+    RETURN_IF_ERROR(require_valid_utf8(data, "string value"));
+    value->field = Field::create_field<TYPE_STRING>(String(data));
+    fill_field_type_info(value);
+    *next = ptr + size;
+    return Status::OK();
+}
+
+Status read_uuid_primitive_field(const uint8_t* ptr, const uint8_t* end, 
FieldWithDataType* value,
+                                 const uint8_t** next) {
+    RETURN_IF_ERROR(require_available(ptr, end, 16, "uuid value"));
+    value->field = Field::create_field<TYPE_STRING>(format_variant_uuid(ptr));
+    fill_field_type_info(value);
+    *next = ptr + 16;
+    return Status::OK();
+}
+
+Status read_array_layout(uint8_t value_header, const uint8_t* ptr, const 
uint8_t* end,
+                         VariantArrayLayout* layout) {
+    int field_offset_size = (value_header & 0x03) + 1;
+    int num_elements_size = (value_header & 0x04) != 0 ? 4 : 1;
+
+    RETURN_IF_ERROR(require_available(ptr, end, num_elements_size, "array 
element count"));
+    uint64_t num_elements = read_unsigned_le(ptr, num_elements_size);
+    ptr += num_elements_size;
+
+    RETURN_IF_ERROR(require_available_entries(ptr, end, num_elements + 1, 
field_offset_size,
+                                              "array field offsets"));
+    layout->field_offsets.resize(num_elements + 1);
+    for (uint64_t i = 0; i <= num_elements; ++i) {
+        layout->field_offsets[i] = read_unsigned_le(ptr, field_offset_size);
+        ptr += field_offset_size;
+    }
+
+    layout->total_size = layout->field_offsets.back();
+    layout->fields = ptr;
+    RETURN_IF_ERROR(
+            require_available(layout->fields, end, layout->total_size, "array 
field values"));
+    RETURN_IF_ERROR(
+            validate_array_field_offsets(layout->field_offsets, 
layout->total_size, "array"));
+    return Status::OK();
+}
+
+Status read_object_layout(uint8_t value_header, const uint8_t* ptr, const 
uint8_t* end,
+                          const VariantMetadata& metadata, 
VariantObjectLayout* layout) {
+    int field_offset_size = (value_header & 0x03) + 1;
+    int field_id_size = ((value_header >> 2) & 0x03) + 1;
+    int num_elements_size = (value_header & 0x10) != 0 ? 4 : 1;
+
+    RETURN_IF_ERROR(require_available(ptr, end, num_elements_size, "object 
element count"));
+    uint64_t num_elements = read_unsigned_le(ptr, num_elements_size);
+    ptr += num_elements_size;
+
+    RETURN_IF_ERROR(
+            require_available_entries(ptr, end, num_elements, field_id_size, 
"object field ids"));
+    layout->field_ids.resize(num_elements);
+    for (uint64_t i = 0; i < num_elements; ++i) {
+        layout->field_ids[i] = read_unsigned_le(ptr, field_id_size);
+        ptr += field_id_size;
+        if (layout->field_ids[i] >= metadata.dictionary.size()) {
+            return Status::Corruption("Invalid Parquet VARIANT object field id 
{}",
+                                      layout->field_ids[i]);
+        }
+        if (i > 0 && 
!variant_string_less(metadata.dictionary[layout->field_ids[i - 1]],
+                                          
metadata.dictionary[layout->field_ids[i]])) {
+            return Status::Corruption("Invalid Parquet VARIANT object field 
names");
+        }
+    }
+
+    RETURN_IF_ERROR(require_available_entries(ptr, end, num_elements + 1, 
field_offset_size,
+                                              "object field offsets"));
+    layout->field_offsets.resize(num_elements + 1);
+    for (uint64_t i = 0; i <= num_elements; ++i) {
+        layout->field_offsets[i] = read_unsigned_le(ptr, field_offset_size);
+        ptr += field_offset_size;
+    }
+
+    layout->total_size = layout->field_offsets.back();
+    layout->fields = ptr;
+    RETURN_IF_ERROR(
+            require_available(layout->fields, end, layout->total_size, "object 
field values"));
+    RETURN_IF_ERROR(compute_object_field_ends(layout->field_offsets, 
layout->total_size,
+                                              &layout->field_ends));
+    return Status::OK();
+}
+
+Status decode_value_to_variant_map(const uint8_t* ptr, const uint8_t* end,
+                                   const VariantMetadata& metadata, 
PathInDataBuilder* path,
+                                   VariantMap* values, 
std::deque<std::string>* string_values,
+                                   const uint8_t** next);
+
+Status decode_primitive_to_variant_map(uint8_t primitive_header, const 
uint8_t* ptr,
+                                       const uint8_t* end, const 
VariantMetadata&,
+                                       PathInDataBuilder* path, VariantMap* 
values,
+                                       std::deque<std::string>* string_values,
+                                       const uint8_t** next) {
+    FieldWithDataType value;
+    switch (primitive_header) {
+    case 0:
+        value.field = Field();
+        value.base_scalar_type_id = INVALID_TYPE;
+        *next = ptr;
+        break;
+    case 1:
+        set_primitive_variant_field<TYPE_BOOLEAN>(true, &value);
+        *next = ptr;
+        break;
+    case 2:
+        set_primitive_variant_field<TYPE_BOOLEAN>(false, &value);
+        *next = ptr;
+        break;
+    case 3:
+    case 4:
+    case 5:
+    case 6:
+    case 11:
+    case 12:
+    case 13:
+    case 17:
+    case 18:
+    case 19:
+        RETURN_IF_ERROR(read_integral_primitive_field(primitive_header, ptr, 
end, &value, next));
+        break;
+    case 7:
+    case 14:
+        RETURN_IF_ERROR(read_floating_primitive_field(primitive_header, ptr, 
end, &value, next));
+        break;
+    case 8:
+    case 9:
+    case 10:
+        RETURN_IF_ERROR(read_decimal_primitive_field(primitive_header, ptr, 
end, &value, next));
+        break;
+    case 15:
+        RETURN_IF_ERROR(read_binary_primitive_field(ptr, end, &value, 
string_values, next));
+        break;
+    case 16:
+        RETURN_IF_ERROR(read_string_primitive_field(ptr, end, &value, next));
+        break;
+    case 20:
+        RETURN_IF_ERROR(read_uuid_primitive_field(ptr, end, &value, next));
+        break;
+    default:
+        return Status::Corruption("Unsupported Parquet VARIANT primitive 
header {}",
+                                  primitive_header);
+    }
+    (*values)[path->build()] = std::move(value);
+    return Status::OK();
+}
+
+Status decode_object_to_variant_map(uint8_t value_header, const uint8_t* ptr, 
const uint8_t* end,
+                                    const VariantMetadata& metadata, 
PathInDataBuilder* path,
+                                    VariantMap* values, 
std::deque<std::string>* string_values,
+                                    const uint8_t** next) {
+    VariantObjectLayout layout;
+    RETURN_IF_ERROR(read_object_layout(value_header, ptr, end, metadata, 
&layout));
+
+    if (layout.field_ids.empty()) {
+        RETURN_IF_ERROR(insert_empty_object_marker(path->build(), values));
+    }
+
+    for (uint64_t i = 0; i < layout.field_ids.size(); ++i) {
+        const uint8_t* child_begin = layout.fields + layout.field_offsets[i];
+        const uint8_t* child_end = layout.fields + layout.field_ends[i];
+        const uint8_t* child_next = nullptr;
+        path->append(metadata.dictionary[layout.field_ids[i]], false);
+        RETURN_IF_ERROR(decode_value_to_variant_map(child_begin, child_end, 
metadata, path, values,
+                                                    string_values, 
&child_next));
+        path->pop_back();
+        if (child_next != child_end) {
+            return Status::Corruption("Invalid Parquet VARIANT object child 
value length");
+        }
+    }
+    *next = layout.fields + layout.total_size;
+    return Status::OK();
+}
+
+void move_variant_map_to_field(VariantMap&& element_values, FieldWithDataType* 
value) {
+    if (element_values.size() == 1 && element_values.begin()->first.empty()) {
+        *value = std::move(element_values.begin()->second);
+        return;
+    }
+    value->field = 
Field::create_field<TYPE_VARIANT>(std::move(element_values));
+    fill_field_type_info(value);
+}
+
+Status decode_array_element_to_field(const uint8_t* ptr, const uint8_t* end,
+                                     const VariantMetadata& metadata, 
FieldWithDataType* value,
+                                     std::deque<std::string>* string_values, 
const uint8_t** next) {
+    RETURN_IF_ERROR(require_available(ptr, end, 1, "array child value"));
+    const uint8_t value_metadata = *ptr++;
+    const uint8_t basic_type = value_metadata & 0x03;
+    const uint8_t value_header = value_metadata >> 2;
+
+    if (basic_type == 0) {
+        VariantMap element_values;
+        PathInDataBuilder element_path;
+        RETURN_IF_ERROR(decode_primitive_to_variant_map(value_header, ptr, 
end, metadata,
+                                                        &element_path, 
&element_values,
+                                                        string_values, next));
+        move_variant_map_to_field(std::move(element_values), value);
+        return Status::OK();
+    }
+
+    if (basic_type == 1) {
+        const size_t size = value_header;
+        RETURN_IF_ERROR(require_available(ptr, end, size, "short string 
value"));
+        std::string_view data(reinterpret_cast<const char*>(ptr), size);
+        RETURN_IF_ERROR(require_valid_utf8(data, "short string value"));
+        value->field = Field::create_field<TYPE_STRING>(String(data));
+        fill_field_type_info(value);
+        *next = ptr + size;
+        return Status::OK();
+    }
+
+    if (basic_type == 2 || basic_type == 3) {
+        VariantMap element_values;
+        PathInDataBuilder element_path;
+        RETURN_IF_ERROR(decode_value_to_variant_map(ptr - 1, end, metadata, 
&element_path,
+                                                    &element_values, 
string_values, next));
+        move_variant_map_to_field(std::move(element_values), value);
+        return Status::OK();
+    }
+
+    std::string json;
+    RETURN_IF_ERROR(decode_value(ptr - 1, end, metadata, &json, next));
+    VariantMap element_values;
+    RETURN_IF_ERROR(parse_json_to_variant_map(json, PathInData(), 
&element_values));
+    move_variant_map_to_field(std::move(element_values), value);
+    return Status::OK();
+}
+
+Status decode_array_to_variant_map(uint8_t value_header, const uint8_t* ptr, 
const uint8_t* end,
+                                   const VariantMetadata& metadata, 
PathInDataBuilder* path,
+                                   VariantMap* values, 
std::deque<std::string>* string_values,
+                                   const uint8_t** next) {
+    VariantArrayLayout layout;
+    RETURN_IF_ERROR(read_array_layout(value_header, ptr, end, &layout));
+
+    Array array;
+    array.reserve(layout.field_offsets.size() - 1);
+    for (uint64_t i = 0; i + 1 < layout.field_offsets.size(); ++i) {
+        const uint8_t* child_begin = layout.fields + layout.field_offsets[i];
+        const uint8_t* child_end = layout.fields + layout.field_offsets[i + 1];
+        const uint8_t* child_next = nullptr;
+        FieldWithDataType child;
+        RETURN_IF_ERROR(decode_array_element_to_field(child_begin, child_end, 
metadata, &child,
+                                                      string_values, 
&child_next));
+        if (child_next != child_end) {
+            return Status::Corruption("Invalid Parquet VARIANT array child 
value length");
+        }
+        array.push_back(std::move(child.field));
+    }
+
+    FieldWithDataType value;
+    const size_t elements = array.size();
+    value.field = Field::create_field<TYPE_ARRAY>(std::move(array));
+    fill_field_type_info(&value);
+    if (value.base_scalar_type_id == INVALID_TYPE) {

Review Comment:
   When a residual VARIANT array cannot be represented with a single scalar 
element type, this fallback replaces every decoded element with `null`. Valid 
VARIANT values such as `[1, "x"]`, `[{}, 1]`, or arrays containing objects come 
out of `fill_field_type_info()` with `base_scalar_type_id == INVALID_TYPE` and 
are then stored as JSONB `[null, ...]`, silently losing the actual element 
values. The fallback should serialize the decoded array contents (or otherwise 
preserve the VARIANT structure) instead of synthesizing an all-null array; only 
genuinely null elements should become `null`. Please add coverage for 
heterogeneous and complex residual arrays.



##########
be/src/format/parquet/vparquet_column_reader.cpp:
##########
@@ -103,6 +127,1837 @@ static void fill_array_offset(FieldSchema* field, 
ColumnArray::Offsets64& offset
     }
 }
 
+static constexpr int64_t UNIX_EPOCH_DAYNR = 719528;
+static constexpr int64_t MICROS_PER_SECOND = 1000000;
+
+static int64_t variant_date_value(const VecDateTimeValue& value) {
+    return value.daynr() - UNIX_EPOCH_DAYNR;
+}
+
+static int64_t variant_date_value(const DateV2Value<DateV2ValueType>& value) {
+    return value.daynr() - UNIX_EPOCH_DAYNR;
+}
+
+static int64_t variant_datetime_value(const VecDateTimeValue& value) {
+    int64_t timestamp = 0;
+    value.unix_timestamp(&timestamp, cctz::utc_time_zone());
+    return timestamp * MICROS_PER_SECOND;
+}
+
+static int64_t variant_datetime_value(const DateV2Value<DateTimeV2ValueType>& 
value) {
+    int64_t timestamp = 0;
+    value.unix_timestamp(&timestamp, cctz::utc_time_zone());
+    return timestamp * MICROS_PER_SECOND + value.microsecond();
+}
+
+static int64_t variant_datetime_value(const TimestampTzValue& value) {
+    int64_t timestamp = 0;
+    value.unix_timestamp(&timestamp, cctz::utc_time_zone());
+    return timestamp * MICROS_PER_SECOND + value.microsecond();
+}
+
+static int find_child_idx(const FieldSchema& field, std::string_view name) {
+    for (int i = 0; i < field.children.size(); ++i) {
+        if (field.children[i].lower_case_name == name) {
+            return i;
+        }
+    }
+    return -1;
+}
+
+static bool is_variant_wrapper_typed_value_child(const FieldSchema& field) {
+    auto type = remove_nullable(field.data_type);
+    return type->get_primitive_type() == TYPE_STRUCT || 
type->get_primitive_type() == TYPE_ARRAY;
+}
+
+static bool is_unannotated_variant_value_field(const FieldSchema& field) {
+    // VARIANT residual value is raw binary; annotated strings named value are 
user fields.
+    return field.lower_case_name == "value" && field.physical_type == 
tparquet::Type::BYTE_ARRAY &&
+           !field.parquet_schema.__isset.logicalType &&
+           !field.parquet_schema.__isset.converted_type;
+}
+
+static bool is_unannotated_variant_metadata_field(const FieldSchema& field) {
+    return field.lower_case_name == "metadata" &&
+           field.physical_type == tparquet::Type::BYTE_ARRAY &&
+           !field.parquet_schema.__isset.logicalType &&
+           !field.parquet_schema.__isset.converted_type;
+}
+
+static bool is_variant_wrapper_field(const FieldSchema& field,
+                                     bool 
allow_scalar_typed_value_only_wrapper) {
+    auto type = remove_nullable(field.data_type);
+    if (type->get_primitive_type() != TYPE_STRUCT && 
type->get_primitive_type() != TYPE_VARIANT) {
+        return false;
+    }
+
+    bool has_metadata = false;
+    bool has_value = false;
+    const FieldSchema* typed_value = nullptr;
+    for (const auto& child : field.children) {
+        if (child.lower_case_name == "metadata") {
+            if (!is_unannotated_variant_metadata_field(child)) {
+                return false;
+            }
+            has_metadata = true;
+            continue;
+        }
+        if (child.lower_case_name == "value") {
+            if (!is_unannotated_variant_value_field(child)) {
+                return false;
+            }
+            has_value = true;
+            continue;
+        }
+        if (child.lower_case_name == "typed_value") {
+            typed_value = &child;
+            continue;
+        }
+        return false;
+    }
+    if (has_metadata) {
+        return type->get_primitive_type() == TYPE_VARIANT && (has_value || 
typed_value != nullptr);
+    }
+    if (has_value) {
+        return typed_value != nullptr;
+    }
+    return typed_value != nullptr && (allow_scalar_typed_value_only_wrapper ||
+                                      
is_variant_wrapper_typed_value_child(*typed_value));
+}
+
+static bool is_value_only_variant_wrapper_candidate(const FieldSchema& field) {
+    auto type = remove_nullable(field.data_type);
+    if (type->get_primitive_type() != TYPE_STRUCT && 
type->get_primitive_type() != TYPE_VARIANT) {
+        return false;
+    }
+
+    bool has_value = false;
+    for (const auto& child : field.children) {
+        if (is_unannotated_variant_value_field(child)) {
+            has_value = true;
+            continue;
+        }
+        return false;
+    }
+    return has_value;
+}
+
+static Status get_binary_field(const Field& field, std::string* value, bool* 
present) {
+    if (field.is_null()) {
+        *present = false;
+        return Status::OK();
+    }
+    *present = true;
+    switch (field.get_type()) {
+    case TYPE_STRING:
+        *value = field.get<TYPE_STRING>();
+        return Status::OK();
+    case TYPE_CHAR:
+        *value = field.get<TYPE_CHAR>();
+        return Status::OK();
+    case TYPE_VARCHAR:
+        *value = field.get<TYPE_VARCHAR>();
+        return Status::OK();
+    case TYPE_VARBINARY: {
+        auto ref = field.get<TYPE_VARBINARY>().to_string_ref();
+        value->assign(ref.data, ref.size);
+        return Status::OK();
+    }
+    default:
+        return Status::Corruption("Parquet VARIANT binary field has unexpected 
Doris type {}",
+                                  field.get_type_name());
+    }
+}
+
+static PathInData append_path(const PathInData& prefix, const PathInData& 
suffix) {
+    if (prefix.empty()) {
+        return suffix;
+    }
+    if (suffix.empty()) {
+        return prefix;
+    }
+    PathInDataBuilder builder;
+    builder.append(prefix.get_parts(), false);
+    builder.append(suffix.get_parts(), false);
+    return builder.build();
+}
+
+static Status make_jsonb_field(std::string_view json, FieldWithDataType* 
value) {
+    JsonBinaryValue jsonb_value;
+    RETURN_IF_ERROR(jsonb_value.from_json_string(json.data(), json.size()));
+    value->field =
+            Field::create_field<TYPE_JSONB>(JsonbField(jsonb_value.value(), 
jsonb_value.size()));
+    value->base_scalar_type_id = TYPE_JSONB;
+    value->num_dimensions = 0;
+    value->precision = 0;
+    value->scale = 0;
+    return Status::OK();
+}
+
+static std::string make_null_array_json(size_t elements) {
+    std::string json = "[";
+    for (size_t i = 0; i < elements; ++i) {
+        if (i != 0) {
+            json.push_back(',');
+        }
+        json.append("null");
+    }
+    json.push_back(']');
+    return json;
+}
+
+static Status make_empty_object_field(Field* field) {
+    FieldWithDataType value;
+    RETURN_IF_ERROR(make_jsonb_field("{}", &value));
+    *field = std::move(value.field);
+    return Status::OK();
+}
+
+static Status insert_jsonb_value(const PathInData& path, std::string_view json,
+                                 VariantMap* values) {
+    FieldWithDataType value;
+    RETURN_IF_ERROR(make_jsonb_field(json, &value));
+    (*values)[path] = std::move(value);
+    return Status::OK();
+}
+
+static Status insert_empty_object_marker(const PathInData& path, VariantMap* 
values) {
+    return insert_jsonb_value(path, "{}", values);
+}
+
+static bool is_empty_object_marker(const FieldWithDataType& value) {
+    if (value.field.get_type() != TYPE_JSONB) {
+        return false;
+    }
+    const auto& jsonb = value.field.get<TYPE_JSONB>();
+    const JsonbDocument* document = nullptr;
+    Status st =
+            JsonbDocument::checkAndCreateDocument(jsonb.get_value(), 
jsonb.get_size(), &document);
+    if (!st.ok() || document == nullptr || document->getValue() == nullptr ||
+        !document->getValue()->isObject()) {
+        return false;
+    }
+    return document->getValue()->unpack<ObjectVal>()->numElem() == 0;
+}
+
+static Status collect_empty_object_markers(const rapidjson::Value& value, 
PathInDataBuilder* path,
+                                           VariantMap* values) {
+    if (!value.IsObject()) {
+        return Status::OK();
+    }
+    if (value.MemberCount() == 0) {
+        return insert_empty_object_marker(path->build(), values);
+    }
+    for (auto it = value.MemberBegin(); it != value.MemberEnd(); ++it) {
+        if (it->value.IsObject()) {
+            path->append(std::string_view(it->name.GetString(), 
it->name.GetStringLength()), false);
+            RETURN_IF_ERROR(collect_empty_object_markers(it->value, path, 
values));
+            path->pop_back();
+        }
+    }
+    return Status::OK();
+}
+
+static Status add_empty_object_markers_from_json(const std::string& json, 
const PathInData& prefix,
+                                                 VariantMap* values) {
+    if (json.find("{}") == std::string::npos) {
+        return Status::OK();
+    }
+    rapidjson::Document document;
+    document.Parse(json.data(), json.size());
+    if (document.HasParseError()) {
+        return Status::Corruption("Invalid Parquet VARIANT decoded JSON");
+    }
+    PathInDataBuilder path;
+    path.append(prefix.get_parts(), false);
+    return collect_empty_object_markers(document, &path, values);
+}
+
+static Status parse_json_to_variant_map(const std::string& json, const 
PathInData& prefix,
+                                        VariantMap* values) {
+    auto parsed_column = ColumnVariant::create(0, false);
+    ParseConfig parse_config;
+    StringRef json_ref(json.data(), json.size());
+    RETURN_IF_CATCH_EXCEPTION(
+            variant_util::parse_json_to_variant(*parsed_column, json_ref, 
nullptr, parse_config));
+    Field parsed = (*parsed_column)[0];
+    if (!parsed.is_null()) {
+        auto& parsed_values = parsed.get<TYPE_VARIANT>();
+        for (auto& [path, value] : parsed_values) {
+            (*values)[append_path(prefix, path)] = std::move(value);
+        }
+    }
+    RETURN_IF_ERROR(add_empty_object_markers_from_json(json, prefix, values));
+    return Status::OK();
+}
+
+static Status variant_map_to_json(VariantMap values, std::string* json) {
+    auto variant_column = ColumnVariant::create(0, false);
+    RETURN_IF_CATCH_EXCEPTION(
+            
variant_column->insert(Field::create_field<TYPE_VARIANT>(std::move(values))));
+    DataTypeSerDe::FormatOptions options;
+    variant_column->serialize_one_row_to_string(0, json, options);
+    return Status::OK();
+}
+
+static bool path_has_prefix(const PathInData& path, const PathInData& prefix) {
+    const auto& parts = path.get_parts();
+    const auto& prefix_parts = prefix.get_parts();
+    if (parts.size() < prefix_parts.size()) {
+        return false;
+    }
+    for (size_t i = 0; i < prefix_parts.size(); ++i) {
+        if (parts[i] != prefix_parts[i]) {
+            return false;
+        }
+    }
+    return true;
+}
+
+static bool has_descendant_path(const VariantMap& values, const PathInData& 
prefix) {
+    const size_t prefix_size = prefix.get_parts().size();
+    return std::ranges::any_of(values, [&](const auto& entry) {
+        const auto& path = entry.first;
+        return path.get_parts().size() > prefix_size && path_has_prefix(path, 
prefix);
+    });
+}
+
+static void erase_shadowed_empty_object_markers(VariantMap* values,
+                                                const VariantMap& 
shadowing_values) {
+    for (auto it = values->begin(); it != values->end();) {
+        if (is_empty_object_marker(it->second) &&
+            (has_descendant_path(*values, it->first) ||
+             has_descendant_path(shadowing_values, it->first))) {
+            it = values->erase(it);
+            continue;
+        }
+        ++it;
+    }
+}
+
+static void erase_shadowed_empty_object_markers(VariantMap* value_values,
+                                                VariantMap* typed_values) {
+    erase_shadowed_empty_object_markers(value_values, *typed_values);
+    erase_shadowed_empty_object_markers(typed_values, *value_values);
+}
+
+static Status check_no_shredded_value_typed_duplicates(const VariantMap& 
value_values,
+                                                       const VariantMap& 
typed_values,
+                                                       const PathInData& 
prefix) {
+    const size_t prefix_size = prefix.get_parts().size();
+    for (const auto& value_entry : value_values) {
+        const auto& value_path = value_entry.first;
+        if (!path_has_prefix(value_path, prefix)) {
+            continue;
+        }
+        if (value_path.get_parts().size() == prefix_size) {
+            if (is_empty_object_marker(value_entry.second) &&
+                !has_descendant_path(typed_values, value_path)) {
+                continue;
+            }
+            if (!typed_values.empty()) {
+                return Status::Corruption(
+                        "Parquet VARIANT residual value conflicts with 
typed_value at path {}",
+                        value_path.get_path());
+            }
+            continue;
+        }
+        for (const auto& typed_entry : typed_values) {
+            const auto& typed_path = typed_entry.first;
+            if (!path_has_prefix(typed_path, prefix)) {
+                continue;
+            }
+            if (typed_path.get_parts().size() == prefix_size) {
+                if (is_empty_object_marker(typed_entry.second) &&
+                    !has_descendant_path(value_values, typed_path)) {
+                    continue;
+                }
+                return Status::Corruption(
+                        "Parquet VARIANT residual value and typed_value 
contain duplicate field {}",
+                        value_path.get_parts()[prefix_size].key);
+            }
+            if (value_path.get_parts()[prefix_size] == 
typed_path.get_parts()[prefix_size]) {
+                if (value_path == typed_path && 
is_empty_object_marker(value_entry.second) &&
+                    is_empty_object_marker(typed_entry.second)) {
+                    continue;
+                }
+                return Status::Corruption(
+                        "Parquet VARIANT residual value and typed_value 
contain duplicate field {}",
+                        value_path.get_parts()[prefix_size].key);
+            }
+        }
+    }
+    return Status::OK();
+}
+
+static bool has_direct_typed_parent_null(const std::vector<const NullMap*>& 
null_maps, size_t row) {
+    return std::ranges::any_of(null_maps, [&](const NullMap* null_map) {
+        DCHECK_LT(row, null_map->size());
+        return (*null_map)[row];
+    });
+}
+
+static void insert_direct_typed_leaf_range(const IColumn& column, size_t 
start, size_t rows,
+                                           const std::vector<const NullMap*>& 
parent_null_maps,
+                                           IColumn* variant_leaf) {
+    auto& nullable_leaf = assert_cast<ColumnNullable&>(*variant_leaf);
+    const IColumn* value_column = &column;
+    const NullMap* leaf_null_map = nullptr;
+    if (const auto* nullable_column = 
check_and_get_column<ColumnNullable>(&column)) {
+        value_column = &nullable_column->get_nested_column();
+        leaf_null_map = &nullable_column->get_null_map_data();
+    }
+
+    nullable_leaf.get_nested_column().insert_range_from(*value_column, start, 
rows);
+    auto& null_map = nullable_leaf.get_null_map_data();
+    null_map.reserve(null_map.size() + rows);
+    for (size_t i = 0; i < rows; ++i) {
+        const size_t row = start + i;
+        const bool leaf_is_null = leaf_null_map != nullptr && 
(*leaf_null_map)[row];
+        null_map.push_back(leaf_is_null || 
has_direct_typed_parent_null(parent_null_maps, row));
+    }
+}
+
+static bool is_temporal_variant_leaf_type(PrimitiveType type) {
+    switch (type) {
+    case TYPE_TIMEV2:
+    case TYPE_DATE:
+    case TYPE_DATETIME:
+    case TYPE_DATEV2:
+    case TYPE_DATETIMEV2:
+    case TYPE_TIMESTAMPTZ:
+        return true;
+    default:
+        return false;
+    }
+}
+
+static bool is_floating_point_variant_leaf_type(PrimitiveType type) {
+    switch (type) {
+    case TYPE_FLOAT:
+    case TYPE_DOUBLE:
+        return true;
+    default:
+        return false;
+    }
+}
+
+static bool is_uuid_typed_value_field(const FieldSchema& field_schema);
+static bool contains_uuid_typed_value_field(const FieldSchema& field_schema);
+
+static DataTypePtr direct_variant_leaf_type(const DataTypePtr& data_type) {
+    const auto& type = remove_nullable(data_type);
+    if (is_temporal_variant_leaf_type(type->get_primitive_type())) {
+        return std::make_shared<DataTypeInt64>();
+    }
+    return type;
+}
+
+static DataTypePtr direct_variant_leaf_type(const FieldSchema& field_schema) {
+    const auto& type = remove_nullable(field_schema.data_type);
+    if (is_uuid_typed_value_field(field_schema)) {
+        return std::make_shared<DataTypeString>();
+    }
+    if (type->get_primitive_type() == TYPE_ARRAY) {
+        DORIS_CHECK(!field_schema.children.empty());
+        DataTypePtr nested_type = 
direct_variant_leaf_type(field_schema.children[0]);
+        if (field_schema.children[0].data_type->is_nullable()) {
+            nested_type = make_nullable(nested_type);
+        }
+        return std::make_shared<DataTypeArray>(nested_type);
+    }
+    return direct_variant_leaf_type(field_schema.data_type);
+}
+
+static bool contains_temporal_variant_leaf_type(const DataTypePtr& data_type) {
+    const auto& type = remove_nullable(data_type);
+    if (is_temporal_variant_leaf_type(type->get_primitive_type())) {
+        return true;
+    }
+    if (type->get_primitive_type() == TYPE_ARRAY) {
+        return contains_temporal_variant_leaf_type(
+                assert_cast<const 
DataTypeArray*>(type.get())->get_nested_type());
+    }
+    return false;
+}
+
+static bool contains_floating_point_variant_leaf_type(const DataTypePtr& 
data_type) {
+    const auto& type = remove_nullable(data_type);
+    if (is_floating_point_variant_leaf_type(type->get_primitive_type())) {
+        return true;
+    }
+    if (type->get_primitive_type() == TYPE_ARRAY) {
+        return contains_floating_point_variant_leaf_type(
+                assert_cast<const 
DataTypeArray*>(type.get())->get_nested_type());
+    }
+    return false;
+}
+
+static int64_t direct_temporal_variant_value(PrimitiveType type, const 
IColumn& column,
+                                             size_t row) {
+    switch (type) {
+    case TYPE_TIMEV2:
+        return static_cast<int64_t>(
+                std::llround(assert_cast<const 
ColumnTimeV2&>(column).get_data()[row]));
+    case TYPE_DATE:
+        return variant_date_value(assert_cast<const 
ColumnDate&>(column).get_data()[row]);
+    case TYPE_DATETIME:
+        return variant_datetime_value(assert_cast<const 
ColumnDateTime&>(column).get_data()[row]);
+    case TYPE_DATEV2:
+        return variant_date_value(assert_cast<const 
ColumnDateV2&>(column).get_data()[row]);
+    case TYPE_DATETIMEV2:
+        return variant_datetime_value(assert_cast<const 
ColumnDateTimeV2&>(column).get_data()[row]);
+    case TYPE_TIMESTAMPTZ:
+        return variant_datetime_value(
+                assert_cast<const ColumnTimeStampTz&>(column).get_data()[row]);
+    default:
+        DORIS_CHECK(false);
+        return 0;
+    }
+}
+
+static void insert_direct_typed_temporal_leaf_range(
+        PrimitiveType type, const IColumn& column, size_t start, size_t rows,
+        const std::vector<const NullMap*>& parent_null_maps, IColumn* 
variant_leaf) {
+    auto& nullable_leaf = assert_cast<ColumnNullable&>(*variant_leaf);
+    const IColumn* value_column = &column;
+    const NullMap* leaf_null_map = nullptr;
+    if (const auto* nullable_column = 
check_and_get_column<ColumnNullable>(&column)) {
+        value_column = &nullable_column->get_nested_column();
+        leaf_null_map = &nullable_column->get_null_map_data();
+    }
+
+    auto& data = 
assert_cast<ColumnInt64&>(nullable_leaf.get_nested_column()).get_data();
+    data.reserve(data.size() + rows);
+    auto& null_map = nullable_leaf.get_null_map_data();
+    null_map.reserve(null_map.size() + rows);
+    for (size_t i = 0; i < rows; ++i) {
+        const size_t row = start + i;
+        const bool leaf_is_null = leaf_null_map != nullptr && 
(*leaf_null_map)[row];
+        const bool is_null = leaf_is_null || 
has_direct_typed_parent_null(parent_null_maps, row);
+        if (is_null) {
+            data.push_back(0);
+            null_map.push_back(1);
+            continue;
+        }
+        data.push_back(direct_temporal_variant_value(type, *value_column, 
row));
+        null_map.push_back(0);
+    }
+}
+
+static Status insert_direct_typed_uuid_leaf_range(
+        const IColumn& column, size_t start, size_t rows,
+        const std::vector<const NullMap*>& parent_null_maps, IColumn* 
variant_leaf) {
+    auto& nullable_leaf = assert_cast<ColumnNullable&>(*variant_leaf);
+    const IColumn* value_column = &column;
+    const NullMap* leaf_null_map = nullptr;
+    if (const auto* nullable_column = 
check_and_get_column<ColumnNullable>(&column)) {
+        value_column = &nullable_column->get_nested_column();
+        leaf_null_map = &nullable_column->get_null_map_data();
+    }
+
+    auto& data = assert_cast<ColumnString&>(nullable_leaf.get_nested_column());
+    auto& null_map = nullable_leaf.get_null_map_data();
+    null_map.reserve(null_map.size() + rows);
+    for (size_t i = 0; i < rows; ++i) {
+        const size_t row = start + i;
+        const bool leaf_is_null = leaf_null_map != nullptr && 
(*leaf_null_map)[row];
+        const bool is_null = leaf_is_null || 
has_direct_typed_parent_null(parent_null_maps, row);
+        if (is_null) {
+            data.insert_default();
+            null_map.push_back(1);
+            continue;
+        }
+        StringRef bytes = value_column->get_data_at(row);
+        if (bytes.size != 16) {
+            return Status::Corruption("Parquet VARIANT UUID typed_value has 
invalid length {}",
+                                      bytes.size);
+        }
+        std::string uuid =
+                parquet::format_variant_uuid(reinterpret_cast<const 
uint8_t*>(bytes.data));
+        data.insert_data(uuid.data(), uuid.size());
+        null_map.push_back(0);
+    }
+    return Status::OK();
+}
+
+static void append_json_string(std::string_view value, std::string* json) {
+    auto column = ColumnString::create();
+    VectorBufferWriter writer(*column);
+    writer.write_json_string(value);
+    writer.commit();
+    json->append(column->get_data_at(0).data, column->get_data_at(0).size);
+}
+
+static bool is_column_selected(const FieldSchema& field_schema,
+                               const std::set<uint64_t>& column_ids) {
+    return column_ids.empty() || column_ids.find(field_schema.get_column_id()) 
!= column_ids.end();
+}
+
+static bool has_selected_column(const FieldSchema& field_schema,
+                                const std::set<uint64_t>& column_ids) {
+    if (is_column_selected(field_schema, column_ids)) {
+        return true;
+    }
+    return std::any_of(field_schema.children.begin(), 
field_schema.children.end(),
+                       [&column_ids](const FieldSchema& child) {
+                           return has_selected_column(child, column_ids);
+                       });
+}
+
+static bool is_direct_variant_leaf_type(const DataTypePtr& data_type) {
+    const auto& type = remove_nullable(data_type);
+    switch (type->get_primitive_type()) {
+    case TYPE_BOOLEAN:
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_BIGINT:
+    case TYPE_LARGEINT:
+    case TYPE_DECIMALV2:
+    case TYPE_DECIMAL32:
+    case TYPE_DECIMAL64:
+    case TYPE_DECIMAL128I:
+    case TYPE_DECIMAL256:
+    case TYPE_FLOAT:
+    case TYPE_DOUBLE:
+    case TYPE_STRING:
+    case TYPE_CHAR:
+    case TYPE_VARCHAR:
+    case TYPE_VARBINARY:
+        return true;
+    case TYPE_TIMEV2:
+    case TYPE_DATE:
+    case TYPE_DATETIME:
+    case TYPE_DATEV2:
+    case TYPE_DATETIMEV2:
+    case TYPE_TIMESTAMPTZ:
+        return true;
+    case TYPE_ARRAY: {
+        const auto* array_type = assert_cast<const DataTypeArray*>(type.get());
+        return is_direct_variant_leaf_type(array_type->get_nested_type());
+    }
+    default:
+        return false;
+    }
+}
+
+static bool can_direct_read_typed_value(const FieldSchema& field_schema, bool 
allow_variant_wrapper,
+                                        const std::set<uint64_t>& column_ids) {
+    if (!has_selected_column(field_schema, column_ids)) {
+        return true;
+    }
+    if (allow_variant_wrapper && is_variant_wrapper_field(field_schema, 
false)) {
+        const int value_idx = find_child_idx(field_schema, "value");
+        const int typed_value_idx = find_child_idx(field_schema, 
"typed_value");
+        return (value_idx < 0 ||
+                !has_selected_column(field_schema.children[value_idx], 
column_ids)) &&
+               typed_value_idx >= 0 &&
+               
can_direct_read_typed_value(field_schema.children[typed_value_idx], false,
+                                           column_ids);
+    }
+
+    const auto& type = remove_nullable(field_schema.data_type);
+    if (type->get_primitive_type() == TYPE_STRUCT) {
+        return std::all_of(field_schema.children.begin(), 
field_schema.children.end(),
+                           [&column_ids](const FieldSchema& child) {
+                               return can_direct_read_typed_value(child, true, 
column_ids);
+                           });
+    }
+    return is_direct_variant_leaf_type(field_schema.data_type);
+}
+
+static bool has_selected_direct_typed_leaf(const FieldSchema& field_schema,
+                                           bool allow_variant_wrapper,
+                                           const std::set<uint64_t>& 
column_ids) {
+    if (!has_selected_column(field_schema, column_ids)) {
+        return false;
+    }
+    if (allow_variant_wrapper && is_variant_wrapper_field(field_schema, 
false)) {
+        const int typed_value_idx = find_child_idx(field_schema, 
"typed_value");
+        DCHECK_GE(typed_value_idx, 0);
+        return 
has_selected_direct_typed_leaf(field_schema.children[typed_value_idx], false,
+                                              column_ids);
+    }
+
+    const auto& type = remove_nullable(field_schema.data_type);
+    if (type->get_primitive_type() == TYPE_STRUCT) {
+        return std::any_of(field_schema.children.begin(), 
field_schema.children.end(),
+                           [&column_ids](const FieldSchema& child) {
+                               return has_selected_direct_typed_leaf(child, 
true, column_ids);
+                           });
+    }
+    return is_direct_variant_leaf_type(field_schema.data_type);
+}
+
+static bool can_use_direct_typed_only_value(const FieldSchema& variant_field,
+                                            const std::set<uint64_t>& 
column_ids) {
+    const int value_idx = find_child_idx(variant_field, "value");
+    const int typed_value_idx = find_child_idx(variant_field, "typed_value");
+    return (value_idx < 0 || 
!has_selected_column(variant_field.children[value_idx], column_ids)) &&
+           typed_value_idx >= 0 &&
+           
has_selected_direct_typed_leaf(variant_field.children[typed_value_idx], false,
+                                          column_ids) &&
+           
can_direct_read_typed_value(variant_field.children[typed_value_idx], false, 
column_ids);
+}
+
+static DataTypePtr make_variant_struct_reader_type(const FieldSchema& field) {
+    DataTypes child_types;
+    Strings child_names;
+    child_types.reserve(field.children.size());
+    child_names.reserve(field.children.size());
+    for (const auto& child : field.children) {
+        child_types.push_back(make_nullable(child.data_type));
+        child_names.push_back(child.name);
+    }
+    return std::make_shared<DataTypeStruct>(child_types, child_names);
+}
+
+static ColumnPtr make_variant_struct_read_column(const FieldSchema& field,
+                                                 const DataTypePtr& 
variant_struct_type) {
+    if (field.data_type->is_nullable()) {
+        return make_nullable(variant_struct_type)->create_column();
+    }
+    return variant_struct_type->create_column();
+}
+
+static void fill_variant_field_info(FieldWithDataType* value) {
+    FieldInfo info;
+    variant_util::get_field_info(value->field, &info);
+    DCHECK_LE(info.num_dimensions, std::numeric_limits<uint8_t>::max());
+    value->base_scalar_type_id = info.scalar_type_id;
+    value->num_dimensions = static_cast<uint8_t>(info.num_dimensions);
+}
+
+static void fill_variant_leaf_type_info(const DataTypePtr& data_type, 
FieldWithDataType* value) {
+    auto leaf_type = remove_nullable(data_type);
+    size_t num_dimensions = 0;
+    while (leaf_type->get_primitive_type() == TYPE_ARRAY) {
+        ++num_dimensions;
+        leaf_type = remove_nullable(
+                assert_cast<const 
DataTypeArray*>(leaf_type.get())->get_nested_type());
+    }
+    DCHECK_LE(num_dimensions, std::numeric_limits<uint8_t>::max());
+    if (value->base_scalar_type_id == INVALID_TYPE) {
+        value->base_scalar_type_id = leaf_type->get_primitive_type();
+    }
+    if (value->num_dimensions == 0 && num_dimensions > 0) {
+        value->num_dimensions = static_cast<uint8_t>(num_dimensions);
+    }
+    if (is_decimal(leaf_type->get_primitive_type())) {
+        value->precision = leaf_type->get_precision();
+        value->scale = leaf_type->get_scale();
+    }
+}
+
+static Status fill_floating_point_variant_field(const Field& field, 
FieldWithDataType* value) {
+    value->field = field;
+    fill_variant_field_info(value);
+    return Status::OK();
+}
+
+static Status fill_floating_point_variant_field(PrimitiveType type, const 
Field& field,
+                                                FieldWithDataType* value) {
+    DORIS_CHECK(type == TYPE_FLOAT || type == TYPE_DOUBLE);
+    return fill_floating_point_variant_field(field, value);
+}
+
+// True when the Parquet schema element of `field_schema` has a logical type set
+// and that logical type is UUID.
+static bool is_uuid_typed_value_field(const FieldSchema& field_schema) {
+    const auto& schema_element = field_schema.parquet_schema;
+    if (!schema_element.__isset.logicalType) {
+        return false;
+    }
+    return schema_element.logicalType.__isset.UUID;
+}
+
+// True when `field_schema` itself, or any schema nested below it, is a UUID
+// typed_value field. Children are searched depth-first.
+static bool contains_uuid_typed_value_field(const FieldSchema& field_schema) {
+    if (is_uuid_typed_value_field(field_schema)) {
+        return true;
+    }
+    for (const FieldSchema& child : field_schema.children) {
+        if (contains_uuid_typed_value_field(child)) {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Formats the raw UUID bytes held by `field` into `*uuid` using
+// parquet::format_variant_uuid.
+// Accepts STRING, CHAR, VARCHAR and VARBINARY fields; any other field type, or a
+// payload that is not exactly 16 bytes long, yields Status::Corruption and leaves
+// `*uuid` untouched.
+static Status uuid_field_to_string(const Field& field, std::string* uuid) {
+    constexpr size_t kUuidByteLength = 16;
+
+    StringRef raw;
+    switch (field.get_type()) {
+    case TYPE_STRING:
+        raw = StringRef(field.get<TYPE_STRING>());
+        break;
+    case TYPE_CHAR:
+        raw = StringRef(field.get<TYPE_CHAR>());
+        break;
+    case TYPE_VARCHAR:
+        raw = StringRef(field.get<TYPE_VARCHAR>());
+        break;
+    case TYPE_VARBINARY:
+        raw = field.get<TYPE_VARBINARY>().to_string_ref();
+        break;
+    default:
+        return Status::Corruption("Parquet VARIANT UUID typed_value has unexpected Doris type {}",
+                                  field.get_type_name());
+    }
+
+    if (raw.size == kUuidByteLength) {
+        *uuid = parquet::format_variant_uuid(reinterpret_cast<const uint8_t*>(raw.data));
+        return Status::OK();
+    }
+    return Status::Corruption("Parquet VARIANT UUID typed_value has invalid length {}", raw.size);
+}
+
+// Stores the UUID held by `field` on `value` as a TYPE_STRING field containing
+// the formatted UUID text, and marks TYPE_STRING as the base scalar type.
+// Propagates the error from uuid_field_to_string without touching `value`.
+static Status fill_uuid_variant_field(const Field& field, FieldWithDataType* value) {
+    std::string formatted_uuid;
+    RETURN_IF_ERROR(uuid_field_to_string(field, &formatted_uuid));
+    value->base_scalar_type_id = TYPE_STRING;
+    value->field = Field::create_field<TYPE_STRING>(std::move(formatted_uuid));
+    return Status::OK();
+}
+
+// Re-encodes a temporal `field` of primitive type `type` as a TYPE_BIGINT field
+// on `value` and records TYPE_BIGINT as its base scalar type.
+// TIMEV2 values are rounded with std::llround; date kinds go through
+// variant_date_value and datetime/timestamptz kinds through
+// variant_datetime_value. Any other `type` trips DORIS_CHECK and leaves `value`
+// unmodified.
+static Status fill_temporal_variant_field(PrimitiveType type, const Field& field,
+                                          FieldWithDataType* value) {
+    int64_t encoded = 0;
+    switch (type) {
+    case TYPE_TIMEV2:
+        encoded = static_cast<int64_t>(std::llround(field.get<TYPE_TIMEV2>()));
+        break;
+    case TYPE_DATE:
+        encoded = variant_date_value(field.get<TYPE_DATE>());
+        break;
+    case TYPE_DATETIME:
+        encoded = variant_datetime_value(field.get<TYPE_DATETIME>());
+        break;
+    case TYPE_DATEV2:
+        encoded = variant_date_value(field.get<TYPE_DATEV2>());
+        break;
+    case TYPE_DATETIMEV2:
+        encoded = variant_datetime_value(field.get<TYPE_DATETIMEV2>());
+        break;
+    case TYPE_TIMESTAMPTZ:
+        encoded = variant_datetime_value(field.get<TYPE_TIMESTAMPTZ>());
+        break;
+    default:
+        DORIS_CHECK(false);
+        return Status::OK();
+    }
+
+    // Every supported temporal kind ends up as a plain BIGINT leaf.
+    value->field = Field::create_field<TYPE_BIGINT>(encoded);
+    value->base_scalar_type_id = TYPE_BIGINT;
+    return Status::OK();
+}
+
+// Returns how many array levels wrap the innermost non-array type of
+// `data_type`, ignoring nullable wrappers at every level.
+// The depth is counted in size_t and checked before narrowing, matching
+// fill_variant_leaf_type_info, so an over-deep type cannot silently wrap the
+// uint8_t result.
+static uint8_t direct_array_dimensions(const DataTypePtr& data_type) {
+    size_t num_dimensions = 0;
+    auto type = remove_nullable(data_type);
+    while (type->get_primitive_type() == TYPE_ARRAY) {
+        ++num_dimensions;
+        type = remove_nullable(assert_cast<const DataTypeArray*>(type.get())->get_nested_type());
+    }
+    DCHECK_LE(num_dimensions, std::numeric_limits<uint8_t>::max());
+    return static_cast<uint8_t>(num_dimensions);
+}
+
+// Returns the primitive type of the innermost non-array type reached by peeling
+// nullable and array wrappers off direct_variant_leaf_type(field_schema).
+static PrimitiveType direct_array_base_scalar_type(const FieldSchema& field_schema) {
+    auto scalar_type = remove_nullable(direct_variant_leaf_type(field_schema));
+    for (;;) {
+        const auto primitive = scalar_type->get_primitive_type();
+        if (primitive != TYPE_ARRAY) {
+            return primitive;
+        }
+        const auto* array_type = assert_cast<const DataTypeArray*>(scalar_type.get());
+        scalar_type = remove_nullable(array_type->get_nested_type());
+    }
+}
+
+// Converts `field`, whose shape is described by `field_schema`, into the value
+// stored for a direct typed array leaf and writes it to `*converted`.
+//  - null stays null;
+//  - arrays are converted element by element against children[0];
+//  - UUID, temporal and floating-point leaves are re-encoded through the
+//    matching fill_*_variant_field helper (only the resulting field is kept);
+//  - every other leaf is copied unchanged.
+// On error `*converted` is left untouched.
+static Status convert_direct_array_value(const FieldSchema& field_schema, const Field& field,
+                                         Field* converted) {
+    if (field.is_null()) {
+        *converted = Field();
+        return Status::OK();
+    }
+
+    const auto& value_type = remove_nullable(field_schema.data_type);
+    const auto primitive = value_type->get_primitive_type();
+
+    if (primitive == TYPE_ARRAY) {
+        if (field_schema.children.empty()) {
+            return Status::Corruption("Parquet VARIANT array typed_value has no element schema");
+        }
+        const auto& source_elements = field.get<TYPE_ARRAY>();
+        Array result;
+        result.reserve(source_elements.size());
+        for (const auto& source_element : source_elements) {
+            Field element;
+            RETURN_IF_ERROR(convert_direct_array_value(field_schema.children[0], source_element,
+                                                       &element));
+            result.push_back(std::move(element));
+        }
+        *converted = Field::create_field<TYPE_ARRAY>(std::move(result));
+        return Status::OK();
+    }
+
+    // The UUID check comes first because it is decided by the Parquet logical
+    // type, not by the Doris primitive type.
+    FieldWithDataType scratch;
+    if (is_uuid_typed_value_field(field_schema)) {
+        RETURN_IF_ERROR(fill_uuid_variant_field(field, &scratch));
+    } else if (is_temporal_variant_leaf_type(primitive)) {
+        RETURN_IF_ERROR(fill_temporal_variant_field(primitive, field, &scratch));
+    } else if (is_floating_point_variant_leaf_type(primitive)) {
+        RETURN_IF_ERROR(fill_floating_point_variant_field(primitive, field, &scratch));
+    } else {
+        *converted = field;
+        return Status::OK();
+    }
+    *converted = std::move(scratch.field);
+    return Status::OK();
+}
+
+// Appends rows [start, start + rows) of `column` to the nullable column
+// `variant_leaf`, converting each non-null value with
+// convert_direct_array_value.
+// A row is written as null (default value plus null flag 1) when the source
+// column marks it null or has_direct_typed_parent_null reports a null parent for
+// it. Stops at the first conversion error; rows appended before the error stay
+// in `variant_leaf`.
+static Status insert_direct_typed_array_leaf_range(
+        const FieldSchema& field_schema, const IColumn& column, size_t start, size_t rows,
+        const std::vector<const NullMap*>& parent_null_maps, IColumn* variant_leaf) {
+    auto& target = assert_cast<ColumnNullable&>(*variant_leaf);
+
+    // Unwrap the source when it is nullable so values and null flags can be read
+    // separately.
+    const auto* nullable_source = check_and_get_column<ColumnNullable>(&column);
+    const IColumn* source_values =
+            nullable_source != nullptr ? &nullable_source->get_nested_column() : &column;
+    const NullMap* source_nulls =
+            nullable_source != nullptr ? &nullable_source->get_null_map_data() : nullptr;
+
+    auto& target_values = target.get_nested_column();
+    auto& target_nulls = target.get_null_map_data();
+    target_nulls.reserve(target_nulls.size() + rows);
+
+    for (size_t offset = 0; offset < rows; ++offset) {
+        const size_t row = start + offset;
+        const bool source_is_null = source_nulls != nullptr && (*source_nulls)[row];
+        if (source_is_null || has_direct_typed_parent_null(parent_null_maps, row)) {
+            target_values.insert_default();
+            target_nulls.push_back(1);
+            continue;
+        }
+
+        Field raw;
+        source_values->get(row, raw);
+        Field converted;
+        RETURN_IF_ERROR(convert_direct_array_value(field_schema, raw, &converted));
+        target_values.insert(converted);
+        target_nulls.push_back(0);
+    }
+    return Status::OK();
+}
+
+// Fills `value` for a direct typed array leaf: the converted field, the base
+// scalar type of the array and its dimension count.
+// `*present` is false (and `value` untouched) when `field` is null.
+static Status fill_direct_array_variant_field(const FieldSchema& field_schema, const Field& field,
+                                              FieldWithDataType* value, bool* present) {
+    *present = !field.is_null();
+    if (!*present) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(convert_direct_array_value(field_schema, field, &value->field));
+    value->num_dimensions = direct_array_dimensions(field_schema.data_type);
+    value->base_scalar_type_id = direct_array_base_scalar_type(field_schema);
+    return Status::OK();
+}
+
+// Converts a typed_value `field` described by `field_schema` into a
+// FieldWithDataType.
+// `*present` is false when `field` is null. UUID and temporal leaves are
+// re-encoded by their dedicated helpers; float/double are stored as-is; the
+// remaining supported types are copied and annotated with type info taken from
+// both the field and the declared type. Unsupported types yield
+// Status::Corruption.
+static Status field_to_variant_field(const FieldSchema& field_schema, const Field& field,
+                                     FieldWithDataType* value, bool* present) {
+    *present = !field.is_null();
+    if (!*present) {
+        return Status::OK();
+    }
+    // UUID is decided by the Parquet logical type, so test it before dispatching
+    // on the Doris primitive type.
+    if (is_uuid_typed_value_field(field_schema)) {
+        return fill_uuid_variant_field(field, value);
+    }
+
+    const DataTypePtr& leaf_type = remove_nullable(field_schema.data_type);
+    const auto primitive = leaf_type->get_primitive_type();
+    if (is_temporal_variant_leaf_type(primitive)) {
+        return fill_temporal_variant_field(primitive, field, value);
+    }
+
+    switch (primitive) {
+    case TYPE_FLOAT:
+    case TYPE_DOUBLE:
+        return fill_floating_point_variant_field(field, value);
+    case TYPE_BOOLEAN:
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_BIGINT:
+    case TYPE_LARGEINT:
+    case TYPE_DECIMALV2:
+    case TYPE_DECIMAL32:
+    case TYPE_DECIMAL64:
+    case TYPE_DECIMAL128I:
+    case TYPE_DECIMAL256:
+    case TYPE_STRING:
+    case TYPE_CHAR:
+    case TYPE_VARCHAR:
+    case TYPE_VARBINARY:
+    case TYPE_ARRAY:
+        value->field = field;
+        fill_variant_field_info(value);
+        fill_variant_leaf_type_info(leaf_type, value);
+        return Status::OK();
+    default:
+        return Status::Corruption("Unsupported Parquet VARIANT typed_value Doris type {}",
+                                  leaf_type->get_name());
+    }
+}
+
+static Status typed_value_to_json(const FieldSchema& typed_value_field, const 
Field& field,
+                                  const std::string& metadata, std::string* 
json, bool* present);
+static Status typed_map_to_variant_map(const FieldSchema& typed_value_field, 
const Field& field,
+                                       const std::string& metadata, 
PathInDataBuilder* path,
+                                       VariantMap* values, bool* present,
+                                       std::deque<std::string>* string_values);
+
+// Renders `field` as JSON text using the serde of `data_type`.
+// The field is inserted into a fresh column of that type, serialized with
+// default FormatOptions into a temporary string column, and the resulting cell
+// is copied to `*json`.
+static Status serialize_field_to_json(const DataTypePtr& data_type, const Field& field,
+                                      std::string* json) {
+    MutableColumnPtr cell_column = data_type->create_column();
+    cell_column->insert(field);
+
+    auto output = ColumnString::create();
+    VectorBufferWriter output_writer(*output);
+    DataTypeSerDe::FormatOptions format_options;
+    RETURN_IF_ERROR(data_type->get_serde()->serialize_one_cell_to_json(
+            *cell_column, 0, output_writer, format_options));
+    output_writer.commit();
+
+    *json = output->get_data_at(0).to_string();
+    return Status::OK();
+}
+
+// Serializes a scalar typed_value `field` to JSON text.
+// `*present` is false when the field is null. A converted value that is itself
+// null becomes the literal "null". VARBINARY values that are not UUIDs are
+// rejected with NotSupported. The JSON type is built from the converted value's
+// base scalar type when one is known, otherwise the declared (non-nullable)
+// type is used.
+static Status scalar_typed_value_to_json(const FieldSchema& field_schema, const Field& field,
+                                         std::string* json, bool* present) {
+    FieldWithDataType variant_value;
+    RETURN_IF_ERROR(field_to_variant_field(field_schema, field, &variant_value, present));
+    if (!*present) {
+        return Status::OK();
+    }
+    if (variant_value.field.is_null()) {
+        *json = "null";
+        return Status::OK();
+    }
+
+    const bool is_uuid = is_uuid_typed_value_field(field_schema);
+    const bool is_binary =
+            remove_nullable(field_schema.data_type)->get_primitive_type() == TYPE_VARBINARY;
+    if (is_binary && !is_uuid) {
+        return Status::NotSupported(
+                "Parquet VARIANT binary typed_value cannot be serialized to JSON");
+    }
+
+    DataTypePtr json_type;
+    if (variant_value.base_scalar_type_id == PrimitiveType::INVALID_TYPE) {
+        json_type = remove_nullable(field_schema.data_type);
+    } else {
+        json_type = DataTypeFactory::instance().create_data_type(
+                variant_value.base_scalar_type_id, false, variant_value.precision,
+                variant_value.scale);
+    }
+    return serialize_field_to_json(json_type, variant_value.field, json);
+}
+
+// Determines the metadata to use for a variant wrapper struct.
+// Starts from `inherited_metadata` when one is supplied. If the wrapper has its
+// own "metadata" child, that child is read into `*metadata` and `*has_metadata`
+// then reflects only whether that child was present.
+static Status resolve_variant_metadata(const FieldSchema& variant_field, const Struct& fields,
+                                       const std::string* inherited_metadata,
+                                       std::string* metadata, bool* has_metadata) {
+    *has_metadata = inherited_metadata != nullptr;
+    if (*has_metadata) {
+        *metadata = *inherited_metadata;
+    }
+
+    const int own_metadata_idx = find_child_idx(variant_field, "metadata");
+    if (own_metadata_idx < 0) {
+        return Status::OK();
+    }
+
+    bool own_metadata_present = false;
+    RETURN_IF_ERROR(get_binary_field(fields[own_metadata_idx], metadata, &own_metadata_present));
+    *has_metadata = own_metadata_present;
+    return Status::OK();
+}
+
+// Serializes the "typed_value" child of a variant wrapper to JSON.
+// `*typed_present` stays false when the wrapper has no such child; otherwise it
+// is set by typed_value_to_json.
+static Status variant_typed_value_to_json(const FieldSchema& variant_field, const Struct& fields,
+                                          const std::string& metadata, std::string* typed_json,
+                                          bool* typed_present) {
+    *typed_present = false;
+    const int typed_idx = find_child_idx(variant_field, "typed_value");
+    if (typed_idx >= 0) {
+        return typed_value_to_json(variant_field.children[typed_idx], fields[typed_idx], metadata,
+                                   typed_json, typed_present);
+    }
+    return Status::OK();
+}
+
+// Decodes the binary "value" child of a variant wrapper to JSON.
+// `*value_present` is false when the wrapper has no such child or the child is
+// not present. A present value without metadata is reported as
+// Status::Corruption.
+static Status variant_residual_value_to_json(const FieldSchema& variant_field,
+                                             const Struct& fields, const std::string& metadata,
+                                             bool has_metadata, std::string* value_json,
+                                             bool* value_present) {
+    *value_present = false;
+    const int residual_idx = find_child_idx(variant_field, "value");
+    if (residual_idx < 0) {
+        return Status::OK();
+    }
+
+    std::string encoded_value;
+    RETURN_IF_ERROR(get_binary_field(fields[residual_idx], &encoded_value, value_present));
+    if (!*value_present) {
+        return Status::OK();
+    }
+    if (!has_metadata) {
+        return Status::Corruption("Parquet VARIANT value is present without metadata");
+    }
+
+    const StringRef metadata_ref(metadata.data(), metadata.size());
+    const StringRef value_ref(encoded_value.data(), encoded_value.size());
+    return parquet::decode_variant_to_json(metadata_ref, value_ref, value_json);
+}
+
+// Merges the JSON decoded from a variant's residual "value" with the JSON built
+// from its "typed_value" into one JSON document.
+// Both inputs are parsed into path maps rooted at the empty path. A residual
+// root entry that is not an empty-object marker conflicts with the typed value,
+// and paths present on both sides are rejected by
+// check_no_shredded_value_typed_duplicates.
+static Status merge_variant_value_and_typed_json(const std::string& value_json,
+                                                 const std::string& typed_json,
+                                                 std::string* json) {
+    VariantMap residual_paths;
+    RETURN_IF_ERROR(parse_json_to_variant_map(value_json, PathInData(), &residual_paths));
+    VariantMap typed_paths;
+    RETURN_IF_ERROR(parse_json_to_variant_map(typed_json, PathInData(), &typed_paths));
+
+    erase_shadowed_empty_object_markers(&residual_paths, &typed_paths);
+
+    const auto root_entry = residual_paths.find(PathInData());
+    const bool residual_root_is_value =
+            root_entry != residual_paths.end() && !is_empty_object_marker(root_entry->second);
+    if (residual_root_is_value) {
+        return Status::Corruption(
+                "Parquet VARIANT has conflicting non-object value and typed_value");
+    }
+
+    RETURN_IF_ERROR(
+            check_no_shredded_value_typed_duplicates(residual_paths, typed_paths, PathInData()));
+    residual_paths.merge(std::move(typed_paths));
+    return variant_map_to_json(std::move(residual_paths), json);
+}
+
+// Reconstructs the JSON text of one variant wrapper struct.
+// Resolves the metadata, renders the typed_value and the residual value
+// independently, then merges them when both are present or uses whichever one
+// exists. `*present` is false for a null wrapper or when neither part is
+// present; `*json` is only written when `*present` ends up true.
+static Status variant_to_json(const FieldSchema& variant_field, const Field& field,
+                              const std::string* inherited_metadata, std::string* json,
+                              bool* present) {
+    if (field.is_null()) {
+        *present = false;
+        return Status::OK();
+    }
+    const auto& wrapper_fields = field.get<TYPE_STRUCT>();
+
+    std::string metadata;
+    bool has_metadata = false;
+    RETURN_IF_ERROR(resolve_variant_metadata(variant_field, wrapper_fields, inherited_metadata,
+                                             &metadata, &has_metadata));
+
+    std::string typed_json;
+    bool typed_present = false;
+    RETURN_IF_ERROR(variant_typed_value_to_json(variant_field, wrapper_fields, metadata,
+                                                &typed_json, &typed_present));
+
+    std::string residual_json;
+    bool residual_present = false;
+    RETURN_IF_ERROR(variant_residual_value_to_json(variant_field, wrapper_fields, metadata,
+                                                   has_metadata, &residual_json,
+                                                   &residual_present));
+
+    if (typed_present && residual_present) {
+        RETURN_IF_ERROR(merge_variant_value_and_typed_json(residual_json, typed_json, json));
+    } else if (typed_present) {
+        *json = std::move(typed_json);
+    } else if (residual_present) {
+        *json = std::move(residual_json);
+    }
+    *present = typed_present || residual_present;
+    return Status::OK();
+}
+
+// Renders a shredded field to JSON, deciding whether it is a nested variant
+// wrapper or a plain typed value.
+// Fields recognised by is_variant_wrapper_field are always decoded as wrappers.
+// Value-only wrapper candidates are tried as wrappers first; only a CORRUPTION
+// result from that attempt falls back to the typed-value interpretation, any
+// other outcome is returned as-is.
+static Status shredded_field_to_json(const FieldSchema& field_schema, const Field& field,
+                                     const std::string& metadata, std::string* json,
+                                     bool* present, bool allow_scalar_typed_value_only_wrapper) {
+    if (is_variant_wrapper_field(field_schema, allow_scalar_typed_value_only_wrapper)) {
+        return variant_to_json(field_schema, field, &metadata, json, present);
+    }
+    if (is_value_only_variant_wrapper_candidate(field_schema)) {
+        Status wrapper_status = variant_to_json(field_schema, field, &metadata, json, present);
+        if (wrapper_status.ok() || !wrapper_status.is<ErrorCode::CORRUPTION>()) {
+            return wrapper_status;
+        }
+    }
+    return typed_value_to_json(field_schema, field, metadata, json, present);
+}
+
+// Renders a typed array value as a JSON array.
+// `*present` is false for a null array. Each element is rendered through
+// shredded_field_to_json against children[0]; an element that is not present is
+// written as null when the element field itself is null and is otherwise
+// reported as Status::Corruption.
+static Status typed_array_to_json(const FieldSchema& typed_value_field, const Field& field,
+                                  const std::string& metadata, std::string* json, bool* present) {
+    if (field.is_null()) {
+        *present = false;
+        return Status::OK();
+    }
+    if (typed_value_field.children.empty()) {
+        return Status::Corruption("Parquet VARIANT array typed_value has no element schema");
+    }
+
+    const auto& elements = field.get<TYPE_ARRAY>();
+    const auto& element_schema = typed_value_field.children[0];
+
+    json->clear();
+    json->push_back('[');
+    bool needs_separator = false;
+    for (const auto& element : elements) {
+        if (needs_separator) {
+            json->push_back(',');
+        }
+        needs_separator = true;
+
+        std::string element_json;
+        bool element_present = false;
+        RETURN_IF_ERROR(shredded_field_to_json(element_schema, element, metadata, &element_json,
+                                               &element_present, true));
+        if (element_present) {
+            json->append(element_json);
+            continue;
+        }
+        if (!element.is_null()) {
+            return Status::Corruption("Parquet VARIANT array element is missing");
+        }
+        json->append("null");
+    }
+    json->push_back(']');
+
+    *present = true;
+    return Status::OK();
+}
+
+// Renders a typed struct value as a JSON object.
+// `*present` is false for a null struct. Each schema child is rendered through
+// shredded_field_to_json; children that are not present are omitted from the
+// object.
+// Returns Status::Corruption when the struct value carries fewer fields than the
+// schema has children, instead of indexing past the end of the value.
+static Status typed_struct_to_json(const FieldSchema& typed_value_field, const Field& field,
+                                   const std::string& metadata, std::string* json, bool* present) {
+    if (field.is_null()) {
+        *present = false;
+        return Status::OK();
+    }
+
+    const auto& fields = field.get<TYPE_STRUCT>();
+    const auto& children = typed_value_field.children;
+    // fields[i] is read for every schema child below, so the value must be at
+    // least as wide as the schema.
+    if (fields.size() < children.size()) {
+        return Status::Corruption(
+                "Parquet VARIANT struct typed_value has {} fields but schema has {} children",
+                fields.size(), children.size());
+    }
+
+    json->clear();
+    json->push_back('{');
+    bool first = true;
+    for (size_t i = 0; i < children.size(); ++i) {
+        std::string child_json;
+        bool child_present = false;
+        RETURN_IF_ERROR(shredded_field_to_json(children[i], fields[i], metadata, &child_json,
+                                               &child_present, false));
+        if (!child_present) {
+            continue;
+        }
+        if (!first) {
+            json->push_back(',');
+        }
+        append_json_string(children[i].name, json);
+        json->push_back(':');
+        json->append(child_json);
+        first = false;
+    }
+    json->push_back('}');
+    *present = true;
+    return Status::OK();
+}
+
+// Renders a typed_value of any supported shape as JSON by dispatching on its
+// non-nullable primitive type: structs and arrays have dedicated renderers, maps
+// are first flattened into a VariantMap and then serialized, and everything else
+// is handled as a scalar.
+static Status typed_value_to_json(const FieldSchema& typed_value_field, const Field& field,
+                                  const std::string& metadata, std::string* json, bool* present) {
+    const auto primitive = remove_nullable(typed_value_field.data_type)->get_primitive_type();
+    if (primitive == TYPE_STRUCT) {
+        return typed_struct_to_json(typed_value_field, field, metadata, json, present);
+    }
+    if (primitive == TYPE_ARRAY) {
+        return typed_array_to_json(typed_value_field, field, metadata, json, present);
+    }
+    if (primitive != TYPE_MAP) {
+        return scalar_typed_value_to_json(typed_value_field, field, json, present);
+    }
+
+    VariantMap map_values;
+    PathInDataBuilder root_path;
+    std::deque<std::string> string_storage;
+    RETURN_IF_ERROR(typed_map_to_variant_map(typed_value_field, field, metadata, &root_path,
+                                             &map_values, present, &string_storage));
+    if (!*present) {
+        return Status::OK();
+    }
+    return variant_map_to_json(std::move(map_values), json);
+}
+
+static Status typed_value_to_variant_map(const FieldSchema& typed_value_field, 
const Field& field,
+                                         const std::string& metadata, 
PathInDataBuilder* path,
+                                         VariantMap* values, bool* present,
+                                         std::deque<std::string>* 
string_values);
+
+// Flattens one variant wrapper struct into `values`, keyed by path.
+// The residual "value" child is decoded under the current path of `path`, the
+// "typed_value" child is flattened through typed_value_to_variant_map, and both
+// results are merged into `values` after conflict checks.
+// `*present` is false for a null wrapper, otherwise true when either part was
+// present. `values` is only modified once every check has passed.
+// Metadata is resolved through resolve_variant_metadata so this path and
+// variant_to_json share one implementation of the inheritance rule.
+static Status variant_to_variant_map(const FieldSchema& variant_field, const Field& field,
+                                     const std::string* inherited_metadata,
+                                     PathInDataBuilder* path, VariantMap* values, bool* present,
+                                     std::deque<std::string>* string_values) {
+    if (field.is_null()) {
+        *present = false;
+        return Status::OK();
+    }
+    const auto& fields = field.get<TYPE_STRUCT>();
+    const int value_idx = find_child_idx(variant_field, "value");
+    const int typed_value_idx = find_child_idx(variant_field, "typed_value");
+
+    std::string metadata;
+    bool has_metadata = false;
+    RETURN_IF_ERROR(resolve_variant_metadata(variant_field, fields, inherited_metadata, &metadata,
+                                             &has_metadata));
+
+    VariantMap value_values;
+    bool value_present = false;
+    const PathInData current_path = path->build();
+    if (value_idx >= 0) {
+        std::string value;
+        RETURN_IF_ERROR(get_binary_field(fields[value_idx], &value, &value_present));
+        if (value_present) {
+            if (!has_metadata) {
+                return Status::Corruption("Parquet VARIANT value is present without metadata");
+            }
+            RETURN_IF_ERROR(parquet::decode_variant_to_variant_map(
+                    StringRef(metadata.data(), metadata.size()),
+                    StringRef(value.data(), value.size()), current_path, &value_values,
+                    string_values));
+        }
+    }
+
+    VariantMap typed_values;
+    bool typed_present = false;
+    if (typed_value_idx >= 0) {
+        RETURN_IF_ERROR(typed_value_to_variant_map(variant_field.children[typed_value_idx],
+                                                   fields[typed_value_idx], metadata, path,
+                                                   &typed_values, &typed_present, string_values));
+    }
+
+    erase_shadowed_empty_object_markers(&value_values, &typed_values);
+    // A residual entry at the current path that is not an empty-object marker
+    // cannot coexist with a typed value.
+    auto current_value = value_values.find(current_path);
+    if (value_present && typed_present && current_value != value_values.end() &&
+        !is_empty_object_marker(current_value->second)) {
+        return Status::Corruption(
+                "Parquet VARIANT has conflicting non-object value and typed_value");
+    }
+    RETURN_IF_ERROR(
+            check_no_shredded_value_typed_duplicates(value_values, typed_values, current_path));
+    values->merge(std::move(value_values));
+    values->merge(std::move(typed_values));
+    *present = value_present || typed_present;
+    return Status::OK();
+}
+
+// Flattens a shredded field into `values`, deciding whether it is a nested
+// variant wrapper or a plain typed value.
+// Fields recognised by is_variant_wrapper_field are always decoded as wrappers.
+// Value-only wrapper candidates are tried as wrappers first; only a CORRUPTION
+// result from that attempt falls back to the typed-value interpretation, any
+// other outcome is returned as-is.
+static Status shredded_field_to_variant_map(const FieldSchema& field_schema, const Field& field,
+                                            const std::string& metadata, PathInDataBuilder* path,
+                                            VariantMap* values, bool* present,
+                                            std::deque<std::string>* string_values) {
+    if (is_variant_wrapper_field(field_schema, false)) {
+        return variant_to_variant_map(field_schema, field, &metadata, path, values, present,
+                                      string_values);
+    }
+    if (is_value_only_variant_wrapper_candidate(field_schema)) {
+        Status wrapper_status = variant_to_variant_map(field_schema, field, &metadata, path,
+                                                       values, present, string_values);
+        if (wrapper_status.ok() || !wrapper_status.is<ErrorCode::CORRUPTION>()) {
+            return wrapper_status;
+        }
+    }
+    return typed_value_to_variant_map(field_schema, field, metadata, path, values, present,
+                                      string_values);
+}
+
+// Converts a typed leaf `field` and stores it in `values` under the path
+// currently built by `path`. Nothing is stored when the field is not present.
+static Status append_typed_field_to_variant_map(const FieldSchema& typed_value_field,
+                                                const Field& field, PathInDataBuilder* path,
+                                                VariantMap* values, bool* present) {
+    FieldWithDataType leaf_value;
+    RETURN_IF_ERROR(field_to_variant_field(typed_value_field, field, &leaf_value, present));
+    if (!*present) {
+        return Status::OK();
+    }
+    (*values)[path->build()] = std::move(leaf_value);
+    return Status::OK();
+}
+
+// Collapses a path map into a single FieldWithDataType.
+// A map holding exactly one entry whose path is empty is unwrapped to that
+// entry; any other map is stored as a TYPE_VARIANT field and its type info is
+// derived from the stored field.
+static void move_variant_map_to_field(VariantMap&& element_values, FieldWithDataType* value) {
+    const bool is_single_root_entry =
+            element_values.size() == 1 && element_values.begin()->first.empty();
+    if (!is_single_root_entry) {
+        value->field = Field::create_field<TYPE_VARIANT>(std::move(element_values));
+        fill_variant_field_info(value);
+        return;
+    }
+    *value = std::move(element_values.begin()->second);
+}
+
+static Status typed_array_to_variant_map(const FieldSchema& typed_value_field, 
const Field& field,
+                                         const std::string& metadata, 
PathInDataBuilder* path,
+                                         VariantMap* values, bool* present,
+                                         std::deque<std::string>* 
string_values) {
+    if ((contains_uuid_typed_value_field(typed_value_field) ||
+         contains_temporal_variant_leaf_type(typed_value_field.data_type) ||
+         
contains_floating_point_variant_leaf_type(typed_value_field.data_type)) &&
+        is_direct_variant_leaf_type(typed_value_field.data_type)) {
+        FieldWithDataType value;
+        RETURN_IF_ERROR(fill_direct_array_variant_field(typed_value_field, 
field, &value, present));
+        if (*present) {
+            (*values)[path->build()] = std::move(value);
+        }
+        return Status::OK();
+    }
+    if (is_direct_variant_leaf_type(typed_value_field.data_type)) {
+        return append_typed_field_to_variant_map(typed_value_field, field, 
path, values, present);
+    }
+
+    if (field.is_null()) {
+        *present = false;
+        return Status::OK();
+    }
+    if (typed_value_field.children.empty()) {
+        return Status::Corruption("Parquet VARIANT array typed_value has no 
element schema");
+    }
+
+    const auto& elements = field.get<TYPE_ARRAY>();
+    const auto& element_schema = typed_value_field.children[0];
+    Array array;
+    array.reserve(elements.size());
+    for (const auto& element : elements) {
+        VariantMap element_values;
+        bool element_present = false;
+        PathInDataBuilder element_path;
+        RETURN_IF_ERROR(shredded_field_to_variant_map(element_schema, element, 
metadata,
+                                                      &element_path, 
&element_values,
+                                                      &element_present, 
string_values));
+        if (!element_present) {
+            if (element.is_null()) {
+                array.push_back(Field());
+                continue;
+            }
+            return Status::Corruption("Parquet VARIANT array element is 
missing");
+        }
+
+        FieldWithDataType element_value;
+        move_variant_map_to_field(std::move(element_values), &element_value);
+        array.push_back(std::move(element_value.field));
+    }
+
+    FieldWithDataType value;
+    const size_t elements_count = array.size();
+    value.field = Field::create_field<TYPE_ARRAY>(std::move(array));
+    fill_variant_field_info(&value);

Review Comment:
   The typed/shredded array path has the same data-loss fallback: after each 
element is reconstructed into `array`, any array whose inferred 
`base_scalar_type_id` is `INVALID_TYPE` is converted to `[null, ...]`. A valid 
typed VARIANT array with struct/list/map elements or mixed element kinds will 
therefore return the correct length but all element values as null. Please 
preserve the reconstructed element values when materializing JSONB and add 
typed-array coverage for non-null complex/mixed elements.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to