eldenmoon commented on code in PR #63192:
URL: https://github.com/apache/doris/pull/63192#discussion_r3232446649


##########
be/src/format/parquet/vparquet_column_reader.cpp:
##########
@@ -103,6 +114,410 @@ static void fill_array_offset(FieldSchema* field, 
ColumnArray::Offsets64& offset
     }
 }
 
+static constexpr int64_t UNIX_EPOCH_DAYNR = 719528;
+static constexpr int64_t MICROS_PER_SECOND = 1000000;
+
+static int64_t variant_date_value(const VecDateTimeValue& value) {
+    return value.daynr() - UNIX_EPOCH_DAYNR;
+}
+
+static int64_t variant_date_value(const DateV2Value<DateV2ValueType>& value) {
+    return value.daynr() - UNIX_EPOCH_DAYNR;
+}
+
+static int64_t variant_datetime_value(const VecDateTimeValue& value) {
+    int64_t timestamp = 0;
+    value.unix_timestamp(&timestamp, cctz::utc_time_zone());
+    return timestamp * MICROS_PER_SECOND;
+}
+
+static int64_t variant_datetime_value(const DateV2Value<DateTimeV2ValueType>& 
value) {
+    int64_t timestamp = 0;
+    value.unix_timestamp(&timestamp, cctz::utc_time_zone());
+    return timestamp * MICROS_PER_SECOND + value.microsecond();
+}
+
+static int64_t variant_datetime_value(const TimestampTzValue& value) {
+    int64_t timestamp = 0;
+    value.unix_timestamp(&timestamp, cctz::utc_time_zone());
+    return timestamp * MICROS_PER_SECOND + value.microsecond();
+}
+
+static int find_child_idx(const FieldSchema& field, std::string_view name) {
+    for (int i = 0; i < field.children.size(); ++i) {
+        if (field.children[i].lower_case_name == name) {
+            return i;
+        }
+    }
+    return -1;
+}
+
+static bool is_variant_wrapper_field(const FieldSchema& field) {
+    auto type = remove_nullable(field.data_type);
+    if (type->get_primitive_type() != TYPE_STRUCT && 
type->get_primitive_type() != TYPE_VARIANT) {
+        return false;
+    }
+
+    bool has_metadata = false;
+    bool has_value = false;
+    bool has_typed_value = false;
+    for (const auto& child : field.children) {
+        if (child.lower_case_name == "metadata") {
+            if (child.physical_type != tparquet::Type::BYTE_ARRAY) {
+                return false;
+            }
+            has_metadata = true;
+            continue;
+        }
+        if (child.lower_case_name == "value") {
+            if (child.physical_type != tparquet::Type::BYTE_ARRAY) {
+                return false;
+            }
+            has_value = true;
+            continue;
+        }
+        if (child.lower_case_name == "typed_value") {
+            has_typed_value = true;
+            continue;
+        }
+        return false;
+    }
+    return has_typed_value || (has_metadata && has_value);
+}
+
+static const IColumn& remove_nullable_column(const IColumn& column) {
+    if (const auto* nullable = check_and_get_column<ColumnNullable>(&column)) {
+        return nullable->get_nested_column();
+    }
+    return column;
+}
+
+static Status get_binary_field(const Field& field, std::string* value, bool* 
present) {
+    if (field.is_null()) {
+        *present = false;
+        return Status::OK();
+    }
+    *present = true;
+    switch (field.get_type()) {
+    case TYPE_STRING:
+        *value = field.get<TYPE_STRING>();
+        return Status::OK();
+    case TYPE_CHAR:
+        *value = field.get<TYPE_CHAR>();
+        return Status::OK();
+    case TYPE_VARCHAR:
+        *value = field.get<TYPE_VARCHAR>();
+        return Status::OK();
+    case TYPE_VARBINARY: {
+        auto ref = field.get<TYPE_VARBINARY>().to_string_ref();
+        value->assign(ref.data, ref.size);
+        return Status::OK();
+    }
+    default:
+        return Status::Corruption("Parquet VARIANT binary field has unexpected 
Doris type {}",
+                                  field.get_type_name());
+    }
+}
+
+static PathInData append_path(const PathInData& prefix, const PathInData& 
suffix) {
+    if (prefix.empty()) {
+        return suffix;
+    }
+    if (suffix.empty()) {
+        return prefix;
+    }
+    PathInDataBuilder builder;
+    builder.append(prefix.get_parts(), false);
+    builder.append(suffix.get_parts(), false);
+    return builder.build();
+}
+
+static Status parse_json_to_variant_map(const std::string& json, const 
PathInData& prefix,
+                                        VariantMap* values) {
+    auto parsed_column = ColumnVariant::create(0, false);
+    ParseConfig parse_config;
+    StringRef json_ref(json.data(), json.size());
+    RETURN_IF_CATCH_EXCEPTION(
+            variant_util::parse_json_to_variant(*parsed_column, json_ref, 
nullptr, parse_config));
+    Field parsed = (*parsed_column)[0];
+    auto& parsed_values = parsed.get<TYPE_VARIANT>();
+    for (auto& [path, value] : parsed_values) {
+        (*values)[append_path(prefix, path)] = std::move(value);
+    }
+    return Status::OK();
+}
+
+static bool is_direct_variant_leaf_type(const DataTypePtr& data_type) {
+    const auto& type = remove_nullable(data_type);
+    switch (type->get_primitive_type()) {
+    case TYPE_BOOLEAN:
+    case TYPE_TINYINT:
+    case TYPE_SMALLINT:
+    case TYPE_INT:
+    case TYPE_BIGINT:
+    case TYPE_LARGEINT:
+    case TYPE_DECIMALV2:
+    case TYPE_DECIMAL32:
+    case TYPE_DECIMAL64:
+    case TYPE_DECIMAL128I:
+    case TYPE_DECIMAL256:
+    case TYPE_STRING:
+    case TYPE_CHAR:
+    case TYPE_VARCHAR:
+        return true;
+    case TYPE_ARRAY: {
+        const auto* array_type = assert_cast<const DataTypeArray*>(type.get());
+        return is_direct_variant_leaf_type(array_type->get_nested_type());
+    }
+    default:
+        return false;
+    }
+}
+
+static bool can_direct_read_typed_value(const FieldSchema& field_schema) {
+    if (is_variant_wrapper_field(field_schema)) {
+        const int value_idx = find_child_idx(field_schema, "value");

Review Comment:
   Addressed in current head `448355b9ec5afaa1afb2b0910fd3a6825e1007cf`. The
typed-only direct-read path now decides whether a field is a Variant wrapper
based on where it appears. The top-level `typed_value` object-field container
is entered with `allow_variant_wrapper=false`, so user keys named `typed_value`
or `value` are preserved as ordinary object fields instead of being unwrapped
as structural wrapper fields. Added BE unit-test coverage in
`ParquetVariantReaderTest.DirectTypedOnlyKeepsStructuralNameUserKeys`. Verified
locally with `./run-be-ut.sh --run --filter='ParquetVariantReaderTest.*'` and
`./run-regression-test.sh --run --conf tmp/regression-conf.auto.groovy -d
external_table_p0/tvf -s test_local_tvf_iceberg_variant`.



##########
be/src/format/parquet/vparquet_column_reader.cpp:
##########
@@ -1001,6 +1441,142 @@ Status StructColumnReader::read_column_data(
     return Status::OK();
 }
 
+Status VariantColumnReader::init(io::FileReaderSPtr file, FieldSchema* field,
+                                 const tparquet::RowGroup& row_group, size_t 
max_buf_size,
+                                 std::unordered_map<int, 
tparquet::OffsetIndex>& col_offsets,
+                                 RuntimeState* state, bool in_collection,
+                                 const std::set<uint64_t>& column_ids,
+                                 const std::set<uint64_t>& filter_column_ids) {
+    _field_schema = field;
+    _variant_struct_field = std::make_unique<FieldSchema>(*field);
+
+    DataTypes child_types;
+    Strings child_names;
+    child_types.reserve(field->children.size());
+    child_names.reserve(field->children.size());
+    for (const auto& child : field->children) {
+        child_types.push_back(make_nullable(child.data_type));
+        child_names.push_back(child.name);
+    }
+    _variant_struct_type = std::make_shared<DataTypeStruct>(child_types, 
child_names);
+    if (field->data_type->is_nullable()) {
+        _variant_struct_type = make_nullable(_variant_struct_type);
+    }
+    _variant_struct_field->data_type = _variant_struct_type;
+
+    RETURN_IF_ERROR(ParquetColumnReader::create(file, 
_variant_struct_field.get(), row_group,
+                                                _row_ranges, _ctz, _io_ctx, 
_struct_reader,
+                                                max_buf_size, col_offsets, 
state, in_collection,
+                                                column_ids, 
filter_column_ids));
+    _struct_reader->set_column_in_nested();
+    return Status::OK();
+}
+
+Status VariantColumnReader::read_column_data(
+        ColumnPtr& doris_column, const DataTypePtr& type,
+        const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, 
FilterMap& filter_map,
+        size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
+        int64_t real_column_size) {
+    (void)root_node;
+    if (remove_nullable(type)->get_primitive_type() != 
PrimitiveType::TYPE_VARIANT) {
+        return Status::Corruption(
+                "Wrong data type for column '{}', expected Variant type, 
actual type: {}.",
+                _field_schema->name, type->get_name());
+    }
+
+    ColumnPtr struct_column = _variant_struct_type->create_column();
+    const size_t old_struct_rows = struct_column->size();
+    auto const_node = TableSchemaChangeHelper::ConstNode::get_instance();
+    RETURN_IF_ERROR(_struct_reader->read_column_data(struct_column, 
_variant_struct_type,
+                                                     const_node, filter_map, 
batch_size, read_rows,
+                                                     eof, is_dict_filter, 
real_column_size));
+
+    const size_t new_struct_rows = struct_column->size() - old_struct_rows;
+    if (new_struct_rows == 0) {
+        return Status::OK();
+    }
+
+    MutableColumnPtr variant_column_ptr;
+    NullMap* null_map_ptr = nullptr;
+    auto mutable_column = doris_column->assume_mutable();
+    if (doris_column->is_nullable()) {
+        auto* nullable_column = 
assert_cast<ColumnNullable*>(mutable_column.get());
+        variant_column_ptr = nullable_column->get_nested_column_ptr();
+        null_map_ptr = &nullable_column->get_null_map_data();
+    } else {
+        if (_field_schema->data_type->is_nullable()) {
+            return Status::Corruption("Not nullable column has null values in 
parquet file");
+        }
+        variant_column_ptr = std::move(mutable_column);
+    }
+    auto* variant_column = 
assert_cast<ColumnVariant*>(variant_column_ptr.get());
+
+    const IColumn* variant_struct_source = struct_column.get();
+    const NullMap* struct_null_map = nullptr;
+    if (const auto* nullable_struct = 
check_and_get_column<ColumnNullable>(variant_struct_source)) {
+        struct_null_map = &nullable_struct->get_null_map_data();
+        variant_struct_source = &nullable_struct->get_nested_column();
+    }
+    const auto& variant_struct_column = assert_cast<const 
ColumnStruct&>(*variant_struct_source);
+
+    const int value_idx = find_child_idx(*_field_schema, "value");
+    const int typed_value_idx = find_child_idx(*_field_schema, "typed_value");
+    if (value_idx < 0 && typed_value_idx >= 0 &&
+        can_direct_read_typed_value(_field_schema->children[typed_value_idx], 
false)) {
+        MutableColumnPtr batch_variant_column =
+                ColumnVariant::create(variant_column->max_subcolumns_count(),
+                                      variant_column->enable_doc_mode(), 
new_struct_rows + 1);
+        auto* batch_variant = 
assert_cast<ColumnVariant*>(batch_variant_column.get());
+        PathInDataBuilder path;
+        RETURN_IF_ERROR(append_direct_typed_column_to_batch(
+                _field_schema->children[typed_value_idx],
+                variant_struct_column.get_column(typed_value_idx), 
old_struct_rows, new_struct_rows,
+                &path, batch_variant, false));
+        variant_column->insert_range_from(*batch_variant_column, 1, 
new_struct_rows);
+        if (null_map_ptr != nullptr) {
+            for (size_t i = old_struct_rows; i < struct_column->size(); ++i) {
+                null_map_ptr->push_back(struct_null_map != nullptr && 
(*struct_null_map)[i]);
+            }
+        }
+#ifndef NDEBUG
+        doris_column->sanity_check();
+#endif
+        return Status::OK();
+    }
+
+    for (size_t i = old_struct_rows; i < struct_column->size(); ++i) {
+        if (struct_null_map != nullptr && (*struct_null_map)[i]) {
+            if (null_map_ptr != nullptr) {
+                variant_column->insert_default();
+                null_map_ptr->push_back(1);
+                continue;
+            }
+        }
+        VariantMap values;
+        bool present = false;
+        PathInDataBuilder path;
+        RETURN_IF_ERROR(variant_to_variant_map(*_field_schema, 
(*struct_column)[i], nullptr, &path,
+                                               &values, &present));

Review Comment:
   Addressed in current head `448355b9ec5afaa1afb2b0910fd3a6825e1007cf`. The
reconstruction path now distinguishes two cases:
(1) a null value for a nullable (optional) top-level Parquet group becomes
SQL NULL;
(2) a present, required Variant whose `value` and `typed_value` payloads are
both absent is read as a Variant whose root value is null, not as SQL NULL.
Added BE unit-test coverage in
`ParquetVariantReaderTest.RequiredMissingPayloadIsVariantNull` and
`ParquetVariantReaderTest.NullableTopLevelGroupIsSqlNull`. Verified locally
with `./run-be-ut.sh --run --filter='ParquetVariantReaderTest.*'` and
`./run-regression-test.sh --run --conf tmp/regression-conf.auto.groovy -d
external_table_p0/tvf -s test_local_tvf_iceberg_variant`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to