github-actions[bot] commented on code in PR #63192: URL: https://github.com/apache/doris/pull/63192#discussion_r3253073742
########## be/src/format/parquet/parquet_variant_reader.cpp: ########## @@ -0,0 +1,949 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/parquet/parquet_variant_reader.h" + +#include <algorithm> +#include <cmath> +#include <cstring> +#include <deque> +#include <iomanip> +#include <limits> +#include <sstream> +#include <string_view> +#include <vector> + +#include "core/column/column_variant.h" +#include "core/value/jsonb_value.h" +#include "exec/common/variant_util.h" + +namespace doris::parquet { + +std::string format_variant_uuid(const uint8_t* ptr) { + static constexpr char hex[] = "0123456789abcdef"; + std::string uuid; + uuid.reserve(36); + for (int i = 0; i < 16; ++i) { + if (i == 4 || i == 6 || i == 8 || i == 10) { + uuid.push_back('-'); + } + uuid.push_back(hex[ptr[i] >> 4]); + uuid.push_back(hex[ptr[i] & 0x0f]); + } + return uuid; +} + +namespace { + +struct VariantMetadata { + std::vector<std::string> dictionary; +}; + +struct VariantObjectLayout { + std::vector<uint64_t> field_ids; + std::vector<uint64_t> field_offsets; + const uint8_t* fields = nullptr; + uint64_t total_size = 0; +}; + +struct VariantArrayLayout { + std::vector<uint64_t> field_offsets; + const uint8_t* fields = 
nullptr; + uint64_t total_size = 0; +}; + +uint64_t read_unsigned_le(const uint8_t* ptr, int size) { + uint64_t value = 0; + for (int i = 0; i < size; ++i) { + value |= static_cast<uint64_t>(ptr[i]) << (i * 8); + } + return value; +} + +int64_t read_signed_le(const uint8_t* ptr, int size) { + uint64_t value = read_unsigned_le(ptr, size); + if (size < 8) { + uint64_t sign_bit = uint64_t {1} << (size * 8 - 1); + if ((value & sign_bit) != 0) { + uint64_t mask = ~((uint64_t {1} << (size * 8)) - 1); + value |= mask; + } + } + return static_cast<int64_t>(value); +} + +__int128 read_signed_int128_le(const uint8_t* ptr) { + unsigned __int128 unsigned_value = 0; + for (int i = 15; i >= 0; --i) { + unsigned_value <<= 8; + unsigned_value |= ptr[i]; + } + static constexpr unsigned __int128 sign_bit = static_cast<unsigned __int128>(1) << 127; + if ((unsigned_value & sign_bit) == 0) { + return static_cast<__int128>(unsigned_value); + } + static constexpr __int128 signed_half_range = static_cast<__int128>(1) << 126; + return (static_cast<__int128>(unsigned_value & (sign_bit - 1)) - signed_half_range) - + signed_half_range; +} + +Status require_available(const uint8_t* ptr, const uint8_t* end, size_t size, + std::string_view context) { + if (ptr > end) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + if (size > static_cast<size_t>(end - ptr)) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return Status::OK(); +} + +Status require_available_entries(const uint8_t* ptr, const uint8_t* end, uint64_t entries, + size_t entry_size, std::string_view context) { + if (entries > std::numeric_limits<size_t>::max() / entry_size) { + return Status::Corruption("Invalid Parquet VARIANT {} encoding", context); + } + return require_available(ptr, end, static_cast<size_t>(entries) * entry_size, context); +} + +bool variant_string_less(std::string_view lhs, std::string_view rhs) { + return std::lexicographical_compare( + 
lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), [](char left, char right) { + return static_cast<unsigned char>(left) < static_cast<unsigned char>(right); + }); +} + +bool is_valid_utf8(std::string_view value) { + const auto* data = reinterpret_cast<const uint8_t*>(value.data()); + const auto* end = data + value.size(); + while (data < end) { + const uint8_t first = *data++; + if (first <= 0x7f) { + continue; + } + + uint32_t code_point = 0; + size_t continuation_bytes = 0; + if (first >= 0xc2 && first <= 0xdf) { + code_point = first & 0x1f; + continuation_bytes = 1; + } else if (first >= 0xe0 && first <= 0xef) { + code_point = first & 0x0f; + continuation_bytes = 2; + } else if (first >= 0xf0 && first <= 0xf4) { + code_point = first & 0x07; + continuation_bytes = 3; + } else { + return false; + } + + if (static_cast<size_t>(end - data) < continuation_bytes) { + return false; + } + for (size_t i = 0; i < continuation_bytes; ++i) { + const uint8_t byte = *data++; + if ((byte & 0xc0) != 0x80) { + return false; + } + code_point = (code_point << 6) | (byte & 0x3f); + } + + if ((continuation_bytes == 2 && code_point < 0x800) || + (continuation_bytes == 3 && code_point < 0x10000) || + (code_point >= 0xd800 && code_point <= 0xdfff) || code_point > 0x10ffff) { + return false; + } + } + return true; +} + +Status require_valid_utf8(std::string_view value, std::string_view context) { + if (!is_valid_utf8(value)) { + return Status::Corruption("Invalid Parquet VARIANT {} UTF-8 string", context); + } + return Status::OK(); +} + +Status validate_array_field_offsets(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::string_view context) { + if (field_offsets.empty() || field_offsets.front() != 0) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + for (size_t i = 0; i < field_offsets.size(); ++i) { + if (field_offsets[i] > total_size) { + return Status::Corruption("Invalid Parquet VARIANT {} field offset {}", context, + 
field_offsets[i]); + } + if (i > 0 && field_offsets[i] < field_offsets[i - 1]) { + return Status::Corruption("Invalid Parquet VARIANT {} field offsets", context); + } + } + return Status::OK(); +} + +Status compute_object_field_ends(const std::vector<uint64_t>& field_offsets, uint64_t total_size, + std::vector<uint64_t>* field_ends) { + if (field_offsets.empty()) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + size_t num_elements = field_offsets.size() - 1; + if (num_elements == 0) { + if (total_size != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + return Status::OK(); + } + + std::vector<std::pair<uint64_t, size_t>> physical_offsets; + physical_offsets.reserve(num_elements); + for (size_t i = 0; i < num_elements; ++i) { + if (field_offsets[i] >= total_size) { + return Status::Corruption("Invalid Parquet VARIANT object field offset {}", + field_offsets[i]); + } + physical_offsets.emplace_back(field_offsets[i], i); + } + std::sort(physical_offsets.begin(), physical_offsets.end()); + if (physical_offsets.front().first != 0) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + + field_ends->assign(num_elements, 0); + for (size_t i = 0; i < physical_offsets.size(); ++i) { + if (i > 0 && physical_offsets[i].first == physical_offsets[i - 1].first) { + return Status::Corruption("Invalid Parquet VARIANT object field offsets"); + } + uint64_t child_end = + i + 1 < physical_offsets.size() ? 
physical_offsets[i + 1].first : total_size; + (*field_ends)[physical_offsets[i].second] = child_end; + } + return Status::OK(); +} + +void append_json_string(std::string_view value, std::string* json) { + json->push_back('"'); + static constexpr char hex[] = "0123456789abcdef"; + for (unsigned char c : value) { + switch (c) { + case '"': + json->append("\\\""); + break; + case '\\': + json->append("\\\\"); + break; + case '\b': + json->append("\\b"); + break; + case '\f': + json->append("\\f"); + break; + case '\n': + json->append("\\n"); + break; + case '\r': + json->append("\\r"); + break; + case '\t': + json->append("\\t"); + break; + default: + if (c < 0x20) { + json->append("\\u00"); + json->push_back(hex[c >> 4]); + json->push_back(hex[c & 0x0f]); + } else { + json->push_back(static_cast<char>(c)); + } + break; + } + } + json->push_back('"'); +} + +template <typename T> +Status append_floating_json(T value, std::string* json) { + if (!std::isfinite(value)) { + return Status::NotSupported( + "Parquet VARIANT non-finite floating point value is not supported"); + } + std::ostringstream oss; + oss << std::setprecision(std::numeric_limits<T>::max_digits10) << value; + json->append(oss.str()); + return Status::OK(); +} + +std::string int128_to_string(__int128 value) { + if (value == 0) { + return "0"; + } + bool negative = value < 0; + unsigned __int128 unsigned_value = negative ? static_cast<unsigned __int128>(-(value + 1)) + 1 + : static_cast<unsigned __int128>(value); + std::string digits; + while (unsigned_value > 0) { + digits.push_back(static_cast<char>('0' + unsigned_value % 10)); + unsigned_value /= 10; + } + if (negative) { + digits.push_back('-'); + } + std::reverse(digits.begin(), digits.end()); + return digits; +} + +void append_decimal_json(__int128 unscaled, int scale, std::string* json) { + std::string value = int128_to_string(unscaled); + bool negative = !value.empty() && value[0] == '-'; + std::string digits = negative ? 
value.substr(1) : value; + if (scale == 0) { + json->append(value); + return; + } + if (scale > 0) { + if (digits.size() <= static_cast<size_t>(scale)) { + digits.insert(0, static_cast<size_t>(scale) + 1 - digits.size(), '0'); + } + digits.insert(digits.end() - scale, '.'); + if (negative) { + json->push_back('-'); + } + json->append(digits); + return; + } + if (negative) { + json->push_back('-'); + } + json->append(digits); + json->append(static_cast<size_t>(-scale), '0'); +} + +Status decode_primitive(uint8_t primitive_header, const uint8_t* ptr, const uint8_t* end, + std::string* json, const uint8_t** next); +Status decode_value(const uint8_t* ptr, const uint8_t* end, const VariantMetadata& metadata, + std::string* json, const uint8_t** next); + +void append_uuid_json(const uint8_t* ptr, std::string* json) { + json->push_back('"'); + json->append(format_variant_uuid(ptr)); + json->push_back('"'); +} + +Status make_jsonb_field(std::string_view json, FieldWithDataType* value) { + JsonBinaryValue jsonb_value; + RETURN_IF_ERROR(jsonb_value.from_json_string(json.data(), json.size())); + value->field = + Field::create_field<TYPE_JSONB>(JsonbField(jsonb_value.value(), jsonb_value.size())); + value->base_scalar_type_id = TYPE_JSONB; + return Status::OK(); +} + +Status insert_empty_object_marker(const PathInData& path, VariantMap* values) { + FieldWithDataType value; + RETURN_IF_ERROR(make_jsonb_field("{}", &value)); + (*values)[path] = std::move(value); + return Status::OK(); +} + +Status parse_json_to_variant_map(std::string_view json, const PathInData& prefix, + VariantMap* values) { + auto parsed_column = ColumnVariant::create(0, false); + ParseConfig parse_config; + StringRef json_ref(json.data(), json.size()); + RETURN_IF_CATCH_EXCEPTION( + variant_util::parse_json_to_variant(*parsed_column, json_ref, nullptr, parse_config)); + Field parsed = (*parsed_column)[0]; + if (parsed.is_null()) { + return Status::OK(); + } + + PathInDataBuilder path; + 
path.append(prefix.get_parts(), false); + for (auto& [parsed_path, value] : parsed.get<TYPE_VARIANT>()) { + path.append(parsed_path.get_parts(), false); + (*values)[path.build()] = std::move(value); + for (size_t i = 0; i < parsed_path.get_parts().size(); ++i) { + path.pop_back(); + } + } + return Status::OK(); +} + +void fill_field_type_info(FieldWithDataType* value) { + FieldInfo info; + variant_util::get_field_info(value->field, &info); + value->base_scalar_type_id = info.scalar_type_id; + value->num_dimensions = static_cast<uint8_t>(info.num_dimensions); + value->precision = info.precision; + value->scale = info.scale; +} + +Status read_array_layout(uint8_t value_header, const uint8_t* ptr, const uint8_t* end, + VariantArrayLayout* layout) { + int field_offset_size = (value_header & 0x03) + 1; + int num_elements_size = (value_header & 0x04) != 0 ? 4 : 1; + + RETURN_IF_ERROR(require_available(ptr, end, num_elements_size, "array element count")); + uint64_t num_elements = read_unsigned_le(ptr, num_elements_size); + ptr += num_elements_size; + + RETURN_IF_ERROR(require_available_entries(ptr, end, num_elements + 1, field_offset_size, + "array field offsets")); + layout->field_offsets.resize(num_elements + 1); + for (uint64_t i = 0; i <= num_elements; ++i) { + layout->field_offsets[i] = read_unsigned_le(ptr, field_offset_size); + ptr += field_offset_size; + } + + layout->total_size = layout->field_offsets.back(); + layout->fields = ptr; + RETURN_IF_ERROR( + require_available(layout->fields, end, layout->total_size, "array field values")); + RETURN_IF_ERROR( + validate_array_field_offsets(layout->field_offsets, layout->total_size, "array")); + return Status::OK(); +} + +Status read_object_layout(uint8_t value_header, const uint8_t* ptr, const uint8_t* end, + const VariantMetadata& metadata, VariantObjectLayout* layout) { + int field_offset_size = (value_header & 0x03) + 1; + int field_id_size = ((value_header >> 2) & 0x03) + 1; + int num_elements_size = (value_header 
& 0x10) != 0 ? 4 : 1; + + RETURN_IF_ERROR(require_available(ptr, end, num_elements_size, "object element count")); + uint64_t num_elements = read_unsigned_le(ptr, num_elements_size); + ptr += num_elements_size; + + RETURN_IF_ERROR( + require_available_entries(ptr, end, num_elements, field_id_size, "object field ids")); + layout->field_ids.resize(num_elements); + for (uint64_t i = 0; i < num_elements; ++i) { + layout->field_ids[i] = read_unsigned_le(ptr, field_id_size); + ptr += field_id_size; + if (layout->field_ids[i] >= metadata.dictionary.size()) { + return Status::Corruption("Invalid Parquet VARIANT object field id {}", + layout->field_ids[i]); + } + if (i > 0 && !variant_string_less(metadata.dictionary[layout->field_ids[i - 1]], + metadata.dictionary[layout->field_ids[i]])) { + return Status::Corruption("Invalid Parquet VARIANT object field names"); + } + } + + RETURN_IF_ERROR(require_available_entries(ptr, end, num_elements + 1, field_offset_size, + "object field offsets")); + layout->field_offsets.resize(num_elements + 1); + for (uint64_t i = 0; i <= num_elements; ++i) { + layout->field_offsets[i] = read_unsigned_le(ptr, field_offset_size); + ptr += field_offset_size; + } + + layout->total_size = layout->field_offsets.back(); + layout->fields = ptr; + RETURN_IF_ERROR( + require_available(layout->fields, end, layout->total_size, "object field values")); + RETURN_IF_ERROR( + validate_array_field_offsets(layout->field_offsets, layout->total_size, "object")); Review Comment: The VariantMap decoder rejects valid VARIANT objects whose field values are stored out of field-name order. The Parquet VARIANT spec requires field ids/offset entries to be listed in key order, but explicitly says the actual value entries may be in any physical order, so object offsets need not be monotonically increasing. 
The JSON decoder below handles this with `compute_object_field_ends()`, but this shared layout reader calls `validate_array_field_offsets()`, and `decode_object_to_variant_map()` later slices each child using `field_offsets[i + 1]` as its end. When object offsets are not monotonic, a field's value instead ends at the next-larger physical offset among all fields (or at `total_size` for the physically last value), which is exactly what `compute_object_field_ends()` derives. A valid residual object with keys `[a,b]` but physical values stored as `b` then `a` (field offsets `[len(b), 0, total_size]`, so the first offset is not 0) will either be rejected here or be mis-sliced on the `VariantMap` path used by `variant_to_variant_map()`, even though `decode_variant_to_json()` can read it. Please compute per-field ends for object layouts instead of requiring monotonic offsets, and add coverage for `decode_variant_to_variant_map()` with non-monotonic object offsets. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
