github-actions[bot] commented on code in PR #25138: URL: https://github.com/apache/doris/pull/25138#discussion_r1371767124
########## be/src/vec/exec/format/parquet/parquet_column_convert.h: ########## @@ -0,0 +1,665 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gen_cpp/PlanNodes_types.h> +#include <gen_cpp/Types_types.h> +#include <gen_cpp/parquet_types.h> + +#include <algorithm> +#include <functional> +#include <ostream> +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/status.h" +#include "gen_cpp/descriptors.pb.h" +#include "gutil/endian.h" +#include "gutil/strings/numbers.h" +#include "io/file_factory.h" +#include "olap/olap_common.h" +#include "util/coding.h" +#include "util/slice.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/format/parquet/decoder.h" +#include "vec/exec/format/parquet/parquet_common.h" + +namespace doris::vectorized { + +namespace ParquetConvert { + +template <tparquet::Type::type ParquetType> +struct PhysicalTypeTraits {}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT32> { + using DataType = int32_t; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::BOOLEAN> { + using DataType = uint8; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT64> { + using DataType = int64_t; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::FLOAT> { + using DataType = float; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::DOUBLE> { + using DataType = double; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::BYTE_ARRAY> { + using DataType = String; + using ColumnType = ColumnString; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::FIXED_LEN_BYTE_ARRAY> { + using DataType = String; + using ColumnType = ColumnString; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT96> { + using DataType = ParquetInt96; + using ColumnType = ColumnVector<Int8>; +}; + +#define FOR_LOGICAL_NUMERIC_TYPES(M) \ + M(TypeIndex::Int8, Int8, Int32) \ + M(TypeIndex::Int16, Int16, Int32) \ + M(TypeIndex::Int32, Int32, Int32) \ + M(TypeIndex::Int64, Int64, Int64) \ + M(TypeIndex::Float32, Float32, Float32) \ + M(TypeIndex::Float64, Float64, Float64) + +#define FOR_LOGICAL_DECIMAL_TYPES(M) \ + M(TypeIndex::Decimal32, Decimal32, Int32) \ + M(TypeIndex::Decimal64, Decimal64, Int64) \ + M(TypeIndex::Decimal128, Decimal128, Int128) \ + M(TypeIndex::Decimal128I, Decimal128, Int128) + +struct ConvertParams { + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false + static const cctz::time_zone utc0; + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone + cctz::time_zone* ctz = nullptr; + size_t offset_days = 0; + int64_t second_mask = 1; + int64_t scale_to_nano_factor = 1; + DecimalScaleParams decimal_scale; + FieldSchema* field_schema = nullptr; + size_t start_idx = 0; + + void init(FieldSchema* field_schema_, cctz::time_zone* ctz_, size_t start_idx_ = 0) { + field_schema = field_schema_; + if (ctz_ != nullptr) { + ctz = ctz_; + } + const auto& schema = field_schema->parquet_schema; + if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) { + const auto& timestamp_info = schema.logicalType.TIMESTAMP; + if (!timestamp_info.isAdjustedToUTC) { + // should set timezone to utc+0 + ctz = const_cast<cctz::time_zone*>(&utc0); + } + const auto& time_unit = timestamp_info.unit; + if (time_unit.__isset.MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (time_unit.__isset.MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } else if (time_unit.__isset.NANOS) { + second_mask = 1000000000; + scale_to_nano_factor = 1; + } + } else if (schema.__isset.converted_type) { + const auto& converted_type = schema.converted_type; + if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } + } + + if (ctz) { + VecDateTimeValue t; + t.from_unixtime(0, *ctz); + offset_days = t.day() == 31 ? 0 : 1; + } + start_idx = start_idx_; + } + + template <typename DecimalPrimitiveType> + void init_decimal_converter(DataTypePtr& data_type) { + if (field_schema == nullptr || decimal_scale.scale_type != DecimalScaleParams::NOT_INIT) { + return; + } + auto scale = field_schema->parquet_schema.scale; + auto* decimal_type = static_cast<DataTypeDecimal<Decimal<DecimalPrimitiveType>>*>( + const_cast<IDataType*>(remove_nullable(data_type).get())); + auto dest_scale = decimal_type->get_scale(); + if (dest_scale > scale) { + decimal_scale.scale_type = DecimalScaleParams::SCALE_UP; + decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale); + } else if (dest_scale < scale) { + decimal_scale.scale_type = DecimalScaleParams::SCALE_DOWN; + decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale); + } else { + decimal_scale.scale_type = DecimalScaleParams::NO_SCALE; + decimal_scale.scale_factor = 1; + } + } +}; + +/* +* parquet_physical_type : The type of data stored in parquet. +* Read data into columns returned by get_column according to the physical type of parquet. +* show_type : The data format that should be displayed. +* doris_column : What type of column does the upper layer need to put the data in. +* +* example : +* In hive, if decimal is stored as FIXED_LENBYTE_ARRAY in parquet, +* then we use `ALTER TABLE TableName CHANGE COLUMN Col_Decimal Col_Decimal String;` +* to convert this column to string type. +* parquet_type : FIXED_LEN_BYTE_ARRAY. +* ans_data_type : ColumnInt8 +* show_type : Decimal. +* doris_column : ColumnString. +*/ +ColumnPtr get_column(tparquet::Type::type parquet_physical_type, PrimitiveType show_type, + ColumnPtr& doris_column, DataTypePtr& doris_type, bool* need_convert); + +struct ColumnConvert { + virtual Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) { return Status::OK(); } + + virtual ~ColumnConvert() = default; + + void convert_null(ColumnPtr& src_col, MutableColumnPtr& dst_col) { + src_col = remove_nullable(src_col); + dst_col = remove_nullable(dst_col->get_ptr())->assume_mutable(); + } + +public: + ConvertParams* _convert_params; +}; + +template <tparquet::Type::type parquet_physical_type, typename dst_type> +struct NumberToNumberConvert : public ColumnConvert { + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + using ColumnType = typename PhysicalTypeTraits<parquet_physical_type>::ColumnType; + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto& src_data = static_cast<const ColumnType*>(src_col.get())->get_data(); + + dst_col->resize(_convert_params->start_idx + rows); + auto& data = static_cast<ColumnVector<dst_type>&>(*dst_col.get()).get_data(); + for (int i = 0; i < rows; i++) { + dst_type value = static_cast<dst_type>(src_data[i]); + data[_convert_params->start_idx + i] = value; + } + + return Status::OK(); + } +}; + +template <tparquet::Type::type parquet_physical_type> +struct NumberToStringConvert : public ColumnConvert { + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + using ColumnType = typename PhysicalTypeTraits<parquet_physical_type>::ColumnType; + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto& src_data = static_cast<const ColumnType*>(src_col.get())->get_data(); + + char buf[100]; + auto str_col = static_cast<ColumnString*>(dst_col.get()); + for (int i = 0; i < rows; i++) { + if constexpr (parquet_physical_type == tparquet::Type::FLOAT) { + int len = FastFloatToBuffer(src_data[i], buf, true); + str_col->insert_data(buf, len); + + } else if constexpr (parquet_physical_type == tparquet::Type::DOUBLE) { + int len = FastDoubleToBuffer(src_data[i], buf, true); + str_col->insert_data(buf, len); + } else if constexpr (parquet_physical_type == tparquet::Type::INT32) { + char* end = FastInt32ToBufferLeft(src_data[i], buf); + str_col->insert_data(buf, end - buf); + + } else if constexpr (parquet_physical_type == tparquet::Type::INT64) { + char* end = FastInt64ToBufferLeft(src_data[i], buf); + str_col->insert_data(buf, end - buf); + + } else { + string value = std::to_string(src_data[i]); + str_col->insert_data(value.data(), value.size()); + } + } + return Status::OK(); + } +}; + +struct Int96toTimestamp : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size() / sizeof(ParquetInt96); + auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); + auto ParquetInt96_data = (ParquetInt96*)src_data.data(); + dst_col->resize(_convert_params->start_idx + rows); + auto& data = static_cast<ColumnVector<UInt64>*>(dst_col.get())->get_data(); + + for (int i = 0; i < rows; i++) { + ParquetInt96 x = ParquetInt96_data[i]; + auto& num = data[_convert_params->start_idx + i]; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + int64_t micros = x.to_timestamp_micros(); + value.from_unixtime(micros / 1000000, *_convert_params->ctz); + value.set_microsecond(micros % 1000000); + } + return Status::OK(); + } +}; + +struct Int64ToTimestamp : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + dst_col->resize(_convert_params->start_idx + rows); + + auto src_data = static_cast<const ColumnVector<int64_t>*>(src_col.get())->get_data().data(); + auto& data = static_cast<ColumnVector<UInt64>*>(dst_col.get())->get_data(); + + for (int i = 0; i < rows; i++) { + int64_t x = src_data[i]; + auto& num = data[_convert_params->start_idx + i]; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + value.from_unixtime(x / _convert_params->second_mask, *_convert_params->ctz); + value.set_microsecond((x % _convert_params->second_mask) * + (_convert_params->scale_to_nano_factor / 1000)); + } + return Status::OK(); + } +}; + +class Int32ToDate : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + dst_col->resize(_convert_params->start_idx + rows); + + auto& src_data = static_cast<const ColumnVector<int32>*>(src_col.get())->get_data(); + auto& data = static_cast<ColumnDateV2*>(dst_col.get())->get_data(); + date_day_offset_dict& date_dict = date_day_offset_dict::get(); + + for (int i = 0; i < rows; i++) { + auto& value = reinterpret_cast<DateV2Value<DateV2ValueType>&>( + data[_convert_params->start_idx + i]); + int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days; + value = date_dict[date_value]; + } + + return Status::OK(); + } +}; + +template <typename DecimalType, typename ValueCopyType, DecimalScaleParams::ScaleType ScaleType> +class StringToDecimal : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + dst_col->resize(_convert_params->start_idx + rows); + + auto& data = static_cast<ColumnDecimal<DecimalType>*>(dst_col.get())->get_data(); + for (int i = 0; i < rows; i++) { + size_t len = offset[i] - offset[i - 1]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - len) * 8); + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::NO_SCALE) { + // do nothing + } else { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + auto& v = reinterpret_cast<DecimalType&>(data[_convert_params->start_idx + i]); + v = (DecimalType)value; + } + + return Status::OK(); + } +}; +template <typename NumberType, typename DecimalPhysicalType, typename ValueCopyType, + DecimalScaleParams::ScaleType ScaleType> +class NumberToDecimal : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto* src_data = + static_cast<const ColumnVector<NumberType>*>(src_col.get())->get_data().data(); + dst_col->resize(_convert_params->start_idx + rows); + + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto* data = static_cast<ColumnDecimal<Decimal<DecimalPhysicalType>>*>(dst_col.get()) + ->get_data() + .data(); + + for (int i = 0; i < rows; i++) { + ValueCopyType value = src_data[i]; + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + data[_convert_params->start_idx + i] = (DecimalPhysicalType)value; + } + return Status::OK(); + } +}; + +template <typename DecimalType, typename ValueCopyType> +class StringToDecimalString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + + auto data = static_cast<ColumnString*>(dst_col.get()); + for (int i = 0; i < rows; i++) { + int len = offset[i] - offset[i - 1]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - len) * 8); + std::string ans = reinterpret_cast<DecimalType&>(value).to_string( + _convert_params->field_schema->parquet_schema.scale); + data->insert_data(ans.data(), ans.size()); + } + return Status::OK(); + } +}; + +class Int32ToDateString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + + auto& src_data = static_cast<const ColumnVector<int32>*>(src_col.get())->get_data(); + date_day_offset_dict& date_dict = date_day_offset_dict::get(); + + auto str_col = static_cast<ColumnString*>(dst_col.get()); + char buf[50]; + for (int i = 0; i < rows; i++) { + int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days; + DateV2Value<DateV2ValueType> value = date_dict[date_value]; + char* end = value.to_string(buf); + str_col->insert_data(buf, end - buf); + } + + return Status::OK(); + } +}; + +class Int96ToTimestampString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); + auto dst_data = static_cast<ColumnString*>(dst_col.get()); + + size_t rows = src_col->size() / sizeof(ParquetInt96); + ParquetInt96* data = (ParquetInt96*)src_data.data(); Review Comment: warning: use auto when initializing with a cast to avoid duplicating the type name [modernize-use-auto] ```suggestion auto* data = (ParquetInt96*)src_data.data(); ``` ########## be/src/vec/exec/format/parquet/parquet_column_convert.h: ########## @@ -0,0 +1,665 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gen_cpp/PlanNodes_types.h> +#include <gen_cpp/Types_types.h> +#include <gen_cpp/parquet_types.h> + +#include <algorithm> +#include <functional> +#include <ostream> +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/status.h" +#include "gen_cpp/descriptors.pb.h" +#include "gutil/endian.h" +#include "gutil/strings/numbers.h" +#include "io/file_factory.h" +#include "olap/olap_common.h" +#include "util/coding.h" +#include "util/slice.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/format/parquet/decoder.h" +#include "vec/exec/format/parquet/parquet_common.h" + +namespace doris::vectorized { + +namespace ParquetConvert { + +template <tparquet::Type::type ParquetType> +struct PhysicalTypeTraits {}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT32> { + using DataType = int32_t; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::BOOLEAN> { + using DataType = uint8; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT64> { + using DataType = int64_t; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::FLOAT> { + using DataType = float; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::DOUBLE> { + using DataType = double; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::BYTE_ARRAY> { + using DataType = String; + using ColumnType = ColumnString; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::FIXED_LEN_BYTE_ARRAY> { + using DataType = String; + using ColumnType = ColumnString; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT96> { + using DataType = ParquetInt96; + using ColumnType = ColumnVector<Int8>; +}; + +#define FOR_LOGICAL_NUMERIC_TYPES(M) \ + M(TypeIndex::Int8, Int8, Int32) \ + M(TypeIndex::Int16, Int16, Int32) \ + M(TypeIndex::Int32, Int32, Int32) \ + M(TypeIndex::Int64, Int64, Int64) \ + M(TypeIndex::Float32, Float32, Float32) \ + M(TypeIndex::Float64, Float64, Float64) + +#define FOR_LOGICAL_DECIMAL_TYPES(M) \ + M(TypeIndex::Decimal32, Decimal32, Int32) \ + M(TypeIndex::Decimal64, Decimal64, Int64) \ + M(TypeIndex::Decimal128, Decimal128, Int128) \ + M(TypeIndex::Decimal128I, Decimal128, Int128) + +struct ConvertParams { + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false + static const cctz::time_zone utc0; + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone + cctz::time_zone* ctz = nullptr; + size_t offset_days = 0; + int64_t second_mask = 1; + int64_t scale_to_nano_factor = 1; + DecimalScaleParams decimal_scale; + FieldSchema* field_schema = nullptr; + size_t start_idx = 0; + + void init(FieldSchema* field_schema_, cctz::time_zone* ctz_, size_t start_idx_ = 0) { + field_schema = field_schema_; + if (ctz_ != nullptr) { + ctz = ctz_; + } + const auto& schema = field_schema->parquet_schema; + if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) { + const auto& timestamp_info = schema.logicalType.TIMESTAMP; + if (!timestamp_info.isAdjustedToUTC) { + // should set timezone to utc+0 + ctz = const_cast<cctz::time_zone*>(&utc0); + } + const auto& time_unit = timestamp_info.unit; + if (time_unit.__isset.MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (time_unit.__isset.MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } else if (time_unit.__isset.NANOS) { + second_mask = 1000000000; + scale_to_nano_factor = 1; + } + } else if (schema.__isset.converted_type) { + const auto& converted_type = schema.converted_type; + if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } + } + + if (ctz) { + VecDateTimeValue t; + t.from_unixtime(0, *ctz); + offset_days = t.day() == 31 ? 0 : 1; + } + start_idx = start_idx_; + } + + template <typename DecimalPrimitiveType> + void init_decimal_converter(DataTypePtr& data_type) { + if (field_schema == nullptr || decimal_scale.scale_type != DecimalScaleParams::NOT_INIT) { + return; + } + auto scale = field_schema->parquet_schema.scale; + auto* decimal_type = static_cast<DataTypeDecimal<Decimal<DecimalPrimitiveType>>*>( + const_cast<IDataType*>(remove_nullable(data_type).get())); + auto dest_scale = decimal_type->get_scale(); + if (dest_scale > scale) { + decimal_scale.scale_type = DecimalScaleParams::SCALE_UP; + decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale); + } else if (dest_scale < scale) { + decimal_scale.scale_type = DecimalScaleParams::SCALE_DOWN; + decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale); + } else { + decimal_scale.scale_type = DecimalScaleParams::NO_SCALE; + decimal_scale.scale_factor = 1; + } + } +}; + +/* +* parquet_physical_type : The type of data stored in parquet. +* Read data into columns returned by get_column according to the physical type of parquet. +* show_type : The data format that should be displayed. +* doris_column : What type of column does the upper layer need to put the data in. +* +* example : +* In hive, if decimal is stored as FIXED_LENBYTE_ARRAY in parquet, +* then we use `ALTER TABLE TableName CHANGE COLUMN Col_Decimal Col_Decimal String;` +* to convert this column to string type. +* parquet_type : FIXED_LEN_BYTE_ARRAY. +* ans_data_type : ColumnInt8 +* show_type : Decimal. +* doris_column : ColumnString. +*/ +ColumnPtr get_column(tparquet::Type::type parquet_physical_type, PrimitiveType show_type, + ColumnPtr& doris_column, DataTypePtr& doris_type, bool* need_convert); + +struct ColumnConvert { + virtual Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) { return Status::OK(); } + + virtual ~ColumnConvert() = default; + + void convert_null(ColumnPtr& src_col, MutableColumnPtr& dst_col) { + src_col = remove_nullable(src_col); + dst_col = remove_nullable(dst_col->get_ptr())->assume_mutable(); + } + +public: + ConvertParams* _convert_params; +}; + +template <tparquet::Type::type parquet_physical_type, typename dst_type> +struct NumberToNumberConvert : public ColumnConvert { + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + using ColumnType = typename PhysicalTypeTraits<parquet_physical_type>::ColumnType; + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto& src_data = static_cast<const ColumnType*>(src_col.get())->get_data(); + + dst_col->resize(_convert_params->start_idx + rows); + auto& data = static_cast<ColumnVector<dst_type>&>(*dst_col.get()).get_data(); + for (int i = 0; i < rows; i++) { + dst_type value = static_cast<dst_type>(src_data[i]); + data[_convert_params->start_idx + i] = value; + } + + return Status::OK(); + } +}; + +template <tparquet::Type::type parquet_physical_type> +struct NumberToStringConvert : public ColumnConvert { + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + using ColumnType = typename PhysicalTypeTraits<parquet_physical_type>::ColumnType; + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto& src_data = static_cast<const ColumnType*>(src_col.get())->get_data(); + + char buf[100]; + auto str_col = static_cast<ColumnString*>(dst_col.get()); + for (int i = 0; i < rows; i++) { + if constexpr (parquet_physical_type == tparquet::Type::FLOAT) { + int len = FastFloatToBuffer(src_data[i], buf, true); + str_col->insert_data(buf, len); + + } else if constexpr (parquet_physical_type == tparquet::Type::DOUBLE) { + int len = FastDoubleToBuffer(src_data[i], buf, true); + str_col->insert_data(buf, len); + } else if constexpr (parquet_physical_type == tparquet::Type::INT32) { + char* end = FastInt32ToBufferLeft(src_data[i], buf); + str_col->insert_data(buf, end - buf); + + } else if constexpr (parquet_physical_type == tparquet::Type::INT64) { + char* end = FastInt64ToBufferLeft(src_data[i], buf); + str_col->insert_data(buf, end - buf); + + } else { + string value = std::to_string(src_data[i]); + str_col->insert_data(value.data(), value.size()); + } + } + return Status::OK(); + } +}; + +struct Int96toTimestamp : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size() / sizeof(ParquetInt96); + auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); + auto ParquetInt96_data = (ParquetInt96*)src_data.data(); + dst_col->resize(_convert_params->start_idx + rows); + auto& data = static_cast<ColumnVector<UInt64>*>(dst_col.get())->get_data(); + + for (int i = 0; i < rows; i++) { + ParquetInt96 x = ParquetInt96_data[i]; + auto& num = data[_convert_params->start_idx + i]; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + int64_t micros = x.to_timestamp_micros(); + value.from_unixtime(micros / 1000000, *_convert_params->ctz); + value.set_microsecond(micros % 1000000); + } + return Status::OK(); + } +}; + +struct Int64ToTimestamp : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + dst_col->resize(_convert_params->start_idx + rows); + + auto src_data = static_cast<const ColumnVector<int64_t>*>(src_col.get())->get_data().data(); + auto& data = static_cast<ColumnVector<UInt64>*>(dst_col.get())->get_data(); + + for (int i = 0; i < rows; i++) { + int64_t x = src_data[i]; + auto& num = data[_convert_params->start_idx + i]; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + value.from_unixtime(x / _convert_params->second_mask, *_convert_params->ctz); + value.set_microsecond((x % _convert_params->second_mask) * + (_convert_params->scale_to_nano_factor / 1000)); + } + return Status::OK(); + } +}; + +class Int32ToDate : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + dst_col->resize(_convert_params->start_idx + rows); + + auto& src_data = static_cast<const ColumnVector<int32>*>(src_col.get())->get_data(); + auto& data = static_cast<ColumnDateV2*>(dst_col.get())->get_data(); + date_day_offset_dict& date_dict = date_day_offset_dict::get(); + + for (int i = 0; i < rows; i++) { + auto& value = reinterpret_cast<DateV2Value<DateV2ValueType>&>( + data[_convert_params->start_idx + i]); + int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days; + value = date_dict[date_value]; + } + + return Status::OK(); + } +}; + +template <typename DecimalType, typename ValueCopyType, DecimalScaleParams::ScaleType ScaleType> +class StringToDecimal : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + dst_col->resize(_convert_params->start_idx + rows); + + auto& data = static_cast<ColumnDecimal<DecimalType>*>(dst_col.get())->get_data(); + for (int i = 0; i < rows; i++) { + size_t len = offset[i] - offset[i - 1]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - len) * 8); + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::NO_SCALE) { + // do nothing + } else { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + auto& v = reinterpret_cast<DecimalType&>(data[_convert_params->start_idx + i]); + v = (DecimalType)value; + } + + return Status::OK(); + } +}; +template <typename NumberType, typename DecimalPhysicalType, typename ValueCopyType, + DecimalScaleParams::ScaleType ScaleType> +class NumberToDecimal : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto* src_data = + static_cast<const ColumnVector<NumberType>*>(src_col.get())->get_data().data(); + dst_col->resize(_convert_params->start_idx + rows); + + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto* data = static_cast<ColumnDecimal<Decimal<DecimalPhysicalType>>*>(dst_col.get()) + ->get_data() + .data(); + + for (int i = 0; i < rows; i++) { + ValueCopyType value = src_data[i]; + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + data[_convert_params->start_idx + i] = (DecimalPhysicalType)value; + } + return Status::OK(); + } +}; + +template <typename DecimalType, typename ValueCopyType> +class StringToDecimalString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + + auto data = static_cast<ColumnString*>(dst_col.get()); + for (int i = 0; i < rows; i++) { + int len = offset[i] - offset[i - 1]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - len) * 8); + std::string ans = reinterpret_cast<DecimalType&>(value).to_string( + _convert_params->field_schema->parquet_schema.scale); + data->insert_data(ans.data(), ans.size()); + } + return Status::OK(); + } +}; + +class Int32ToDateString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + + auto& src_data = static_cast<const ColumnVector<int32>*>(src_col.get())->get_data(); + date_day_offset_dict& date_dict = date_day_offset_dict::get(); + + auto str_col = static_cast<ColumnString*>(dst_col.get()); + char buf[50]; + for (int i = 0; i < rows; i++) { + int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days; + DateV2Value<DateV2ValueType> value = date_dict[date_value]; + char* end = value.to_string(buf); + str_col->insert_data(buf, end - buf); + } + + return Status::OK(); + } +}; + +class Int96ToTimestampString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); + auto dst_data = static_cast<ColumnString*>(dst_col.get()); + + size_t rows = src_col->size() / sizeof(ParquetInt96); + ParquetInt96* data = (ParquetInt96*)src_data.data(); + + char buf[50]; Review Comment: warning: 50 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp char buf[50]; ^ ``` ########## be/src/vec/exec/format/parquet/parquet_column_convert.h: ########## @@ -0,0 +1,665 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gen_cpp/PlanNodes_types.h> +#include <gen_cpp/Types_types.h> +#include <gen_cpp/parquet_types.h> + +#include <algorithm> +#include <functional> +#include <ostream> +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/status.h" +#include "gen_cpp/descriptors.pb.h" +#include "gutil/endian.h" +#include "gutil/strings/numbers.h" +#include "io/file_factory.h" +#include "olap/olap_common.h" +#include "util/coding.h" +#include "util/slice.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/format/parquet/decoder.h" +#include "vec/exec/format/parquet/parquet_common.h" + +namespace doris::vectorized { + +namespace ParquetConvert { + +template <tparquet::Type::type ParquetType> +struct PhysicalTypeTraits {}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT32> { + using DataType = int32_t; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::BOOLEAN> { + using DataType = uint8; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT64> { + using DataType = int64_t; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::FLOAT> { + using DataType = float; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::DOUBLE> { + using DataType = double; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::BYTE_ARRAY> { + using DataType = String; + using ColumnType = ColumnString; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::FIXED_LEN_BYTE_ARRAY> { + using DataType = String; + using ColumnType = ColumnString; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT96> { + using DataType = ParquetInt96; + using ColumnType = ColumnVector<Int8>; +}; + +#define FOR_LOGICAL_NUMERIC_TYPES(M) \ + M(TypeIndex::Int8, Int8, Int32) \ + M(TypeIndex::Int16, Int16, Int32) \ + M(TypeIndex::Int32, Int32, Int32) \ + M(TypeIndex::Int64, Int64, Int64) \ + M(TypeIndex::Float32, Float32, Float32) \ + M(TypeIndex::Float64, Float64, Float64) + +#define FOR_LOGICAL_DECIMAL_TYPES(M) \ + M(TypeIndex::Decimal32, Decimal32, Int32) \ + M(TypeIndex::Decimal64, Decimal64, Int64) \ + M(TypeIndex::Decimal128, Decimal128, Int128) \ + M(TypeIndex::Decimal128I, Decimal128, Int128) + +struct ConvertParams { + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false + static const cctz::time_zone utc0; + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone + cctz::time_zone* ctz = nullptr; + size_t offset_days = 0; + int64_t second_mask = 1; + int64_t scale_to_nano_factor = 1; + DecimalScaleParams decimal_scale; + FieldSchema* field_schema = nullptr; + size_t start_idx = 0; + + void init(FieldSchema* field_schema_, cctz::time_zone* ctz_, size_t start_idx_ = 0) { + field_schema = field_schema_; + if (ctz_ != nullptr) { + ctz = ctz_; + } + const auto& schema = field_schema->parquet_schema; + if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) { + const auto& timestamp_info = schema.logicalType.TIMESTAMP; + if (!timestamp_info.isAdjustedToUTC) { + // should set timezone to utc+0 + ctz = const_cast<cctz::time_zone*>(&utc0); + } + const auto& time_unit = timestamp_info.unit; + if (time_unit.__isset.MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (time_unit.__isset.MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } else if (time_unit.__isset.NANOS) { + second_mask = 1000000000; + scale_to_nano_factor = 1; + } + } else if (schema.__isset.converted_type) { + const auto& converted_type = schema.converted_type; + if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } + } + + if (ctz) { + VecDateTimeValue t; + t.from_unixtime(0, *ctz); + offset_days = t.day() == 31 ? 0 : 1; + } + start_idx = start_idx_; + } + + template <typename DecimalPrimitiveType> + void init_decimal_converter(DataTypePtr& data_type) { + if (field_schema == nullptr || decimal_scale.scale_type != DecimalScaleParams::NOT_INIT) { + return; + } + auto scale = field_schema->parquet_schema.scale; + auto* decimal_type = static_cast<DataTypeDecimal<Decimal<DecimalPrimitiveType>>*>( + const_cast<IDataType*>(remove_nullable(data_type).get())); + auto dest_scale = decimal_type->get_scale(); + if (dest_scale > scale) { + decimal_scale.scale_type = DecimalScaleParams::SCALE_UP; + decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale); + } else if (dest_scale < scale) { + decimal_scale.scale_type = DecimalScaleParams::SCALE_DOWN; + decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale); + } else { + decimal_scale.scale_type = DecimalScaleParams::NO_SCALE; + decimal_scale.scale_factor = 1; + } + } +}; + +/* +* parquet_physical_type : The type of data stored in parquet. +* Read data into columns returned by get_column according to the physical type of parquet. +* show_type : The data format that should be displayed. +* doris_column : What type of column does the upper layer need to put the data in. +* +* example : +* In hive, if decimal is stored as FIXED_LENBYTE_ARRAY in parquet, +* then we use `ALTER TABLE TableName CHANGE COLUMN Col_Decimal Col_Decimal String;` +* to convert this column to string type. +* parquet_type : FIXED_LEN_BYTE_ARRAY. +* ans_data_type : ColumnInt8 +* show_type : Decimal. +* doris_column : ColumnString. +*/ +ColumnPtr get_column(tparquet::Type::type parquet_physical_type, PrimitiveType show_type, + ColumnPtr& doris_column, DataTypePtr& doris_type, bool* need_convert); + +struct ColumnConvert { + virtual Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) { return Status::OK(); } + + virtual ~ColumnConvert() = default; + + void convert_null(ColumnPtr& src_col, MutableColumnPtr& dst_col) { + src_col = remove_nullable(src_col); + dst_col = remove_nullable(dst_col->get_ptr())->assume_mutable(); + } + +public: + ConvertParams* _convert_params; +}; + +template <tparquet::Type::type parquet_physical_type, typename dst_type> +struct NumberToNumberConvert : public ColumnConvert { + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + using ColumnType = typename PhysicalTypeTraits<parquet_physical_type>::ColumnType; + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto& src_data = static_cast<const ColumnType*>(src_col.get())->get_data(); + + dst_col->resize(_convert_params->start_idx + rows); + auto& data = static_cast<ColumnVector<dst_type>&>(*dst_col.get()).get_data(); + for (int i = 0; i < rows; i++) { + dst_type value = static_cast<dst_type>(src_data[i]); + data[_convert_params->start_idx + i] = value; + } + + return Status::OK(); + } +}; + +template <tparquet::Type::type parquet_physical_type> +struct NumberToStringConvert : public ColumnConvert { + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + using ColumnType = typename PhysicalTypeTraits<parquet_physical_type>::ColumnType; + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto& src_data = static_cast<const ColumnType*>(src_col.get())->get_data(); + + char buf[100]; + auto str_col = static_cast<ColumnString*>(dst_col.get()); + for (int i = 0; i < rows; i++) { + if constexpr (parquet_physical_type == tparquet::Type::FLOAT) { + int len = FastFloatToBuffer(src_data[i], buf, true); + str_col->insert_data(buf, len); + + } else if constexpr (parquet_physical_type == tparquet::Type::DOUBLE) { + int len = FastDoubleToBuffer(src_data[i], buf, true); + str_col->insert_data(buf, len); + } else if constexpr (parquet_physical_type == tparquet::Type::INT32) { + char* end = FastInt32ToBufferLeft(src_data[i], buf); + str_col->insert_data(buf, end - buf); + + } else if constexpr (parquet_physical_type == tparquet::Type::INT64) { + char* end = FastInt64ToBufferLeft(src_data[i], buf); + str_col->insert_data(buf, end - buf); + + } else { + string value = std::to_string(src_data[i]); + str_col->insert_data(value.data(), value.size()); + } + } + return Status::OK(); + } +}; + +struct Int96toTimestamp : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size() / sizeof(ParquetInt96); + auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); + auto ParquetInt96_data = (ParquetInt96*)src_data.data(); + dst_col->resize(_convert_params->start_idx + rows); + auto& data = static_cast<ColumnVector<UInt64>*>(dst_col.get())->get_data(); + + for (int i = 0; i < rows; i++) { + ParquetInt96 x = ParquetInt96_data[i]; + auto& num = data[_convert_params->start_idx + i]; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + int64_t micros = x.to_timestamp_micros(); + value.from_unixtime(micros / 1000000, *_convert_params->ctz); + value.set_microsecond(micros % 1000000); + } + return Status::OK(); + } +}; + +struct Int64ToTimestamp : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + dst_col->resize(_convert_params->start_idx + rows); + + auto src_data = static_cast<const ColumnVector<int64_t>*>(src_col.get())->get_data().data(); + auto& data = static_cast<ColumnVector<UInt64>*>(dst_col.get())->get_data(); + + for (int i = 0; i < rows; i++) { + int64_t x = src_data[i]; + auto& num = data[_convert_params->start_idx + i]; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + value.from_unixtime(x / _convert_params->second_mask, *_convert_params->ctz); + value.set_microsecond((x % _convert_params->second_mask) * + (_convert_params->scale_to_nano_factor / 1000)); + } + return Status::OK(); + } +}; + +class Int32ToDate : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + dst_col->resize(_convert_params->start_idx + rows); + + auto& src_data = static_cast<const ColumnVector<int32>*>(src_col.get())->get_data(); + auto& data = static_cast<ColumnDateV2*>(dst_col.get())->get_data(); + date_day_offset_dict& date_dict = date_day_offset_dict::get(); + + for (int i = 0; i < rows; i++) { + auto& value = reinterpret_cast<DateV2Value<DateV2ValueType>&>( + data[_convert_params->start_idx + i]); + int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days; + value = date_dict[date_value]; + } + + return Status::OK(); + } +}; + +template <typename DecimalType, typename ValueCopyType, DecimalScaleParams::ScaleType ScaleType> +class StringToDecimal : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + dst_col->resize(_convert_params->start_idx + rows); + + auto& data = static_cast<ColumnDecimal<DecimalType>*>(dst_col.get())->get_data(); + for (int i = 0; i < rows; i++) { + size_t len = offset[i] - offset[i - 1]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - len) * 8); + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::NO_SCALE) { + // do nothing + } else { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + auto& v = reinterpret_cast<DecimalType&>(data[_convert_params->start_idx + i]); + v = (DecimalType)value; + } + + return Status::OK(); + } +}; +template <typename NumberType, typename DecimalPhysicalType, typename ValueCopyType, + DecimalScaleParams::ScaleType ScaleType> +class NumberToDecimal : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto* src_data = + static_cast<const ColumnVector<NumberType>*>(src_col.get())->get_data().data(); + dst_col->resize(_convert_params->start_idx + rows); + + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto* data = static_cast<ColumnDecimal<Decimal<DecimalPhysicalType>>*>(dst_col.get()) + ->get_data() + .data(); + + for (int i = 0; i < rows; i++) { + ValueCopyType value = src_data[i]; + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + data[_convert_params->start_idx + i] = (DecimalPhysicalType)value; + } + return Status::OK(); + } +}; + +template <typename DecimalType, typename ValueCopyType> +class StringToDecimalString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + + auto data = static_cast<ColumnString*>(dst_col.get()); + for (int i = 0; i < rows; i++) { + int len = offset[i] - offset[i - 1]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - len) * 8); + std::string ans = reinterpret_cast<DecimalType&>(value).to_string( + _convert_params->field_schema->parquet_schema.scale); + data->insert_data(ans.data(), ans.size()); + } + return Status::OK(); + } +}; + +class Int32ToDateString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + + auto& src_data = static_cast<const ColumnVector<int32>*>(src_col.get())->get_data(); + date_day_offset_dict& date_dict = date_day_offset_dict::get(); + + auto str_col = static_cast<ColumnString*>(dst_col.get()); + char buf[50]; + for (int i = 0; i < rows; i++) { + int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days; + DateV2Value<DateV2ValueType> value = date_dict[date_value]; + char* end = value.to_string(buf); + str_col->insert_data(buf, end - buf); + } + + return Status::OK(); + } +}; + +class Int96ToTimestampString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); + auto dst_data = static_cast<ColumnString*>(dst_col.get()); + + size_t rows = src_col->size() / sizeof(ParquetInt96); + ParquetInt96* data = (ParquetInt96*)src_data.data(); + + char buf[50]; + for (int i = 0; i < rows; i++) { + uint64_t num = 0; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + int64_t micros = data[i].to_timestamp_micros(); + value.from_unixtime(micros / 1000000, *_convert_params->ctz); + value.set_microsecond(micros % 1000000); Review Comment: warning: 1000000 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp value.set_microsecond(micros % 1000000); ^ ``` ########## be/src/vec/exec/format/parquet/parquet_column_convert.h: ########## @@ -0,0 +1,665 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gen_cpp/PlanNodes_types.h> +#include <gen_cpp/Types_types.h> +#include <gen_cpp/parquet_types.h> + +#include <algorithm> +#include <functional> +#include <ostream> +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/status.h" +#include "gen_cpp/descriptors.pb.h" +#include "gutil/endian.h" +#include "gutil/strings/numbers.h" +#include "io/file_factory.h" +#include "olap/olap_common.h" +#include "util/coding.h" +#include "util/slice.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/format/parquet/decoder.h" +#include "vec/exec/format/parquet/parquet_common.h" + +namespace doris::vectorized { + +namespace ParquetConvert { + +template <tparquet::Type::type ParquetType> +struct PhysicalTypeTraits {}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT32> { + using DataType = int32_t; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::BOOLEAN> { + using DataType = uint8; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT64> { + using DataType = int64_t; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::FLOAT> { + using DataType = float; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::DOUBLE> { + using DataType = double; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::BYTE_ARRAY> { + using DataType = String; + using ColumnType = ColumnString; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::FIXED_LEN_BYTE_ARRAY> { + using DataType = String; + using ColumnType = ColumnString; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT96> { + using DataType = ParquetInt96; + using ColumnType = ColumnVector<Int8>; +}; + +#define FOR_LOGICAL_NUMERIC_TYPES(M) \ + M(TypeIndex::Int8, Int8, Int32) \ + M(TypeIndex::Int16, Int16, Int32) \ + M(TypeIndex::Int32, Int32, Int32) \ + M(TypeIndex::Int64, Int64, Int64) \ + M(TypeIndex::Float32, Float32, Float32) \ + M(TypeIndex::Float64, Float64, Float64) + +#define FOR_LOGICAL_DECIMAL_TYPES(M) \ + M(TypeIndex::Decimal32, Decimal32, Int32) \ + M(TypeIndex::Decimal64, Decimal64, Int64) \ + M(TypeIndex::Decimal128, Decimal128, Int128) \ + M(TypeIndex::Decimal128I, Decimal128, Int128) + +struct ConvertParams { + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false + static const cctz::time_zone utc0; + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone + cctz::time_zone* ctz = nullptr; + size_t offset_days = 0; + int64_t second_mask = 1; + int64_t scale_to_nano_factor = 1; + DecimalScaleParams decimal_scale; + FieldSchema* field_schema = nullptr; + size_t start_idx = 0; + + void init(FieldSchema* field_schema_, cctz::time_zone* ctz_, size_t start_idx_ = 0) { + field_schema = field_schema_; + if (ctz_ != nullptr) { + ctz = ctz_; + } + const auto& schema = field_schema->parquet_schema; + if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) { + const auto& timestamp_info = schema.logicalType.TIMESTAMP; + if (!timestamp_info.isAdjustedToUTC) { + // should set timezone to utc+0 + ctz = const_cast<cctz::time_zone*>(&utc0); + } + const auto& time_unit = timestamp_info.unit; + if (time_unit.__isset.MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (time_unit.__isset.MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } else if (time_unit.__isset.NANOS) { + second_mask = 1000000000; + scale_to_nano_factor = 1; + } + } else if (schema.__isset.converted_type) { + const auto& converted_type = schema.converted_type; + if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } + } + + if (ctz) { + VecDateTimeValue t; + t.from_unixtime(0, *ctz); + offset_days = t.day() == 31 ? 0 : 1; + } + start_idx = start_idx_; + } + + template <typename DecimalPrimitiveType> + void init_decimal_converter(DataTypePtr& data_type) { + if (field_schema == nullptr || decimal_scale.scale_type != DecimalScaleParams::NOT_INIT) { + return; + } + auto scale = field_schema->parquet_schema.scale; + auto* decimal_type = static_cast<DataTypeDecimal<Decimal<DecimalPrimitiveType>>*>( + const_cast<IDataType*>(remove_nullable(data_type).get())); + auto dest_scale = decimal_type->get_scale(); + if (dest_scale > scale) { + decimal_scale.scale_type = DecimalScaleParams::SCALE_UP; + decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale); + } else if (dest_scale < scale) { + decimal_scale.scale_type = DecimalScaleParams::SCALE_DOWN; + decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale); + } else { + decimal_scale.scale_type = DecimalScaleParams::NO_SCALE; + decimal_scale.scale_factor = 1; + } + } +}; + +/* +* parquet_physical_type : The type of data stored in parquet. +* Read data into columns returned by get_column according to the physical type of parquet. +* show_type : The data format that should be displayed. +* doris_column : What type of column does the upper layer need to put the data in. +* +* example : +* In hive, if decimal is stored as FIXED_LENBYTE_ARRAY in parquet, +* then we use `ALTER TABLE TableName CHANGE COLUMN Col_Decimal Col_Decimal String;` +* to convert this column to string type. +* parquet_type : FIXED_LEN_BYTE_ARRAY. +* ans_data_type : ColumnInt8 +* show_type : Decimal. +* doris_column : ColumnString. +*/ +ColumnPtr get_column(tparquet::Type::type parquet_physical_type, PrimitiveType show_type, + ColumnPtr& doris_column, DataTypePtr& doris_type, bool* need_convert); + +struct ColumnConvert { + virtual Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) { return Status::OK(); } + + virtual ~ColumnConvert() = default; + + void convert_null(ColumnPtr& src_col, MutableColumnPtr& dst_col) { + src_col = remove_nullable(src_col); + dst_col = remove_nullable(dst_col->get_ptr())->assume_mutable(); + } + +public: + ConvertParams* _convert_params; +}; + +template <tparquet::Type::type parquet_physical_type, typename dst_type> +struct NumberToNumberConvert : public ColumnConvert { + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + using ColumnType = typename PhysicalTypeTraits<parquet_physical_type>::ColumnType; + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto& src_data = static_cast<const ColumnType*>(src_col.get())->get_data(); + + dst_col->resize(_convert_params->start_idx + rows); + auto& data = static_cast<ColumnVector<dst_type>&>(*dst_col.get()).get_data(); + for (int i = 0; i < rows; i++) { + dst_type value = static_cast<dst_type>(src_data[i]); + data[_convert_params->start_idx + i] = value; + } + + return Status::OK(); + } +}; + +template <tparquet::Type::type parquet_physical_type> +struct NumberToStringConvert : public ColumnConvert { + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + using ColumnType = typename PhysicalTypeTraits<parquet_physical_type>::ColumnType; + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto& src_data = static_cast<const ColumnType*>(src_col.get())->get_data(); + + char buf[100]; + auto str_col = static_cast<ColumnString*>(dst_col.get()); + for (int i = 0; i < rows; i++) { + if constexpr (parquet_physical_type == tparquet::Type::FLOAT) { + int len = FastFloatToBuffer(src_data[i], buf, true); + str_col->insert_data(buf, len); + + } else if constexpr (parquet_physical_type == tparquet::Type::DOUBLE) { + int len = FastDoubleToBuffer(src_data[i], buf, true); + str_col->insert_data(buf, len); + } else if constexpr (parquet_physical_type == tparquet::Type::INT32) { + char* end = FastInt32ToBufferLeft(src_data[i], buf); + str_col->insert_data(buf, end - buf); + + } else if constexpr (parquet_physical_type == tparquet::Type::INT64) { + char* end = FastInt64ToBufferLeft(src_data[i], buf); + str_col->insert_data(buf, end - buf); + + } else { + string value = std::to_string(src_data[i]); + str_col->insert_data(value.data(), value.size()); + } + } + return Status::OK(); + } +}; + +struct Int96toTimestamp : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size() / sizeof(ParquetInt96); + auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); + auto ParquetInt96_data = (ParquetInt96*)src_data.data(); + dst_col->resize(_convert_params->start_idx + rows); + auto& data = static_cast<ColumnVector<UInt64>*>(dst_col.get())->get_data(); + + for (int i = 0; i < rows; i++) { + ParquetInt96 x = ParquetInt96_data[i]; + auto& num = data[_convert_params->start_idx + i]; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + int64_t micros = x.to_timestamp_micros(); + value.from_unixtime(micros / 1000000, *_convert_params->ctz); + value.set_microsecond(micros % 1000000); + } + return Status::OK(); + } +}; + +struct Int64ToTimestamp : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + dst_col->resize(_convert_params->start_idx + rows); + + auto src_data = static_cast<const ColumnVector<int64_t>*>(src_col.get())->get_data().data(); + auto& data = static_cast<ColumnVector<UInt64>*>(dst_col.get())->get_data(); + + for (int i = 0; i < rows; i++) { + int64_t x = src_data[i]; + auto& num = data[_convert_params->start_idx + i]; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + value.from_unixtime(x / _convert_params->second_mask, *_convert_params->ctz); + value.set_microsecond((x % _convert_params->second_mask) * + (_convert_params->scale_to_nano_factor / 1000)); + } + return Status::OK(); + } +}; + +class Int32ToDate : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + dst_col->resize(_convert_params->start_idx + rows); + + auto& src_data = static_cast<const ColumnVector<int32>*>(src_col.get())->get_data(); + auto& data = static_cast<ColumnDateV2*>(dst_col.get())->get_data(); + date_day_offset_dict& date_dict = date_day_offset_dict::get(); + + for (int i = 0; i < rows; i++) { + auto& value = reinterpret_cast<DateV2Value<DateV2ValueType>&>( + data[_convert_params->start_idx + i]); + int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days; + value = date_dict[date_value]; + } + + return Status::OK(); + } +}; + +template <typename DecimalType, typename ValueCopyType, DecimalScaleParams::ScaleType ScaleType> +class StringToDecimal : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + dst_col->resize(_convert_params->start_idx + rows); + + auto& data = static_cast<ColumnDecimal<DecimalType>*>(dst_col.get())->get_data(); + for (int i = 0; i < rows; i++) { + size_t len = offset[i] - offset[i - 1]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - len) * 8); + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::NO_SCALE) { + // do nothing + } else { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + auto& v = reinterpret_cast<DecimalType&>(data[_convert_params->start_idx + i]); + v = (DecimalType)value; + } + + return Status::OK(); + } +}; +template <typename NumberType, typename DecimalPhysicalType, typename ValueCopyType, + DecimalScaleParams::ScaleType ScaleType> +class NumberToDecimal : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto* src_data = + static_cast<const ColumnVector<NumberType>*>(src_col.get())->get_data().data(); + dst_col->resize(_convert_params->start_idx + rows); + + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto* data = static_cast<ColumnDecimal<Decimal<DecimalPhysicalType>>*>(dst_col.get()) + ->get_data() + .data(); + + for (int i = 0; i < rows; i++) { + ValueCopyType value = src_data[i]; + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + data[_convert_params->start_idx + i] = (DecimalPhysicalType)value; + } + return Status::OK(); + } +}; + +template <typename DecimalType, typename ValueCopyType> +class StringToDecimalString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + + auto data = static_cast<ColumnString*>(dst_col.get()); + for (int i = 0; i < rows; i++) { + int len = offset[i] - offset[i - 1]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - len) * 8); + std::string ans = reinterpret_cast<DecimalType&>(value).to_string( + _convert_params->field_schema->parquet_schema.scale); + data->insert_data(ans.data(), ans.size()); + } + return Status::OK(); + } +}; + +class Int32ToDateString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + + auto& src_data = static_cast<const ColumnVector<int32>*>(src_col.get())->get_data(); + date_day_offset_dict& date_dict = date_day_offset_dict::get(); + + auto str_col = static_cast<ColumnString*>(dst_col.get()); + char buf[50]; + for (int i = 0; i < rows; i++) { + int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days; + DateV2Value<DateV2ValueType> value = date_dict[date_value]; + char* end = value.to_string(buf); + str_col->insert_data(buf, end - buf); + } + + return Status::OK(); + } +}; + +class Int96ToTimestampString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); + auto dst_data = static_cast<ColumnString*>(dst_col.get()); + + size_t rows = src_col->size() / sizeof(ParquetInt96); + ParquetInt96* data = (ParquetInt96*)src_data.data(); + + char buf[50]; + for (int i = 0; i < rows; i++) { + uint64_t num = 0; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + int64_t micros = data[i].to_timestamp_micros(); + value.from_unixtime(micros / 1000000, *_convert_params->ctz); + value.set_microsecond(micros % 1000000); + char* end = value.to_string(buf); + dst_data->insert_data(buf, end - buf); + } + return Status::OK(); + } +}; + +inline Status get_converter(tparquet::Type::type parquet_physical_type, PrimitiveType show_type, + std::shared_ptr<const IDataType> dst_data_type, + std::unique_ptr<ColumnConvert>* converter, + ConvertParams* convert_params) { + auto dst_type = remove_nullable(dst_data_type)->get_type_id(); + switch (dst_type) { +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ + case NUMERIC_TYPE: \ + switch (parquet_physical_type) { \ + case tparquet::Type::BOOLEAN: \ + *converter = std::make_unique< \ + NumberToNumberConvert<tparquet::Type::BOOLEAN, CPP_NUMERIC_TYPE>>(); \ + break; \ + case tparquet::Type::INT32: \ + *converter = std::make_unique< \ + NumberToNumberConvert<tparquet::Type::INT32, CPP_NUMERIC_TYPE>>(); \ + break; \ + case tparquet::Type::INT64: \ + *converter = std::make_unique< \ + NumberToNumberConvert<tparquet::Type::INT64, CPP_NUMERIC_TYPE>>(); \ + break; \ + case tparquet::Type::FLOAT: \ + *converter = std::make_unique< \ + NumberToNumberConvert<tparquet::Type::FLOAT, CPP_NUMERIC_TYPE>>(); \ + break; \ + case tparquet::Type::DOUBLE: \ + *converter = std::make_unique< \ + NumberToNumberConvert<tparquet::Type::DOUBLE, CPP_NUMERIC_TYPE>>(); \ + break; \ + default: \ + break; \ + } \ + break; + FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + + case TypeIndex::String: { + if (tparquet::Type::FIXED_LEN_BYTE_ARRAY == parquet_physical_type) { + if (show_type == PrimitiveType::TYPE_DECIMAL32) { + *converter = std::make_unique<StringToDecimalString<Decimal32, Int32>>(); + break; + } else if (show_type == PrimitiveType::TYPE_DECIMAL64) { + *converter = std::make_unique<StringToDecimalString<Decimal64, Int64>>(); + break; + } else if (show_type == PrimitiveType::TYPE_DECIMALV2) { + *converter = std::make_unique<StringToDecimalString<Decimal128, Int128>>(); + break; + } else if (show_type == PrimitiveType::TYPE_DECIMAL128I) { + *converter = std::make_unique<StringToDecimalString<Decimal128, Int128>>(); + break; + } Review Comment: warning: do not use 'else' after 'break' [readability-else-after-return] ```suggestion } if (show_type == PrimitiveType::TYPE_DECIMAL64) { *converter = std::make_unique<StringToDecimalString<Decimal64, Int64>>(); break; } else if (show_type == PrimitiveType::TYPE_DECIMALV2) { *converter = std::make_unique<StringToDecimalString<Decimal128, Int128>>(); break; } else if (show_type == PrimitiveType::TYPE_DECIMAL128I) { *converter = std::make_unique<StringToDecimalString<Decimal128, Int128>>(); break; } ``` ########## be/src/vec/exec/format/parquet/parquet_common.h: ########## @@ -54,6 +54,11 @@ struct ParquetInt96 { inline uint64_t to_timestamp_micros() const { return (hi - JULIAN_EPOCH_OFFSET_DAYS) * MICROS_IN_DAY + lo / NANOS_PER_MICROSECOND; } + inline __int128 to_int128() const { + __int128 ans = 0; + ans = (((__int128)hi) << 64) + lo; Review Comment: warning: 64 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp ans = (((__int128)hi) << 64) + lo; ^ ``` ########## be/src/vec/exec/format/parquet/parquet_column_convert.h: ########## @@ -0,0 +1,665 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gen_cpp/PlanNodes_types.h> +#include <gen_cpp/Types_types.h> +#include <gen_cpp/parquet_types.h> + +#include <algorithm> +#include <functional> +#include <ostream> +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/status.h" +#include "gen_cpp/descriptors.pb.h" +#include "gutil/endian.h" +#include "gutil/strings/numbers.h" +#include "io/file_factory.h" +#include "olap/olap_common.h" +#include "util/coding.h" +#include "util/slice.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/format/parquet/decoder.h" +#include "vec/exec/format/parquet/parquet_common.h" + +namespace doris::vectorized { + +namespace ParquetConvert { + +template <tparquet::Type::type ParquetType> +struct PhysicalTypeTraits {}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT32> { + using DataType = int32_t; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::BOOLEAN> { + using DataType = uint8; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT64> { + using DataType = int64_t; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::FLOAT> { + using DataType = float; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::DOUBLE> { + using DataType = double; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::BYTE_ARRAY> { + using DataType = String; + using ColumnType = ColumnString; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::FIXED_LEN_BYTE_ARRAY> { + using DataType = String; + using ColumnType = ColumnString; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT96> { + using DataType = ParquetInt96; + using ColumnType = ColumnVector<Int8>; +}; + +#define FOR_LOGICAL_NUMERIC_TYPES(M) \ + M(TypeIndex::Int8, Int8, Int32) \ + M(TypeIndex::Int16, Int16, Int32) \ + M(TypeIndex::Int32, Int32, Int32) \ + M(TypeIndex::Int64, Int64, Int64) \ + M(TypeIndex::Float32, Float32, Float32) \ + M(TypeIndex::Float64, Float64, Float64) + +#define FOR_LOGICAL_DECIMAL_TYPES(M) \ + M(TypeIndex::Decimal32, Decimal32, Int32) \ + M(TypeIndex::Decimal64, Decimal64, Int64) \ + M(TypeIndex::Decimal128, Decimal128, Int128) \ + M(TypeIndex::Decimal128I, Decimal128, Int128) + +struct ConvertParams { + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false + static const cctz::time_zone utc0; + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone + cctz::time_zone* ctz = nullptr; + size_t offset_days = 0; + int64_t second_mask = 1; + int64_t scale_to_nano_factor = 1; + DecimalScaleParams decimal_scale; + FieldSchema* field_schema = nullptr; + size_t start_idx = 0; + + void init(FieldSchema* field_schema_, cctz::time_zone* ctz_, size_t start_idx_ = 0) { + field_schema = field_schema_; + if (ctz_ != nullptr) { + ctz = ctz_; + } + const auto& schema = field_schema->parquet_schema; + if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) { + const auto& timestamp_info = schema.logicalType.TIMESTAMP; + if (!timestamp_info.isAdjustedToUTC) { + // should set timezone to utc+0 + ctz = const_cast<cctz::time_zone*>(&utc0); + } + const auto& time_unit = timestamp_info.unit; + if (time_unit.__isset.MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (time_unit.__isset.MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } else if (time_unit.__isset.NANOS) { + second_mask = 1000000000; + scale_to_nano_factor = 1; + } + } else if (schema.__isset.converted_type) { + const auto& converted_type = schema.converted_type; + if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } + } + + if (ctz) { + VecDateTimeValue t; + t.from_unixtime(0, *ctz); + offset_days = t.day() == 31 ? 0 : 1; + } + start_idx = start_idx_; + } + + template <typename DecimalPrimitiveType> + void init_decimal_converter(DataTypePtr& data_type) { + if (field_schema == nullptr || decimal_scale.scale_type != DecimalScaleParams::NOT_INIT) { + return; + } + auto scale = field_schema->parquet_schema.scale; + auto* decimal_type = static_cast<DataTypeDecimal<Decimal<DecimalPrimitiveType>>*>( + const_cast<IDataType*>(remove_nullable(data_type).get())); + auto dest_scale = decimal_type->get_scale(); + if (dest_scale > scale) { + decimal_scale.scale_type = DecimalScaleParams::SCALE_UP; + decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale); + } else if (dest_scale < scale) { + decimal_scale.scale_type = DecimalScaleParams::SCALE_DOWN; + decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale); + } else { + decimal_scale.scale_type = DecimalScaleParams::NO_SCALE; + decimal_scale.scale_factor = 1; + } + } +}; + +/* +* parquet_physical_type : The type of data stored in parquet. +* Read data into columns returned by get_column according to the physical type of parquet. +* show_type : The data format that should be displayed. +* doris_column : What type of column does the upper layer need to put the data in. +* +* example : +* In hive, if decimal is stored as FIXED_LENBYTE_ARRAY in parquet, +* then we use `ALTER TABLE TableName CHANGE COLUMN Col_Decimal Col_Decimal String;` +* to convert this column to string type. +* parquet_type : FIXED_LEN_BYTE_ARRAY. +* ans_data_type : ColumnInt8 +* show_type : Decimal. +* doris_column : ColumnString. +*/ +ColumnPtr get_column(tparquet::Type::type parquet_physical_type, PrimitiveType show_type, + ColumnPtr& doris_column, DataTypePtr& doris_type, bool* need_convert); + +struct ColumnConvert { + virtual Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) { return Status::OK(); } + + virtual ~ColumnConvert() = default; + + void convert_null(ColumnPtr& src_col, MutableColumnPtr& dst_col) { + src_col = remove_nullable(src_col); + dst_col = remove_nullable(dst_col->get_ptr())->assume_mutable(); + } + +public: + ConvertParams* _convert_params; +}; + +template <tparquet::Type::type parquet_physical_type, typename dst_type> +struct NumberToNumberConvert : public ColumnConvert { + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + using ColumnType = typename PhysicalTypeTraits<parquet_physical_type>::ColumnType; + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto& src_data = static_cast<const ColumnType*>(src_col.get())->get_data(); + + dst_col->resize(_convert_params->start_idx + rows); + auto& data = static_cast<ColumnVector<dst_type>&>(*dst_col.get()).get_data(); + for (int i = 0; i < rows; i++) { + dst_type value = static_cast<dst_type>(src_data[i]); + data[_convert_params->start_idx + i] = value; + } + + return Status::OK(); + } +}; + +template <tparquet::Type::type parquet_physical_type> +struct NumberToStringConvert : public ColumnConvert { + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + using ColumnType = typename PhysicalTypeTraits<parquet_physical_type>::ColumnType; + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto& src_data = static_cast<const ColumnType*>(src_col.get())->get_data(); + + char buf[100]; + auto str_col = static_cast<ColumnString*>(dst_col.get()); + for (int i = 0; i < rows; i++) { + if constexpr (parquet_physical_type == tparquet::Type::FLOAT) { + int len = FastFloatToBuffer(src_data[i], buf, true); + str_col->insert_data(buf, len); + + } else if constexpr (parquet_physical_type == tparquet::Type::DOUBLE) { + int len = FastDoubleToBuffer(src_data[i], buf, true); + str_col->insert_data(buf, len); + } else if constexpr (parquet_physical_type == tparquet::Type::INT32) { + char* end = FastInt32ToBufferLeft(src_data[i], buf); + str_col->insert_data(buf, end - buf); + + } else if constexpr (parquet_physical_type == tparquet::Type::INT64) { + char* end = FastInt64ToBufferLeft(src_data[i], buf); + str_col->insert_data(buf, end - buf); + + } else { + string value = std::to_string(src_data[i]); + str_col->insert_data(value.data(), value.size()); + } + } + return Status::OK(); + } +}; + +struct Int96toTimestamp : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size() / sizeof(ParquetInt96); + auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); + auto ParquetInt96_data = (ParquetInt96*)src_data.data(); + dst_col->resize(_convert_params->start_idx + rows); + auto& data = static_cast<ColumnVector<UInt64>*>(dst_col.get())->get_data(); + + for (int i = 0; i < rows; i++) { + ParquetInt96 x = ParquetInt96_data[i]; + auto& num = data[_convert_params->start_idx + i]; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + int64_t micros = x.to_timestamp_micros(); + value.from_unixtime(micros / 1000000, *_convert_params->ctz); + value.set_microsecond(micros % 1000000); + } + return Status::OK(); + } +}; + +struct Int64ToTimestamp : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + dst_col->resize(_convert_params->start_idx + rows); + + auto src_data = static_cast<const ColumnVector<int64_t>*>(src_col.get())->get_data().data(); + auto& data = static_cast<ColumnVector<UInt64>*>(dst_col.get())->get_data(); + + for (int i = 0; i < rows; i++) { + int64_t x = src_data[i]; + auto& num = data[_convert_params->start_idx + i]; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + value.from_unixtime(x / _convert_params->second_mask, *_convert_params->ctz); + value.set_microsecond((x % _convert_params->second_mask) * + (_convert_params->scale_to_nano_factor / 1000)); + } + return Status::OK(); + } +}; + +class Int32ToDate : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + dst_col->resize(_convert_params->start_idx + rows); + + auto& src_data = static_cast<const ColumnVector<int32>*>(src_col.get())->get_data(); + auto& data = static_cast<ColumnDateV2*>(dst_col.get())->get_data(); + date_day_offset_dict& date_dict = date_day_offset_dict::get(); + + for (int i = 0; i < rows; i++) { + auto& value = reinterpret_cast<DateV2Value<DateV2ValueType>&>( + data[_convert_params->start_idx + i]); + int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days; + value = date_dict[date_value]; + } + + return Status::OK(); + } +}; + +template <typename DecimalType, typename ValueCopyType, DecimalScaleParams::ScaleType ScaleType> +class StringToDecimal : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + dst_col->resize(_convert_params->start_idx + rows); + + auto& data = static_cast<ColumnDecimal<DecimalType>*>(dst_col.get())->get_data(); + for (int i = 0; i < rows; i++) { + size_t len = offset[i] - offset[i - 1]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - len) * 8); + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::NO_SCALE) { + // do nothing + } else { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + auto& v = reinterpret_cast<DecimalType&>(data[_convert_params->start_idx + i]); + v = (DecimalType)value; + } + + return Status::OK(); + } +}; +template <typename NumberType, typename DecimalPhysicalType, typename ValueCopyType, + DecimalScaleParams::ScaleType ScaleType> +class NumberToDecimal : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto* src_data = + static_cast<const ColumnVector<NumberType>*>(src_col.get())->get_data().data(); + dst_col->resize(_convert_params->start_idx + rows); + + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto* data = static_cast<ColumnDecimal<Decimal<DecimalPhysicalType>>*>(dst_col.get()) + ->get_data() + .data(); + + for (int i = 0; i < rows; i++) { + ValueCopyType value = src_data[i]; + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + data[_convert_params->start_idx + i] = (DecimalPhysicalType)value; + } + return Status::OK(); + } +}; + +template <typename DecimalType, typename ValueCopyType> +class StringToDecimalString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + + auto data = static_cast<ColumnString*>(dst_col.get()); + for (int i = 0; i < rows; i++) { + int len = offset[i] - offset[i - 1]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - len) * 8); + std::string ans = reinterpret_cast<DecimalType&>(value).to_string( + _convert_params->field_schema->parquet_schema.scale); + data->insert_data(ans.data(), ans.size()); + } + return Status::OK(); + } +}; + +class Int32ToDateString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + + auto& src_data = static_cast<const ColumnVector<int32>*>(src_col.get())->get_data(); + date_day_offset_dict& date_dict = date_day_offset_dict::get(); + + auto str_col = static_cast<ColumnString*>(dst_col.get()); + char buf[50]; + for (int i = 0; i < rows; i++) { + int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days; + DateV2Value<DateV2ValueType> value = date_dict[date_value]; + char* end = value.to_string(buf); + str_col->insert_data(buf, end - buf); + } + + return Status::OK(); + } +}; + +class Int96ToTimestampString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); + auto dst_data = static_cast<ColumnString*>(dst_col.get()); + + size_t rows = src_col->size() / sizeof(ParquetInt96); + ParquetInt96* data = (ParquetInt96*)src_data.data(); + + char buf[50]; Review Comment: warning: do not declare C-style arrays, use std::array<> instead [modernize-avoid-c-arrays] ```cpp char buf[50]; ^ ``` ########## be/src/vec/exec/format/parquet/vparquet_column_reader.cpp: ########## @@ -476,86 +481,108 @@ Status ScalarColumnReader::_try_load_dict_page(bool* loaded, bool* has_dict) { Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter) { - if (_chunk_reader->remaining_num_values() == 0) { - if (!_chunk_reader->has_next_page()) { - *eof = true; - *read_rows = 0; - return Status::OK(); - } - RETURN_IF_ERROR(_chunk_reader->next_page()); - } - if (_nested_column) { - RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent()); - return _read_nested_column(doris_column, type, select_vector, batch_size, read_rows, eof, - is_dict_filter); - } - - // generate the row ranges that should be read - std::list<RowRange> read_ranges; - _generate_read_ranges(_current_row_index, - _current_row_index + _chunk_reader->remaining_num_values(), read_ranges); - if (read_ranges.size() == 0) { - // skip the whole page - _current_row_index += _chunk_reader->remaining_num_values(); - RETURN_IF_ERROR(_chunk_reader->skip_page()); - *read_rows = 0; - } else { - bool skip_whole_batch = false; - // Determining whether to skip page or batch will increase the calculation time. - // When the filtering effect is greater than 60%, it is possible to skip the page or batch. - if (select_vector.has_filter() && select_vector.filter_ratio() > 0.6) { - // lazy read - size_t remaining_num_values = 0; - for (auto& range : read_ranges) { - remaining_num_values += range.last_row - range.first_row; - } - if (batch_size >= remaining_num_values && - select_vector.can_filter_all(remaining_num_values)) { - // We can skip the whole page if the remaining values is filtered by predicate columns - select_vector.skip(remaining_num_values); - _current_row_index += _chunk_reader->remaining_num_values(); - RETURN_IF_ERROR(_chunk_reader->skip_page()); - *read_rows = remaining_num_values; - if (!_chunk_reader->has_next_page()) { - *eof = true; - } + bool need_convert = false; + auto& parquet_physical_type = _chunk_meta.meta_data.type; + auto& show_type = _field_schema->type.type; + + ColumnPtr src_column = ParquetConvert::get_column(parquet_physical_type, show_type, + doris_column, type, &need_convert); + + do { + if (_chunk_reader->remaining_num_values() == 0) { + if (!_chunk_reader->has_next_page()) { + *eof = true; + *read_rows = 0; return Status::OK(); } - skip_whole_batch = - batch_size <= remaining_num_values && select_vector.can_filter_all(batch_size); - if (skip_whole_batch) { - select_vector.skip(batch_size); - } + RETURN_IF_ERROR(_chunk_reader->next_page()); } - // load page data to decode or skip values - RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent()); - size_t has_read = 0; - for (auto& range : read_ranges) { - // generate the skipped values - size_t skip_values = range.first_row - _current_row_index; - RETURN_IF_ERROR(_skip_values(skip_values)); - _current_row_index += skip_values; - // generate the read values - size_t read_values = - std::min((size_t)(range.last_row - range.first_row), batch_size - has_read); - if (skip_whole_batch) { - RETURN_IF_ERROR(_skip_values(read_values)); - } else { - RETURN_IF_ERROR(_read_values(read_values, doris_column, type, select_vector, - is_dict_filter)); + if (_nested_column) { + RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent()); + RETURN_IF_ERROR(_read_nested_column(src_column, type, select_vector, batch_size, + read_rows, eof, is_dict_filter)); + break; + } + + // generate the row ranges that should be read + std::list<RowRange> read_ranges; + _generate_read_ranges(_current_row_index, + _current_row_index + _chunk_reader->remaining_num_values(), + read_ranges); + if (read_ranges.size() == 0) { + // skip the whole page + _current_row_index += _chunk_reader->remaining_num_values(); + RETURN_IF_ERROR(_chunk_reader->skip_page()); + *read_rows = 0; + } else { + bool skip_whole_batch = false; + // Determining whether to skip page or batch will increase the calculation time. + // When the filtering effect is greater than 60%, it is possible to skip the page or batch. + if (select_vector.has_filter() && select_vector.filter_ratio() > 0.6) { Review Comment: warning: 0.6 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp if (select_vector.has_filter() && select_vector.filter_ratio() > 0.6) { ^ ``` ########## be/src/vec/exec/format/parquet/parquet_column_convert.h: ########## @@ -0,0 +1,665 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gen_cpp/PlanNodes_types.h> +#include <gen_cpp/Types_types.h> +#include <gen_cpp/parquet_types.h> + +#include <algorithm> +#include <functional> +#include <ostream> +#include <utility> + +#include "common/compiler_util.h" // IWYU pragma: keep +#include "common/status.h" +#include "gen_cpp/descriptors.pb.h" +#include "gutil/endian.h" +#include "gutil/strings/numbers.h" +#include "io/file_factory.h" +#include "olap/olap_common.h" +#include "util/coding.h" +#include "util/slice.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/format/parquet/decoder.h" +#include "vec/exec/format/parquet/parquet_common.h" + +namespace doris::vectorized { + +namespace ParquetConvert { + +template <tparquet::Type::type ParquetType> +struct PhysicalTypeTraits {}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT32> { + using DataType = int32_t; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::BOOLEAN> { + using DataType = uint8; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT64> { + using DataType = int64_t; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::FLOAT> { + using DataType = float; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::DOUBLE> { + using DataType = double; + using ColumnType = ColumnVector<DataType>; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::BYTE_ARRAY> { + using DataType = String; + using ColumnType = ColumnString; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::FIXED_LEN_BYTE_ARRAY> { + using DataType = String; + using ColumnType = ColumnString; +}; + +template <> +struct PhysicalTypeTraits<tparquet::Type::INT96> { + using DataType = ParquetInt96; + using ColumnType = ColumnVector<Int8>; +}; + +#define FOR_LOGICAL_NUMERIC_TYPES(M) \ + M(TypeIndex::Int8, Int8, Int32) \ + M(TypeIndex::Int16, Int16, Int32) \ + M(TypeIndex::Int32, Int32, Int32) \ + M(TypeIndex::Int64, Int64, Int64) \ + M(TypeIndex::Float32, Float32, Float32) \ + M(TypeIndex::Float64, Float64, Float64) + +#define FOR_LOGICAL_DECIMAL_TYPES(M) \ + M(TypeIndex::Decimal32, Decimal32, Int32) \ + M(TypeIndex::Decimal64, Decimal64, Int64) \ + M(TypeIndex::Decimal128, Decimal128, Int128) \ + M(TypeIndex::Decimal128I, Decimal128, Int128) + +struct ConvertParams { + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false + static const cctz::time_zone utc0; + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone + cctz::time_zone* ctz = nullptr; + size_t offset_days = 0; + int64_t second_mask = 1; + int64_t scale_to_nano_factor = 1; + DecimalScaleParams decimal_scale; + FieldSchema* field_schema = nullptr; + size_t start_idx = 0; + + void init(FieldSchema* field_schema_, cctz::time_zone* ctz_, size_t start_idx_ = 0) { + field_schema = field_schema_; + if (ctz_ != nullptr) { + ctz = ctz_; + } + const auto& schema = field_schema->parquet_schema; + if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) { + const auto& timestamp_info = schema.logicalType.TIMESTAMP; + if (!timestamp_info.isAdjustedToUTC) { + // should set timezone to utc+0 + ctz = const_cast<cctz::time_zone*>(&utc0); + } + const auto& time_unit = timestamp_info.unit; + if (time_unit.__isset.MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (time_unit.__isset.MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } else if (time_unit.__isset.NANOS) { + second_mask = 1000000000; + scale_to_nano_factor = 1; + } + } else if (schema.__isset.converted_type) { + const auto& converted_type = schema.converted_type; + if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } + } + + if (ctz) { + VecDateTimeValue t; + t.from_unixtime(0, *ctz); + offset_days = t.day() == 31 ? 0 : 1; + } + start_idx = start_idx_; + } + + template <typename DecimalPrimitiveType> + void init_decimal_converter(DataTypePtr& data_type) { + if (field_schema == nullptr || decimal_scale.scale_type != DecimalScaleParams::NOT_INIT) { + return; + } + auto scale = field_schema->parquet_schema.scale; + auto* decimal_type = static_cast<DataTypeDecimal<Decimal<DecimalPrimitiveType>>*>( + const_cast<IDataType*>(remove_nullable(data_type).get())); + auto dest_scale = decimal_type->get_scale(); + if (dest_scale > scale) { + decimal_scale.scale_type = DecimalScaleParams::SCALE_UP; + decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale); + } else if (dest_scale < scale) { + decimal_scale.scale_type = DecimalScaleParams::SCALE_DOWN; + decimal_scale.scale_factor = + DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale); + } else { + decimal_scale.scale_type = DecimalScaleParams::NO_SCALE; + decimal_scale.scale_factor = 1; + } + } +}; + +/* +* parquet_physical_type : The type of data stored in parquet. +* Read data into columns returned by get_column according to the physical type of parquet. +* show_type : The data format that should be displayed. +* doris_column : What type of column does the upper layer need to put the data in. +* +* example : +* In hive, if decimal is stored as FIXED_LENBYTE_ARRAY in parquet, +* then we use `ALTER TABLE TableName CHANGE COLUMN Col_Decimal Col_Decimal String;` +* to convert this column to string type. +* parquet_type : FIXED_LEN_BYTE_ARRAY. +* ans_data_type : ColumnInt8 +* show_type : Decimal. +* doris_column : ColumnString. +*/ +ColumnPtr get_column(tparquet::Type::type parquet_physical_type, PrimitiveType show_type, + ColumnPtr& doris_column, DataTypePtr& doris_type, bool* need_convert); + +struct ColumnConvert { + virtual Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) { return Status::OK(); } + + virtual ~ColumnConvert() = default; + + void convert_null(ColumnPtr& src_col, MutableColumnPtr& dst_col) { + src_col = remove_nullable(src_col); + dst_col = remove_nullable(dst_col->get_ptr())->assume_mutable(); + } + +public: + ConvertParams* _convert_params; +}; + +template <tparquet::Type::type parquet_physical_type, typename dst_type> +struct NumberToNumberConvert : public ColumnConvert { + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + using ColumnType = typename PhysicalTypeTraits<parquet_physical_type>::ColumnType; + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto& src_data = static_cast<const ColumnType*>(src_col.get())->get_data(); + + dst_col->resize(_convert_params->start_idx + rows); + auto& data = static_cast<ColumnVector<dst_type>&>(*dst_col.get()).get_data(); + for (int i = 0; i < rows; i++) { + dst_type value = static_cast<dst_type>(src_data[i]); + data[_convert_params->start_idx + i] = value; + } + + return Status::OK(); + } +}; + +template <tparquet::Type::type parquet_physical_type> +struct NumberToStringConvert : public ColumnConvert { + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + using ColumnType = typename PhysicalTypeTraits<parquet_physical_type>::ColumnType; + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto& src_data = static_cast<const ColumnType*>(src_col.get())->get_data(); + + char buf[100]; + auto str_col = static_cast<ColumnString*>(dst_col.get()); + for (int i = 0; i < rows; i++) { + if constexpr (parquet_physical_type == tparquet::Type::FLOAT) { + int len = FastFloatToBuffer(src_data[i], buf, true); + str_col->insert_data(buf, len); + + } else if constexpr (parquet_physical_type == tparquet::Type::DOUBLE) { + int len = FastDoubleToBuffer(src_data[i], buf, true); + str_col->insert_data(buf, len); + } else if constexpr (parquet_physical_type == tparquet::Type::INT32) { + char* end = FastInt32ToBufferLeft(src_data[i], buf); + str_col->insert_data(buf, end - buf); + + } else if constexpr (parquet_physical_type == tparquet::Type::INT64) { + char* end = FastInt64ToBufferLeft(src_data[i], buf); + str_col->insert_data(buf, end - buf); + + } else { + string value = std::to_string(src_data[i]); + str_col->insert_data(value.data(), value.size()); + } + } + return Status::OK(); + } +}; + +struct Int96toTimestamp : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size() / sizeof(ParquetInt96); + auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); + auto ParquetInt96_data = (ParquetInt96*)src_data.data(); + dst_col->resize(_convert_params->start_idx + rows); + auto& data = static_cast<ColumnVector<UInt64>*>(dst_col.get())->get_data(); + + for (int i = 0; i < rows; i++) { + ParquetInt96 x = ParquetInt96_data[i]; + auto& num = data[_convert_params->start_idx + i]; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + int64_t micros = x.to_timestamp_micros(); + value.from_unixtime(micros / 1000000, *_convert_params->ctz); + value.set_microsecond(micros % 1000000); + } + return Status::OK(); + } +}; + +struct Int64ToTimestamp : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + dst_col->resize(_convert_params->start_idx + rows); + + auto src_data = static_cast<const ColumnVector<int64_t>*>(src_col.get())->get_data().data(); + auto& data = static_cast<ColumnVector<UInt64>*>(dst_col.get())->get_data(); + + for (int i = 0; i < rows; i++) { + int64_t x = src_data[i]; + auto& num = data[_convert_params->start_idx + i]; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + value.from_unixtime(x / _convert_params->second_mask, *_convert_params->ctz); + value.set_microsecond((x % _convert_params->second_mask) * + (_convert_params->scale_to_nano_factor / 1000)); + } + return Status::OK(); + } +}; + +class Int32ToDate : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + dst_col->resize(_convert_params->start_idx + rows); + + auto& src_data = static_cast<const ColumnVector<int32>*>(src_col.get())->get_data(); + auto& data = static_cast<ColumnDateV2*>(dst_col.get())->get_data(); + date_day_offset_dict& date_dict = date_day_offset_dict::get(); + + for (int i = 0; i < rows; i++) { + auto& value = reinterpret_cast<DateV2Value<DateV2ValueType>&>( + data[_convert_params->start_idx + i]); + int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days; + value = date_dict[date_value]; + } + + return Status::OK(); + } +}; + +template <typename DecimalType, typename ValueCopyType, DecimalScaleParams::ScaleType ScaleType> +class StringToDecimal : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + dst_col->resize(_convert_params->start_idx + rows); + + auto& data = static_cast<ColumnDecimal<DecimalType>*>(dst_col.get())->get_data(); + for (int i = 0; i < rows; i++) { + size_t len = offset[i] - offset[i - 1]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - len) * 8); + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::NO_SCALE) { + // do nothing + } else { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + auto& v = reinterpret_cast<DecimalType&>(data[_convert_params->start_idx + i]); + v = (DecimalType)value; + } + + return Status::OK(); + } +}; +template <typename NumberType, typename DecimalPhysicalType, typename ValueCopyType, + DecimalScaleParams::ScaleType ScaleType> +class NumberToDecimal : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + auto* src_data = + static_cast<const ColumnVector<NumberType>*>(src_col.get())->get_data().data(); + dst_col->resize(_convert_params->start_idx + rows); + + DecimalScaleParams& scale_params = _convert_params->decimal_scale; + auto* data = static_cast<ColumnDecimal<Decimal<DecimalPhysicalType>>*>(dst_col.get()) + ->get_data() + .data(); + + for (int i = 0; i < rows; i++) { + ValueCopyType value = src_data[i]; + if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + data[_convert_params->start_idx + i] = (DecimalPhysicalType)value; + } + return Status::OK(); + } +}; + +template <typename DecimalType, typename ValueCopyType> +class StringToDecimalString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + + auto buf = static_cast<const ColumnString*>(src_col.get())->get_chars().data(); + auto& offset = static_cast<const ColumnString*>(src_col.get())->get_offsets(); + + auto data = static_cast<ColumnString*>(dst_col.get()); + for (int i = 0; i < rows; i++) { + int len = offset[i] - offset[i - 1]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + ValueCopyType value = 0; + memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], len); + value = BitUtil::big_endian_to_host(value); + value = value >> ((sizeof(value) - len) * 8); + std::string ans = reinterpret_cast<DecimalType&>(value).to_string( + _convert_params->field_schema->parquet_schema.scale); + data->insert_data(ans.data(), ans.size()); + } + return Status::OK(); + } +}; + +class Int32ToDateString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + size_t rows = src_col->size(); + + auto& src_data = static_cast<const ColumnVector<int32>*>(src_col.get())->get_data(); + date_day_offset_dict& date_dict = date_day_offset_dict::get(); + + auto str_col = static_cast<ColumnString*>(dst_col.get()); + char buf[50]; + for (int i = 0; i < rows; i++) { + int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days; + DateV2Value<DateV2ValueType> value = date_dict[date_value]; + char* end = value.to_string(buf); + str_col->insert_data(buf, end - buf); + } + + return Status::OK(); + } +}; + +class Int96ToTimestampString : public ColumnConvert { +public: + Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override { + convert_null(src_col, dst_col); + + auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data(); + auto dst_data = static_cast<ColumnString*>(dst_col.get()); + + size_t rows = src_col->size() / sizeof(ParquetInt96); + ParquetInt96* data = (ParquetInt96*)src_data.data(); + + char buf[50]; + for (int i = 0; i < rows; i++) { + uint64_t num = 0; + auto& value = reinterpret_cast<DateV2Value<DateTimeV2ValueType>&>(num); + int64_t micros = data[i].to_timestamp_micros(); + value.from_unixtime(micros / 1000000, *_convert_params->ctz); Review Comment: warning: 1000000 is a magic number; consider replacing it with a named constant [readability-magic-numbers] ```cpp value.from_unixtime(micros / 1000000, *_convert_params->ctz); ^ ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
