github-actions[bot] commented on code in PR #63192: URL: https://github.com/apache/doris/pull/63192#discussion_r3234810189
########## be/src/format/parquet/parquet_variant_reader.cpp: ########## @@ -0,0 +1,541 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/parquet/parquet_variant_reader.h" + +#include <algorithm> +#include <cmath> +#include <cstring> +#include <iomanip> +#include <limits> +#include <sstream> +#include <string_view> +#include <vector> + +namespace doris::parquet { + +namespace { + +struct VariantMetadata { + std::vector<std::string> dictionary; +}; + +uint64_t read_unsigned_le(const uint8_t* ptr, int size) { + uint64_t value = 0; + for (int i = 0; i < size; ++i) { + value |= static_cast<uint64_t>(ptr[i]) << (i * 8); + } + return value; +} + +int64_t read_signed_le(const uint8_t* ptr, int size) { + uint64_t value = read_unsigned_le(ptr, size); + if (size < 8) { + uint64_t sign_bit = uint64_t {1} << (size * 8 - 1); + if ((value & sign_bit) != 0) { + uint64_t mask = ~((uint64_t {1} << (size * 8)) - 1); + value |= mask; + } + } + return static_cast<int64_t>(value); +} + +Status require_available(const uint8_t* ptr, const uint8_t* end, size_t size, + std::string_view context) { + if (ptr > end || size > static_cast<size_t>(end - ptr)) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", 
context); + } + return Status::OK(); +} + +Status require_available_entries(const uint8_t* ptr, const uint8_t* end, uint64_t entries, + size_t entry_size, std::string_view context) { + if (entries > std::numeric_limits<size_t>::max() / entry_size) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return require_available(ptr, end, static_cast<size_t>(entries) * entry_size, context); +} + +Status validate_array_field_offsets(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::string_view context) { + if (field_offsets.empty() || field_offsets.front() != 0) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + for (size_t i = 0; i < field_offsets.size(); ++i) { + if (field_offsets[i] > total_size) { + return Status::Corruption("Invalid Parquet VARIANT {} field offset {}", context, + field_offsets[i]); + } + if (i > 0 && field_offsets[i] < field_offsets[i - 1]) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + } + return Status::OK(); +} + +Status compute_object_field_ends(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::vector<uint64_t>* field_ends) { + if (field_offsets.empty()) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + size_t num_elements = field_offsets.size() - 1; + if (num_elements == 0) { + if (total_size != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + return Status::OK(); + } + + std::vector<std::pair<uint64_t, size_t>> physical_offsets; + physical_offsets.reserve(num_elements); + for (size_t i = 0; i < num_elements; ++i) { + if (field_offsets[i] >= total_size) { + return Status::Corruption("Invalid Parquet VARIANT object field offset {}", + field_offsets[i]); + } + physical_offsets.emplace_back(field_offsets[i], i); + } + std::sort(physical_offsets.begin(), physical_offsets.end()); + if (physical_offsets.front().first != 0) 
{ + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + + field_ends->assign(num_elements, 0); + for (size_t i = 0; i < physical_offsets.size(); ++i) { + if (i > 0 && physical_offsets[i].first == physical_offsets[i - 1].first) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + uint64_t child_end = + i + 1 < physical_offsets.size() ? physical_offsets[i + 1].first : total_size; + (*field_ends)[physical_offsets[i].second] = child_end; + } + return Status::OK(); +} + +void append_json_string(std::string_view value, std::string* json) { + json->push_back('"'); + static constexpr char hex[] = "0123456789abcdef"; + for (unsigned char c : value) { + switch (c) { + case '"': + json->append("\\\""); + break; + case '\\': + json->append("\\\\"); + break; + case '\b': + json->append("\\b"); + break; + case '\f': + json->append("\\f"); + break; + case '\n': + json->append("\\n"); + break; + case '\r': + json->append("\\r"); + break; + case '\t': + json->append("\\t"); + break; + default: + if (c < 0x20) { + json->append("\\u00"); + json->push_back(hex[c >> 4]); + json->push_back(hex[c & 0x0f]); + } else { + json->push_back(static_cast<char>(c)); + } + break; + } + } + json->push_back('"'); +} + +template <typename T> +void append_floating_json(T value, std::string* json) { + if (!std::isfinite(value)) { + json->append("null"); + return; + } + std::ostringstream oss; + oss << std::setprecision(std::numeric_limits<T>::max_digits10) << value; + json->append(oss.str()); +} + +std::string int128_to_string(__int128 value) { + if (value == 0) { + return "0"; + } + bool negative = value < 0; + unsigned __int128 unsigned_value = negative ? 
static_cast<unsigned __int128>(-(value + 1)) + 1 + : static_cast<unsigned __int128>(value); + std::string digits; + while (unsigned_value > 0) { + digits.push_back(static_cast<char>('0' + unsigned_value % 10)); + unsigned_value /= 10; + } + if (negative) { + digits.push_back('-'); + } + std::reverse(digits.begin(), digits.end()); + return digits; +} + +void append_decimal_json(__int128 unscaled, int scale, std::string* json) { + std::string value = int128_to_string(unscaled); + bool negative = !value.empty() && value[0] == '-'; + std::string digits = negative ? value.substr(1) : value; + if (scale == 0) { + json->append(value); + return; + } + if (scale > 0) { + if (digits.size() <= static_cast<size_t>(scale)) { + digits.insert(0, static_cast<size_t>(scale) + 1 - digits.size(), '0'); + } + digits.insert(digits.end() - scale, '.'); + if (negative) { + json->push_back('-'); + } + json->append(digits); + return; + } + if (negative) { + json->push_back('-'); + } + json->append(digits); + json->append(static_cast<size_t>(-scale), '0'); +} + +void append_uuid_json(const uint8_t* ptr, std::string* json) { + static constexpr char hex[] = "0123456789abcdef"; + json->push_back('"'); + for (int i = 0; i < 16; ++i) { + if (i == 4 || i == 6 || i == 8 || i == 10) { + json->push_back('-'); + } + json->push_back(hex[ptr[i] >> 4]); + json->push_back(hex[ptr[i] & 0x0f]); + } + json->push_back('"'); +} + +Status decode_metadata(const StringRef& metadata, VariantMetadata* result) { + const auto* ptr = reinterpret_cast<const uint8_t*>(metadata.data); + const auto* end = ptr + metadata.size; + RETURN_IF_ERROR(require_available(ptr, end, 1, "metadata")); + uint8_t header = *ptr++; + uint8_t version = header & 0x0f; + if (version != 1) { + return Status::Corruption("Unsupported Parquet VARIANT metadata version {}", version); + } + int offset_size = ((header >> 6) & 0x03) + 1; + RETURN_IF_ERROR(require_available(ptr, end, offset_size, "metadata dictionary size")); + uint64_t 
dictionary_size = read_unsigned_le(ptr, offset_size); + ptr += offset_size; + + RETURN_IF_ERROR(require_available_entries(ptr, end, dictionary_size + 1, offset_size, + "metadata dictionary offsets")); + std::vector<uint64_t> offsets(dictionary_size + 1); + for (uint64_t i = 0; i <= dictionary_size; ++i) { + offsets[i] = read_unsigned_le(ptr, offset_size); + ptr += offset_size; + if (i > 0 && offsets[i] < offsets[i - 1]) { + return Status::Corruption("Invalid Parquet VARIANT metadata dictionary offsets"); + } + } + + RETURN_IF_ERROR(require_available(ptr, end, offsets.back(), "metadata dictionary bytes")); + result->dictionary.clear(); + result->dictionary.reserve(dictionary_size); + for (uint64_t i = 0; i < dictionary_size; ++i) { + result->dictionary.emplace_back(reinterpret_cast<const char*>(ptr + offsets[i]), + offsets[i + 1] - offsets[i]); Review Comment: `decode_object()` now rejects duplicate or out-of-order field ids, but duplicate field names can still enter through the metadata dictionary itself. A malformed metadata dictionary such as `["a", "a"]`, referenced by object field ids `[0, 1]`, passes the increasing-id check, and the object is serialized as JSON with two `"a"` keys instead of being rejected as corruption. Please reject duplicate dictionary entries during metadata decode when the header's `sorted_strings` bit (bit 4) is set. The Parquet Variant spec only guarantees unique dictionary strings in that case; with the bit clear, a dictionary may legally repeat a string, so an unconditional check would reject valid data. Also reject any object in which two field ids resolve to the same name, which covers the unsorted case. Please add a unit case for duplicate metadata names; this is distinct from the existing object field-id duplicate thread because the field ids here are strictly increasing. (Relatedly, the spec orders an object's field ids by their field names, not by id value. If the increasing-id check compares raw ids, it may also reject valid objects whose dictionary is unsorted.)
########## be/src/format/parquet/vparquet_column_reader.cpp: ########## @@ -103,6 +119,856 @@ static void fill_array_offset(FieldSchema* field, ColumnArray::Offsets64& offset } } +static constexpr int64_t UNIX_EPOCH_DAYNR = 719528; +static constexpr int64_t MICROS_PER_SECOND = 1000000; + +static int64_t variant_date_value(const VecDateTimeValue& value) { + return value.daynr() - UNIX_EPOCH_DAYNR; +} + +static int64_t variant_date_value(const DateV2Value<DateV2ValueType>& value) { + return value.daynr() - UNIX_EPOCH_DAYNR; +} + +static int64_t variant_datetime_value(const VecDateTimeValue& value) { + int64_t timestamp = 0; + value.unix_timestamp(&timestamp, cctz::utc_time_zone()); + return timestamp * MICROS_PER_SECOND; +} + +static int64_t variant_datetime_value(const DateV2Value<DateTimeV2ValueType>& value) { + int64_t timestamp = 0; + value.unix_timestamp(&timestamp, cctz::utc_time_zone()); + return timestamp * MICROS_PER_SECOND + value.microsecond(); +} + +static int64_t variant_datetime_value(const TimestampTzValue& value) { + int64_t timestamp = 0; + value.unix_timestamp(&timestamp, cctz::utc_time_zone()); + return timestamp * MICROS_PER_SECOND + value.microsecond(); +} + +static int find_child_idx(const FieldSchema& field, std::string_view name) { + for (int i = 0; i < field.children.size(); ++i) { + if (field.children[i].lower_case_name == name) { + return i; + } + } + return -1; +} + +static bool is_variant_wrapper_typed_value_child(const FieldSchema& field) { + auto type = remove_nullable(field.data_type); + return type->get_primitive_type() == TYPE_STRUCT || type->get_primitive_type() == TYPE_ARRAY; +} + +static bool is_variant_wrapper_field(const FieldSchema& field, + bool allow_scalar_typed_value_only_wrapper) { + auto type = remove_nullable(field.data_type); + if (type->get_primitive_type() != TYPE_STRUCT && type->get_primitive_type() != TYPE_VARIANT) { + return false; + } + + bool has_metadata = false; + bool has_value = false; + const FieldSchema* typed_value =
nullptr; + for (const auto& child : field.children) { + if (child.lower_case_name == "metadata") { + if (child.physical_type != tparquet::Type::BYTE_ARRAY) { + return false; + } + has_metadata = true; + continue; + } + if (child.lower_case_name == "value") { + if (child.physical_type != tparquet::Type::BYTE_ARRAY) { + return false; + } + has_value = true; + continue; + } + if (child.lower_case_name == "typed_value") { + typed_value = &child; + continue; + } + return false; + } + if (has_metadata && has_value) { + return true; Review Comment: This classifies any typed struct whose only fields are BYTE_ARRAY user fields named `metadata` and `value` as a Parquet VARIANT wrapper, even when there is no `typed_value` child. A valid typed-only object such as `typed_value.obj { metadata: string, value: string }` should decode as `{"obj":{"metadata":...,"value":...}}`. However, `typed_struct_to_json()` routes `obj` through `variant_to_json()`, which interprets the user bytes as structural VARIANT metadata/value and either corrupts or drops the object. Please require an actual shredded-wrapper shape here instead of `has_metadata && has_value` alone. Per the Parquet Variant shredding spec, only the top-level VARIANT group carries `metadata`; a shredded object field is a group of optional `value` and `typed_value` fields, so a nested struct containing `metadata` is user data. Please also add coverage for typed object fields named `metadata` and `value`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
