github-actions[bot] commented on code in PR #63192: URL: https://github.com/apache/doris/pull/63192#discussion_r3253254078
########## be/src/format/parquet/parquet_nested_column_utils.cpp: ########## @@ -0,0 +1,540 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/parquet/parquet_nested_column_utils.h" + +#include <algorithm> +#include <cctype> +#include <string_view> +#include <unordered_map> +#include <utility> + +#include "core/data_type/data_type_nullable.h" +#include "format/parquet/schema_desc.h" + +namespace doris { +namespace { + +enum class NestedPathMode { + NAME, + FIELD_ID, +}; + +void add_column_id_range(const FieldSchema& field_schema, std::set<uint64_t>& column_ids) { + const uint64_t start_id = field_schema.get_column_id(); + const uint64_t max_column_id = field_schema.get_max_column_id(); + for (uint64_t id = start_id; id <= max_column_id; ++id) { + column_ids.insert(id); + } +} + +const FieldSchema* find_child_by_structural_name(const FieldSchema& field_schema, + std::string_view name) { + std::string lower_name(name); + std::transform(lower_name.begin(), lower_name.end(), lower_name.begin(), + [](unsigned char c) { return static_cast<char>(std::tolower(c)); }); + for (const auto& child : field_schema.children) { + if (child.name == name || child.lower_case_name == lower_name) { + return &child; + } 
+ } + return nullptr; +} + +const FieldSchema* find_child_by_exact_name(const FieldSchema& field_schema, + std::string_view name) { + for (const auto& child : field_schema.children) { + if (child.name == name) { + return &child; + } + } + return nullptr; +} + +const FieldSchema* find_variant_typed_child_by_key(const FieldSchema& field_schema, + std::string_view key, NestedPathMode mode) { + if (const auto* child = find_child_by_exact_name(field_schema, key)) { + return child; + } + if (mode == NestedPathMode::NAME) { + return nullptr; + } + for (const auto& child : field_schema.children) { + if (child.field_id >= 0 && key == std::to_string(child.field_id)) { + return &child; + } + } + return nullptr; +} + +void add_variant_metadata(const FieldSchema& variant_field, std::set<uint64_t>& column_ids) { + if (const auto* metadata = find_child_by_structural_name(variant_field, "metadata")) { + add_column_id_range(*metadata, column_ids); + } +} + +void add_variant_value(const FieldSchema& variant_field, std::set<uint64_t>& column_ids) { + add_variant_metadata(variant_field, column_ids); + if (const auto* value = find_child_by_structural_name(variant_field, "value")) { + add_column_id_range(*value, column_ids); + } +} + +struct VariantColumnIdExtractionResult { + bool has_child_columns = false; + bool needs_metadata = false; +}; + +using VariantPathMap = std::unordered_map<std::string, std::vector<std::vector<std::string>>>; + +bool is_shredded_variant_field(const FieldSchema& field_schema) { + bool has_value = false; + const FieldSchema* typed_value = nullptr; + for (const auto& child : field_schema.children) { + if (child.lower_case_name == "value") { + if (child.physical_type != tparquet::Type::BYTE_ARRAY) { + return false; + } + has_value = true; + continue; + } + if (child.lower_case_name == "typed_value") { + typed_value = &child; + continue; + } + return false; + } + if (has_value) { + return true; + } + if (typed_value == nullptr) { + return false; + } + const auto 
type = remove_nullable(typed_value->data_type); + return type->get_primitive_type() == TYPE_STRUCT || type->get_primitive_type() == TYPE_ARRAY; +} + +bool add_shredded_variant_field_value(const FieldSchema& shredded_field, + std::set<uint64_t>& column_ids) { + if (const auto* value = find_child_by_structural_name(shredded_field, "value")) { + add_column_id_range(*value, column_ids); + return true; + } + return false; +} + +bool is_variant_array_subscript(std::string_view path) { + return !path.empty() && + std::all_of(path.begin(), path.end(), [](unsigned char c) { return std::isdigit(c); }); +} + +bool is_terminal_variant_meta_component(std::string_view path) { + return path == "NULL" || path == "OFFSET"; +} + +const std::vector<std::string>& effective_variant_path(const std::vector<std::string>& raw_path, + std::vector<std::string>& stripped_path) { + if (!raw_path.empty() && is_terminal_variant_meta_component(raw_path.back())) { + stripped_path.assign(raw_path.begin(), raw_path.end() - 1); + return stripped_path; + } + return raw_path; +} + +bool contains_inherited_metadata_value(const FieldSchema& field_schema) { + if (is_shredded_variant_field(field_schema) && + find_child_by_structural_name(field_schema, "value") != nullptr) { + return true; + } + return std::any_of( + field_schema.children.begin(), field_schema.children.end(), + [](const FieldSchema& child) { return contains_inherited_metadata_value(child); }); +} + +VariantColumnIdExtractionResult extract_variant_typed_nested_column_ids( + const FieldSchema& field_schema, const std::vector<std::vector<std::string>>& paths, + std::set<uint64_t>& column_ids, NestedPathMode mode); + +void add_variant_typed_path(PrimitiveType field_type, const FieldSchema& field_schema, + const std::vector<std::string>& path, + VariantColumnIdExtractionResult* result, std::set<uint64_t>& column_ids, + VariantPathMap* child_paths) { + if (path.empty()) { + add_column_id_range(field_schema, column_ids); + 
result->has_child_columns = true; + result->needs_metadata |= contains_inherited_metadata_value(field_schema); + return; + } + + const bool is_list = field_type == PrimitiveType::TYPE_ARRAY; + const bool is_map = field_type == PrimitiveType::TYPE_MAP; + std::vector<std::string> remaining; + std::string child_key; + if (is_list) { + child_key = "*"; + if (!is_variant_array_subscript(path[0])) { + remaining.assign(path.begin(), path.end()); + } else if (path.size() > 1) { + remaining.assign(path.begin() + 1, path.end()); + } + } else if (is_map) { + (*child_paths)["KEYS"].emplace_back(); + child_key = "VALUES"; + if (path.size() > 1) { + remaining.assign(path.begin() + 1, path.end()); + } + } else { + child_key = path[0]; + if (path.size() > 1) { + remaining.assign(path.begin() + 1, path.end()); + } + } + (*child_paths)[child_key].push_back(std::move(remaining)); +} + +std::string variant_typed_child_key(PrimitiveType field_type, const FieldSchema& field_schema, + uint64_t child_index) { + if (field_type == PrimitiveType::TYPE_ARRAY) { + return "*"; + } + if (field_type == PrimitiveType::TYPE_MAP) { + if (child_index == 0) { + return "KEYS"; + } + return child_index == 1 ? 
"VALUES" : ""; + } + return field_schema.children[child_index].name; +} + +void append_variant_child_paths(const VariantPathMap& paths_by_name, const std::string& key, + std::vector<std::vector<std::string>>& child_paths) { + auto child_paths_it = paths_by_name.find(key); + if (child_paths_it != paths_by_name.end()) { + child_paths.insert(child_paths.end(), child_paths_it->second.begin(), + child_paths_it->second.end()); + } +} + +std::vector<std::vector<std::string>> collect_variant_typed_child_paths( + const VariantPathMap& paths_by_name, const FieldSchema& child, const std::string& child_key, + bool is_list, bool is_map, NestedPathMode mode) { + std::vector<std::vector<std::string>> child_paths; + append_variant_child_paths(paths_by_name, child_key, child_paths); + if (!is_list && !is_map && mode == NestedPathMode::FIELD_ID && child.field_id >= 0) { + const std::string field_id_key = std::to_string(child.field_id); + if (field_id_key != child_key) { + append_variant_child_paths(paths_by_name, field_id_key, child_paths); + } + } + return child_paths; +} + +void extract_variant_typed_child_column_ids( + const FieldSchema& child, const std::vector<std::vector<std::string>>& child_paths, + std::set<uint64_t>& column_ids, NestedPathMode mode, + VariantColumnIdExtractionResult* result) { + const bool needs_full_child = + std::any_of(child_paths.begin(), child_paths.end(), + [](const std::vector<std::string>& path) { return path.empty(); }); + if (needs_full_child) { + add_column_id_range(child, column_ids); + result->has_child_columns = true; + result->needs_metadata |= contains_inherited_metadata_value(child); + return; + } + + auto child_result = + extract_variant_typed_nested_column_ids(child, child_paths, column_ids, mode); + result->has_child_columns |= child_result.has_child_columns; + result->needs_metadata |= child_result.needs_metadata; +} + +VariantColumnIdExtractionResult extract_shredded_variant_field_ids( + const FieldSchema& shredded_field, const 
std::vector<std::vector<std::string>>& paths, + std::set<uint64_t>& column_ids, NestedPathMode mode) { + const auto* typed_value = find_child_by_structural_name(shredded_field, "typed_value"); + VariantColumnIdExtractionResult result; + + for (const auto& raw_path : paths) { + std::vector<std::string> stripped_path; + const auto& path = effective_variant_path(raw_path, stripped_path); + if (path.empty()) { + add_column_id_range(shredded_field, column_ids); + result.has_child_columns = true; + result.needs_metadata |= contains_inherited_metadata_value(shredded_field); + continue; + } + + bool has_selected_columns = add_shredded_variant_field_value(shredded_field, column_ids); + result.needs_metadata |= has_selected_columns; + if (typed_value != nullptr) { + const auto typed_value_type = remove_nullable(typed_value->data_type); + if (typed_value_type->get_primitive_type() != TYPE_STRUCT) { + auto child_result = extract_variant_typed_nested_column_ids(*typed_value, {path}, + column_ids, mode); + if (child_result.has_child_columns) { + column_ids.insert(typed_value->get_column_id()); + result.needs_metadata |= child_result.needs_metadata; + has_selected_columns = true; + } + } else if (const auto* typed_child = + find_variant_typed_child_by_key(*typed_value, path[0], mode)) { + if (path.size() == 1) { + add_column_id_range(*typed_child, column_ids); + result.needs_metadata |= contains_inherited_metadata_value(*typed_child); + column_ids.insert(typed_value->get_column_id()); + has_selected_columns = true; + } else { + std::vector<std::vector<std::string>> child_paths { + std::vector<std::string>(path.begin() + 1, path.end())}; + auto child_result = extract_variant_typed_nested_column_ids( + *typed_child, child_paths, column_ids, mode); + if (child_result.has_child_columns) { + column_ids.insert(typed_value->get_column_id()); + result.needs_metadata |= child_result.needs_metadata; + has_selected_columns = true; + } + } + } + } + result.has_child_columns |= 
+ has_selected_columns; + } + + if (result.has_child_columns) { + column_ids.insert(shredded_field.get_column_id()); + } + return result; +} + +VariantColumnIdExtractionResult extract_variant_nested_column_ids( + const FieldSchema& variant_field, const std::vector<std::vector<std::string>>& paths, + std::set<uint64_t>& column_ids, NestedPathMode mode) { + const auto* typed_value = find_child_by_structural_name(variant_field, "typed_value"); + VariantColumnIdExtractionResult result; + + for (const auto& raw_path : paths) { + std::vector<std::string> stripped_path; + const auto& path = effective_variant_path(raw_path, stripped_path); + if (path.empty()) { + add_column_id_range(variant_field, column_ids); + result.has_child_columns = true; + continue; + } + + VariantColumnIdExtractionResult typed_result; + if (typed_value != nullptr) { + const auto typed_value_type = remove_nullable(typed_value->data_type); + if (typed_value_type->get_primitive_type() != TYPE_STRUCT) { + typed_result = extract_variant_typed_nested_column_ids(*typed_value, {path}, Review Comment: This non-STRUCT root `typed_value` branch drops the `needs_metadata` signal returned by `extract_variant_typed_nested_column_ids()`. For a root typed array/map that contains a shredded residual child, e.g. `typed_value: array<struct<metric: { value: binary }>>` queried as `v[0]['metric']['x']`, the recursive extraction selects the nested `value` leaf and returns `needs_metadata=true`, but this branch never adds the top-level `metadata` column. Row-wise reconstruction then tries to decode the residual value without the inherited VARIANT metadata, so it can either fail outright or decode the value without the metadata it depends on, because the `metadata` column was never selected. This is distinct from the existing root typed-array selection thread because the typed leaf is selected, but its required metadata is not. 
Please mirror the STRUCT branch by adding `add_variant_metadata(variant_field, column_ids)` when the non-STRUCT typed result needs metadata, with coverage for root typed array/map residual children. ########## be/src/format/parquet/parquet_variant_reader.cpp: ########## @@ -0,0 +1,980 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "format/parquet/parquet_variant_reader.h" + +#include <algorithm> +#include <cmath> +#include <cstring> +#include <deque> +#include <iomanip> +#include <limits> +#include <sstream> +#include <string_view> +#include <vector> + +#include "core/column/column_variant.h" +#include "core/value/jsonb_value.h" +#include "exec/common/variant_util.h" + +namespace doris::parquet { + +std::string format_variant_uuid(const uint8_t* ptr) { + static constexpr char hex[] = "0123456789abcdef"; + std::string uuid; + uuid.reserve(36); + for (int i = 0; i < 16; ++i) { + if (i == 4 || i == 6 || i == 8 || i == 10) { + uuid.push_back('-'); + } + uuid.push_back(hex[ptr[i] >> 4]); + uuid.push_back(hex[ptr[i] & 0x0f]); + } + return uuid; +} + +namespace { + +struct VariantMetadata { + std::vector<std::string> dictionary; +}; + +struct VariantObjectLayout { + std::vector<uint64_t> field_ids; + std::vector<uint64_t> field_offsets; + std::vector<uint64_t> field_ends; + const uint8_t* fields = nullptr; + uint64_t total_size = 0; +}; + +struct VariantArrayLayout { + std::vector<uint64_t> field_offsets; + const uint8_t* fields = nullptr; + uint64_t total_size = 0; +}; + +uint64_t read_unsigned_le(const uint8_t* ptr, int size) { + uint64_t value = 0; + for (int i = 0; i < size; ++i) { + value |= static_cast<uint64_t>(ptr[i]) << (i * 8); + } + return value; +} + +int64_t read_signed_le(const uint8_t* ptr, int size) { + uint64_t value = read_unsigned_le(ptr, size); + if (size < 8) { + uint64_t sign_bit = uint64_t {1} << (size * 8 - 1); + if ((value & sign_bit) != 0) { + uint64_t mask = ~((uint64_t {1} << (size * 8)) - 1); + value |= mask; + } + } + return static_cast<int64_t>(value); +} + +__int128 read_signed_int128_le(const uint8_t* ptr) { + unsigned __int128 unsigned_value = 0; + for (int i = 15; i >= 0; --i) { + unsigned_value <<= 8; + unsigned_value |= ptr[i]; + } + static constexpr unsigned __int128 sign_bit = static_cast<unsigned __int128>(1) << 127; + if ((unsigned_value & 
sign_bit) == 0) { + return static_cast<__int128>(unsigned_value); + } + static constexpr __int128 signed_half_range = static_cast<__int128>(1) << 126; + return (static_cast<__int128>(unsigned_value & (sign_bit - 1)) - signed_half_range) - + signed_half_range; +} + +Status require_available(const uint8_t* ptr, const uint8_t* end, size_t size, + std::string_view context) { + if (ptr > end) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + if (size > static_cast<size_t>(end - ptr)) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return Status::OK(); +} + +Status require_available_entries(const uint8_t* ptr, const uint8_t* end, uint64_t entries, + size_t entry_size, std::string_view context) { + if (entries > std::numeric_limits<size_t>::max() / entry_size) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return require_available(ptr, end, static_cast<size_t>(entries) * entry_size, context); +} + +bool variant_string_less(std::string_view lhs, std::string_view rhs) { + return std::lexicographical_compare( + lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), [](char left, char right) { + return static_cast<unsigned char>(left) < static_cast<unsigned char>(right); + }); +} + +bool is_valid_utf8(std::string_view value) { + const auto* data = reinterpret_cast<const uint8_t*>(value.data()); + const auto* end = data + value.size(); + while (data < end) { + const uint8_t first = *data++; + if (first <= 0x7f) { + continue; + } + + uint32_t code_point = 0; + size_t continuation_bytes = 0; + if (first >= 0xc2 && first <= 0xdf) { + code_point = first & 0x1f; + continuation_bytes = 1; + } else if (first >= 0xe0 && first <= 0xef) { + code_point = first & 0x0f; + continuation_bytes = 2; + } else if (first >= 0xf0 && first <= 0xf4) { + code_point = first & 0x07; + continuation_bytes = 3; + } else { + return false; + } + + if (static_cast<size_t>(end - data) < continuation_bytes) { 
+ return false; + } + for (size_t i = 0; i < continuation_bytes; ++i) { + const uint8_t byte = *data++; + if ((byte & 0xc0) != 0x80) { + return false; + } + code_point = (code_point << 6) | (byte & 0x3f); + } + + if ((continuation_bytes == 2 && code_point < 0x800) || + (continuation_bytes == 3 && code_point < 0x10000) || + (code_point >= 0xd800 && code_point <= 0xdfff) || code_point > 0x10ffff) { + return false; + } + } + return true; +} + +Status require_valid_utf8(std::string_view value, std::string_view context) { + if (!is_valid_utf8(value)) { + return Status::Corruption("Invalid Parquet VARIANT {} UTF-8 string", context); + } + return Status::OK(); +} + +Status validate_array_field_offsets(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::string_view context) { + if (field_offsets.empty() || field_offsets.front() != 0) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + for (size_t i = 0; i < field_offsets.size(); ++i) { + if (field_offsets[i] > total_size) { + return Status::Corruption("Invalid Parquet VARIANT {} field offset {}", context, + field_offsets[i]); + } + if (i > 0 && field_offsets[i] < field_offsets[i - 1]) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + } + return Status::OK(); +} + +Status compute_object_field_ends(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::vector<uint64_t>* field_ends) { + if (field_offsets.empty()) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + size_t num_elements = field_offsets.size() - 1; + if (num_elements == 0) { + if (total_size != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + return Status::OK(); + } + + std::vector<std::pair<uint64_t, size_t>> physical_offsets; + physical_offsets.reserve(num_elements); + for (size_t i = 0; i < num_elements; ++i) { + if (field_offsets[i] >= total_size) { + return 
Status::Corruption("Invalid Parquet VARIANT object field offset {}", + field_offsets[i]); + } + physical_offsets.emplace_back(field_offsets[i], i); + } + std::sort(physical_offsets.begin(), physical_offsets.end()); + if (physical_offsets.front().first != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + + field_ends->assign(num_elements, 0); + for (size_t i = 0; i < physical_offsets.size(); ++i) { + if (i > 0 && physical_offsets[i].first == physical_offsets[i - 1].first) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + uint64_t child_end = + i + 1 < physical_offsets.size() ? physical_offsets[i + 1].first : total_size; + (*field_ends)[physical_offsets[i].second] = child_end; + } + return Status::OK(); +} + +void append_json_string(std::string_view value, std::string* json) { + json->push_back('"'); + static constexpr char hex[] = "0123456789abcdef"; + for (unsigned char c : value) { + switch (c) { + case '"': + json->append("\\\""); + break; + case '\\': + json->append("\\\\"); + break; + case '\b': + json->append("\\b"); + break; + case '\f': + json->append("\\f"); + break; + case '\n': + json->append("\\n"); + break; + case '\r': + json->append("\\r"); + break; + case '\t': + json->append("\\t"); + break; + default: + if (c < 0x20) { + json->append("\\u00"); + json->push_back(hex[c >> 4]); + json->push_back(hex[c & 0x0f]); + } else { + json->push_back(static_cast<char>(c)); + } + break; + } + } + json->push_back('"'); +} + +template <typename T> +Status append_floating_json(T value, std::string* json) { + if (!std::isfinite(value)) { + return Status::NotSupported( + "Parquet VARIANT non-finite floating point value is not supported"); + } + std::ostringstream oss; + oss << std::setprecision(std::numeric_limits<T>::max_digits10) << value; + json->append(oss.str()); + return Status::OK(); +} + +std::string int128_to_string(__int128 value) { + if (value == 0) { + return "0"; + } + bool negative 
= value < 0; + unsigned __int128 unsigned_value = negative ? static_cast<unsigned __int128>(-(value + 1)) + 1 + : static_cast<unsigned __int128>(value); + std::string digits; + while (unsigned_value > 0) { + digits.push_back(static_cast<char>('0' + unsigned_value % 10)); + unsigned_value /= 10; + } + if (negative) { + digits.push_back('-'); + } + std::reverse(digits.begin(), digits.end()); + return digits; +} + +void append_decimal_json(__int128 unscaled, int scale, std::string* json) { + std::string value = int128_to_string(unscaled); + bool negative = !value.empty() && value[0] == '-'; + std::string digits = negative ? value.substr(1) : value; + if (scale == 0) { + json->append(value); + return; + } + if (scale > 0) { + if (digits.size() <= static_cast<size_t>(scale)) { + digits.insert(0, static_cast<size_t>(scale) + 1 - digits.size(), '0'); + } + digits.insert(digits.end() - scale, '.'); + if (negative) { + json->push_back('-'); + } + json->append(digits); + return; + } + if (negative) { + json->push_back('-'); + } + json->append(digits); + json->append(static_cast<size_t>(-scale), '0'); +} + +Status decode_primitive(uint8_t primitive_header, const uint8_t* ptr, const uint8_t* end, + std::string* json, const uint8_t** next); +Status decode_value(const uint8_t* ptr, const uint8_t* end, const VariantMetadata& metadata, + std::string* json, const uint8_t** next); + +void append_uuid_json(const uint8_t* ptr, std::string* json) { + json->push_back('"'); + json->append(format_variant_uuid(ptr)); + json->push_back('"'); +} + +Status make_jsonb_field(std::string_view json, FieldWithDataType* value) { + JsonBinaryValue jsonb_value; + RETURN_IF_ERROR(jsonb_value.from_json_string(json.data(), json.size())); + value->field = + Field::create_field<TYPE_JSONB>(JsonbField(jsonb_value.value(), jsonb_value.size())); + value->base_scalar_type_id = TYPE_JSONB; + value->num_dimensions = 0; + value->precision = 0; + value->scale = 0; + return Status::OK(); +} + +std::string 
make_null_array_json(size_t elements) { + std::string json = "["; + for (size_t i = 0; i < elements; ++i) { + if (i != 0) { + json.push_back(','); + } + json.append("null"); + } + json.push_back(']'); + return json; +} + +Status insert_empty_object_marker(const PathInData& path, VariantMap* values) { + FieldWithDataType value; + RETURN_IF_ERROR(make_jsonb_field("{}", &value)); + (*values)[path] = std::move(value); + return Status::OK(); +} + +Status parse_json_to_variant_map(std::string_view json, const PathInData& prefix, + VariantMap* values) { + auto parsed_column = ColumnVariant::create(0, false); + ParseConfig parse_config; + StringRef json_ref(json.data(), json.size()); + RETURN_IF_CATCH_EXCEPTION( + variant_util::parse_json_to_variant(*parsed_column, json_ref, nullptr, parse_config)); + Field parsed = (*parsed_column)[0]; + if (parsed.is_null()) { + (*values)[prefix] = FieldWithDataType {.field = Field()}; + return Status::OK(); + } + + PathInDataBuilder path; + path.append(prefix.get_parts(), false); + for (auto& [parsed_path, value] : parsed.get<TYPE_VARIANT>()) { + path.append(parsed_path.get_parts(), false); + (*values)[path.build()] = std::move(value); + for (size_t i = 0; i < parsed_path.get_parts().size(); ++i) { + path.pop_back(); + } + } + return Status::OK(); +} + +void fill_field_type_info(FieldWithDataType* value) { + FieldInfo info; + variant_util::get_field_info(value->field, &info); + value->base_scalar_type_id = info.scalar_type_id; + value->num_dimensions = static_cast<uint8_t>(info.num_dimensions); + value->precision = info.precision; + value->scale = info.scale; +} + +Status read_array_layout(uint8_t value_header, const uint8_t* ptr, const uint8_t* end, + VariantArrayLayout* layout) { + int field_offset_size = (value_header & 0x03) + 1; + int num_elements_size = (value_header & 0x04) != 0 ? 
4 : 1; + + RETURN_IF_ERROR(require_available(ptr, end, num_elements_size, "array element count")); + uint64_t num_elements = read_unsigned_le(ptr, num_elements_size); + ptr += num_elements_size; + + RETURN_IF_ERROR(require_available_entries(ptr, end, num_elements + 1, field_offset_size, + "array field offsets")); + layout->field_offsets.resize(num_elements + 1); + for (uint64_t i = 0; i <= num_elements; ++i) { + layout->field_offsets[i] = read_unsigned_le(ptr, field_offset_size); + ptr += field_offset_size; + } + + layout->total_size = layout->field_offsets.back(); + layout->fields = ptr; + RETURN_IF_ERROR( + require_available(layout->fields, end, layout->total_size, "array field values")); + RETURN_IF_ERROR( + validate_array_field_offsets(layout->field_offsets, layout->total_size, "array")); + return Status::OK(); +} + +Status read_object_layout(uint8_t value_header, const uint8_t* ptr, const uint8_t* end, + const VariantMetadata& metadata, VariantObjectLayout* layout) { + int field_offset_size = (value_header & 0x03) + 1; + int field_id_size = ((value_header >> 2) & 0x03) + 1; + int num_elements_size = (value_header & 0x10) != 0 ? 
4 : 1; + + RETURN_IF_ERROR(require_available(ptr, end, num_elements_size, "object element count")); + uint64_t num_elements = read_unsigned_le(ptr, num_elements_size); + ptr += num_elements_size; + + RETURN_IF_ERROR( + require_available_entries(ptr, end, num_elements, field_id_size, "object field ids")); + layout->field_ids.resize(num_elements); + for (uint64_t i = 0; i < num_elements; ++i) { + layout->field_ids[i] = read_unsigned_le(ptr, field_id_size); + ptr += field_id_size; + if (layout->field_ids[i] >= metadata.dictionary.size()) { + return Status::Corruption("Invalid Parquet VARIANT object field id {}", + layout->field_ids[i]); + } + if (i > 0 && !variant_string_less(metadata.dictionary[layout->field_ids[i - 1]], + metadata.dictionary[layout->field_ids[i]])) { + return Status::Corruption("Invalid Parquet VARIANT object field names"); + } + } + + RETURN_IF_ERROR(require_available_entries(ptr, end, num_elements + 1, field_offset_size, + "object field offsets")); + layout->field_offsets.resize(num_elements + 1); + for (uint64_t i = 0; i <= num_elements; ++i) { + layout->field_offsets[i] = read_unsigned_le(ptr, field_offset_size); + ptr += field_offset_size; + } + + layout->total_size = layout->field_offsets.back(); + layout->fields = ptr; + RETURN_IF_ERROR( + require_available(layout->fields, end, layout->total_size, "object field values")); + RETURN_IF_ERROR(compute_object_field_ends(layout->field_offsets, layout->total_size, + &layout->field_ends)); + return Status::OK(); +} + +Status decode_value_to_variant_map(const uint8_t* ptr, const uint8_t* end, + const VariantMetadata& metadata, PathInDataBuilder* path, + VariantMap* values, std::deque<std::string>* string_values, + const uint8_t** next); + +Status decode_primitive_to_variant_map(uint8_t primitive_header, const uint8_t* ptr, + const uint8_t* end, const VariantMetadata&, + PathInDataBuilder* path, VariantMap* values, + std::deque<std::string>* string_values, + const uint8_t** next) { + if 
(primitive_header == 15) { + RETURN_IF_ERROR(require_available(ptr, end, 4, "binary length")); + uint64_t size = read_unsigned_le(ptr, 4); + ptr += 4; + RETURN_IF_ERROR(require_available(ptr, end, size, "binary value")); + string_values->emplace_back(reinterpret_cast<const char*>(ptr), static_cast<size_t>(size)); + (*values)[path->build()] = FieldWithDataType { + .field = Field::create_field<TYPE_VARBINARY>(StringView(string_values->back())), + .base_scalar_type_id = TYPE_VARBINARY}; + *next = ptr + size; + return Status::OK(); + } + + std::string json; Review Comment: This VariantMap path still round-trips non-binary Parquet VARIANT primitives through JSON, which loses the primitive type carried by the VARIANT value encoding. For example, a residual root value encoded with primitive header 8/9/10 for DECIMAL reaches `decode_primitive_to_variant_map()`, `decode_primitive()` emits only a JSON number, and `parse_json_to_variant_map()` reconstructs it as a normal numeric field without the original decimal precision/scale metadata; the same applies to date/time/timestamp/UUID headers that are emitted as JSON numbers/strings. This is distinct from the existing typed-array JSON round-trip comments because it affects residual scalar VARIANT values on the VariantMap path. Please decode these primitive headers directly into `FieldWithDataType` instead of reparsing JSON, and add coverage for residual DECIMAL and temporal/UUID scalar values, including array elements. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
