zclllyybb commented on code in PR #47307: URL: https://github.com/apache/doris/pull/47307#discussion_r1924819842
########## be/src/vec/functions/function_compress.cpp: ########## @@ -0,0 +1,299 @@ +#include <glog/logging.h> + +#include <cctype> +#include <cstddef> +#include <cstring> +#include <memory> +#include <string> +#include <utility> + +#include "common/status.h" +#include "util/block_compression.h" +#include "util/faststring.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/core/block.h" +#include "vec/core/column_numbers.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/functions/function.h" +#include "vec/functions/simple_function_factory.h" + +namespace doris { +class FunctionContext; +} // namespace doris + +namespace doris::vectorized { + +class FunctionCompress : public IFunction { +public: + static constexpr auto name = "compress"; + static FunctionPtr create() { return std::make_shared<FunctionCompress>(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return make_nullable(std::make_shared<DataTypeString>()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const override { + // LOG(INFO) << "Executing FunctionCompress with " << input_rows_count + // << " rows."; // Log the number of rows being processed + + // Get the compression algorithm object + BlockCompressionCodec* compression_codec; + 
RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB, + &compression_codec)); + + const auto& arg_column = + assert_cast<const ColumnString&>(*block.get_by_position(arguments[0]).column); + auto result_column = ColumnString::create(); + + auto& col_data = result_column->get_chars(); + auto& col_offset = result_column->get_offsets(); + col_offset.resize(input_rows_count); + + auto null_column = ColumnUInt8::create(input_rows_count); + auto& null_map = null_column->get_data(); + + faststring compressed_str; + Slice data; + for (int row = 0; row < input_rows_count; row++) { + null_map[row] = false; + const auto& str = arg_column.get_data_at(row); + data = Slice(str.data, str.size); + + // Print the original string (before compression) + // LOG(INFO) << "Original string at row " << row << ": " + // << std::string(str.data, str.size); + + auto st = compression_codec->compress(data, &compressed_str); + + if (!st.ok()) { + // LOG(INFO) << "Compression failed at row " << row + // << ", skipping this row."; // Log failure + col_offset[row] = col_offset[row - 1]; + null_map[row] = true; + continue; + } + + size_t idx = col_data.size(); + if (!str.size) { // null -> 0x + col_data.resize(col_data.size() + 2); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + col_offset[row] = col_offset[row - 1] + 2; + continue; + } + + // first ten digits represent the length of the uncompressed string + int value = (int)str.size; + col_data.resize(col_data.size() + 10); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + for (int i = 0; i < 4; i++) { + unsigned char byte = (value >> (i * 8)) & 0xFF; + col_data[idx + 2 + i * 2] = "0123456789ABCDEF"[byte >> 4]; // 高4位 Review Comment: dont use Chinese ########## be/src/vec/functions/function_compress.cpp: ########## @@ -0,0 +1,299 @@ +#include <glog/logging.h> + +#include <cctype> +#include <cstddef> +#include <cstring> +#include <memory> +#include <string> +#include <utility> + +#include "common/status.h" +#include 
"util/block_compression.h" +#include "util/faststring.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/core/block.h" +#include "vec/core/column_numbers.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/functions/function.h" +#include "vec/functions/simple_function_factory.h" + +namespace doris { +class FunctionContext; +} // namespace doris + +namespace doris::vectorized { + +class FunctionCompress : public IFunction { +public: + static constexpr auto name = "compress"; + static FunctionPtr create() { return std::make_shared<FunctionCompress>(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return make_nullable(std::make_shared<DataTypeString>()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const override { + // LOG(INFO) << "Executing FunctionCompress with " << input_rows_count Review Comment: remove these commented lines ########## regression-test/suites/query_p0/sql_functions/string_functions/test_compress_uncompress.groovy: ########## Review Comment: add cases like `regression-test/suites/query_p0/sql_functions/test_template_one_arg.groovy` did ########## be/test/vec/function/function_string_test.cpp: ########## @@ -3340,6 +3338,56 @@ TEST(function_string_test, function_rpad_test) { {{Null(), std::int32_t(0), Null()}, Null()}, }; + 
TEST(function_string_test, function_compress_uncompress_test) { + { + std::string func_name = "compress"; + InputTypeSet input_types = {TypeIndex::String}; + + // 压缩多个不同的字符串 Review Comment: don't use Chinese comments ########## be/src/vec/functions/function_compress.cpp: ########## @@ -0,0 +1,299 @@ +#include <glog/logging.h> + +#include <cctype> +#include <cstddef> +#include <cstring> +#include <memory> +#include <string> +#include <utility> + +#include "common/status.h" +#include "util/block_compression.h" +#include "util/faststring.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/core/block.h" +#include "vec/core/column_numbers.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/functions/function.h" +#include "vec/functions/simple_function_factory.h" + +namespace doris { +class FunctionContext; +} // namespace doris + +namespace doris::vectorized { + +class FunctionCompress : public IFunction { +public: + static constexpr auto name = "compress"; + static FunctionPtr create() { return std::make_shared<FunctionCompress>(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return make_nullable(std::make_shared<DataTypeString>()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const override { + // LOG(INFO) << "Executing FunctionCompress with " 
<< input_rows_count + // << " rows."; // Log the number of rows being processed + + // Get the compression algorithm object + BlockCompressionCodec* compression_codec; + RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB, + &compression_codec)); + + const auto& arg_column = + assert_cast<const ColumnString&>(*block.get_by_position(arguments[0]).column); + auto result_column = ColumnString::create(); + + auto& col_data = result_column->get_chars(); + auto& col_offset = result_column->get_offsets(); + col_offset.resize(input_rows_count); + + auto null_column = ColumnUInt8::create(input_rows_count); + auto& null_map = null_column->get_data(); + + faststring compressed_str; + Slice data; + for (int row = 0; row < input_rows_count; row++) { + null_map[row] = false; + const auto& str = arg_column.get_data_at(row); + data = Slice(str.data, str.size); + + // Print the original string (before compression) + // LOG(INFO) << "Original string at row " << row << ": " + // << std::string(str.data, str.size); + + auto st = compression_codec->compress(data, &compressed_str); + + if (!st.ok()) { + // LOG(INFO) << "Compression failed at row " << row + // << ", skipping this row."; // Log failure + col_offset[row] = col_offset[row - 1]; + null_map[row] = true; + continue; + } + + size_t idx = col_data.size(); + if (!str.size) { // null -> 0x + col_data.resize(col_data.size() + 2); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + col_offset[row] = col_offset[row - 1] + 2; + continue; + } + + // first ten digits represent the length of the uncompressed string + int value = (int)str.size; + col_data.resize(col_data.size() + 10); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + for (int i = 0; i < 4; i++) { + unsigned char byte = (value >> (i * 8)) & 0xFF; + col_data[idx + 2 + i * 2] = "0123456789ABCDEF"[byte >> 4]; // 高4位 + col_data[idx + 3 + i * 2] = "0123456789ABCDEF"[byte & 0x0F]; // 低4位 + } + idx += 10; + + col_data.resize(col_data.size() + 2 * 
compressed_str.size()); + // memcpy(col_data.data() + col_data.size(), compressed_str.data(), compressed_str.size()); + + unsigned char* src = compressed_str.data(); + { + auto transform = [](char ch) -> unsigned char { + char x; + if (ch < 10) { + x = ch + '0'; + } else { + x = ch - 10 + 'A'; + } + // LOG(INFO) << "transform" << (int)x << "->" << x; + return x; + }; + for (int i = 0; i < compressed_str.size(); i++) { + col_data[idx] = transform(((*src) >> 4) & 0x0F); + col_data[idx + 1] = transform(*src & 0x0F); + LOG(INFO) << (unsigned int)(*src) << " -> " << (unsigned int)col_data[idx] + << " and " << (unsigned int)col_data[idx + 1]; + idx += 2; + src++; + } + + // Print the compressed string (after compression) + // LOG(INFO) << "Compressed string at row " << row << ": " + // << std::string(reinterpret_cast<const char*>(col_data.data())); + col_offset[row] = col_offset[row - 1] + 10 + compressed_str.size() * 2; Review Comment: What's this value for? ########## gensrc/script/doris_builtins_functions.py: ########## Review Comment: we don't need to modify this file anymore ########## conf/be.conf: ########## Review Comment: don't push this ########## bin/fe.pid: ########## Review Comment: don't push this ########## conf/fe.conf: ########## Review Comment: don't push this ########## be/src/vec/functions/function_compress.cpp: ########## @@ -0,0 +1,299 @@ +#include <glog/logging.h> + +#include <cctype> +#include <cstddef> +#include <cstring> +#include <memory> +#include <string> +#include <utility> + +#include "common/status.h" +#include "util/block_compression.h" +#include "util/faststring.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/core/block.h" +#include "vec/core/column_numbers.h" +#include 
"vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/functions/function.h" +#include "vec/functions/simple_function_factory.h" + +namespace doris { +class FunctionContext; +} // namespace doris + +namespace doris::vectorized { + +class FunctionCompress : public IFunction { +public: + static constexpr auto name = "compress"; + static FunctionPtr create() { return std::make_shared<FunctionCompress>(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return make_nullable(std::make_shared<DataTypeString>()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const override { + // LOG(INFO) << "Executing FunctionCompress with " << input_rows_count + // << " rows."; // Log the number of rows being processed + + // Get the compression algorithm object + BlockCompressionCodec* compression_codec; + RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB, + &compression_codec)); + + const auto& arg_column = + assert_cast<const ColumnString&>(*block.get_by_position(arguments[0]).column); + auto result_column = ColumnString::create(); + + auto& col_data = result_column->get_chars(); + auto& col_offset = result_column->get_offsets(); + col_offset.resize(input_rows_count); + + auto null_column = ColumnUInt8::create(input_rows_count); + auto& null_map = null_column->get_data(); + + faststring compressed_str; + Slice data; + for (int row = 0; row < input_rows_count; row++) { + null_map[row] = false; + const auto& str = arg_column.get_data_at(row); + data = Slice(str.data, str.size); + + // Print the 
original string (before compression) + // LOG(INFO) << "Original string at row " << row << ": " + // << std::string(str.data, str.size); + + auto st = compression_codec->compress(data, &compressed_str); + + if (!st.ok()) { Review Comment: add a comment about when it will fail ########## be/src/vec/functions/function_compress.cpp: ########## @@ -0,0 +1,299 @@ +#include <glog/logging.h> + +#include <cctype> +#include <cstddef> +#include <cstring> +#include <memory> +#include <string> +#include <utility> + +#include "common/status.h" +#include "util/block_compression.h" +#include "util/faststring.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/core/block.h" +#include "vec/core/column_numbers.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/functions/function.h" +#include "vec/functions/simple_function_factory.h" + +namespace doris { +class FunctionContext; +} // namespace doris + +namespace doris::vectorized { + +class FunctionCompress : public IFunction { +public: + static constexpr auto name = "compress"; + static FunctionPtr create() { return std::make_shared<FunctionCompress>(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return make_nullable(std::make_shared<DataTypeString>()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const 
override { + // LOG(INFO) << "Executing FunctionCompress with " << input_rows_count + // << " rows."; // Log the number of rows being processed + + // Get the compression algorithm object + BlockCompressionCodec* compression_codec; + RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB, + &compression_codec)); + + const auto& arg_column = + assert_cast<const ColumnString&>(*block.get_by_position(arguments[0]).column); + auto result_column = ColumnString::create(); + + auto& col_data = result_column->get_chars(); + auto& col_offset = result_column->get_offsets(); + col_offset.resize(input_rows_count); + + auto null_column = ColumnUInt8::create(input_rows_count); + auto& null_map = null_column->get_data(); + + faststring compressed_str; + Slice data; + for (int row = 0; row < input_rows_count; row++) { + null_map[row] = false; + const auto& str = arg_column.get_data_at(row); + data = Slice(str.data, str.size); + + // Print the original string (before compression) + // LOG(INFO) << "Original string at row " << row << ": " + // << std::string(str.data, str.size); + + auto st = compression_codec->compress(data, &compressed_str); + + if (!st.ok()) { + // LOG(INFO) << "Compression failed at row " << row + // << ", skipping this row."; // Log failure + col_offset[row] = col_offset[row - 1]; + null_map[row] = true; + continue; + } + + size_t idx = col_data.size(); + if (!str.size) { // null -> 0x + col_data.resize(col_data.size() + 2); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + col_offset[row] = col_offset[row - 1] + 2; + continue; + } + + // first ten digits represent the length of the uncompressed string + int value = (int)str.size; + col_data.resize(col_data.size() + 10); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + for (int i = 0; i < 4; i++) { + unsigned char byte = (value >> (i * 8)) & 0xFF; + col_data[idx + 2 + i * 2] = "0123456789ABCDEF"[byte >> 4]; // 高4位 + col_data[idx + 3 + i * 2] = "0123456789ABCDEF"[byte & 0x0F]; // 低4位 + 
} + idx += 10; + + col_data.resize(col_data.size() + 2 * compressed_str.size()); + // memcpy(col_data.data() + col_data.size(), compressed_str.data(), compressed_str.size()); + + unsigned char* src = compressed_str.data(); + { + auto transform = [](char ch) -> unsigned char { + char x; + if (ch < 10) { + x = ch + '0'; + } else { + x = ch - 10 + 'A'; + } + // LOG(INFO) << "transform" << (int)x << "->" << x; + return x; + }; + for (int i = 0; i < compressed_str.size(); i++) { + col_data[idx] = transform(((*src) >> 4) & 0x0F); + col_data[idx + 1] = transform(*src & 0x0F); + LOG(INFO) << (unsigned int)(*src) << " -> " << (unsigned int)col_data[idx] + << " and " << (unsigned int)col_data[idx + 1]; + idx += 2; + src++; + } + + // Print the compressed string (after compression) + // LOG(INFO) << "Compressed string at row " << row << ": " + // << std::string(reinterpret_cast<const char*>(col_data.data())); + col_offset[row] = col_offset[row - 1] + 10 + compressed_str.size() * 2; + } + } + + block.replace_by_position( + result, ColumnNullable::create(std::move(result_column), std::move(null_column))); + return Status::OK(); + } +}; + +class FunctionUncompress : public IFunction { +public: + static constexpr auto name = "uncompress"; + static FunctionPtr create() { return std::make_shared<FunctionUncompress>(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return make_nullable(std::make_shared<DataTypeString>()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const override { + LOG(INFO) << "Executing FunctionUncompress with " << input_rows_count + << " rows."; // Log the number of rows being processed + + // Get the compression algorithm object + BlockCompressionCodec* compression_codec; + 
RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB, + &compression_codec)); + + const auto& arg_column = + assert_cast<const ColumnString&>(*block.get_by_position(arguments[0]).column); + + auto result_column = ColumnString::create(); + auto& col_data = result_column->get_chars(); + auto& col_offset = result_column->get_offsets(); + col_offset.resize(input_rows_count); + + auto null_column = ColumnUInt8::create(input_rows_count); + auto& null_map = null_column->get_data(); + + std::string uncompressed; + Slice data; + Slice uncompressed_slice; + for (int row = 0; row < input_rows_count; row++) { + auto check = [](char x) { + if (x >= '0' && x <= '9') return true; + if (x >= 'a' && x <= 'f') return true; + if (x >= 'A' && x <= 'F') return true; + return false; + }; + auto trans = [](char x) { + if (x >= '0' && x <= '9') { + return x - '0'; + } + if (x >= 'A' && x <= 'F') { + return x - 'A' + 10; + } + return x - 'a' + 10; + }; + + null_map[row] = false; + const auto& str = arg_column.get_data_at(row); + data = Slice(str.data, str.size); + + // Print the compressed string (before decompression) + // LOG(INFO) << "Compressed string at row " << row << ": " << data.to_string() << ' ' + // << data.size; + + int illegal = 0; + if ((int)str.size < 10 || (int)str.size % 2 == 1) { + // The first ten digits are "0x" and length, followed by hexadecimal, each two digits is a byte + illegal = 1; + } else { + if (data[0] != '0' || data[1] != 'x') { + LOG(INFO) << "illegal: " Review Comment: dont log info here ########## be/src/vec/functions/function_compress.cpp: ########## @@ -0,0 +1,299 @@ +#include <glog/logging.h> + +#include <cctype> +#include <cstddef> +#include <cstring> +#include <memory> +#include <string> +#include <utility> + +#include "common/status.h" +#include "util/block_compression.h" +#include "util/faststring.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/columns/column.h" +#include 
"vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/core/block.h" +#include "vec/core/column_numbers.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/functions/function.h" +#include "vec/functions/simple_function_factory.h" + +namespace doris { +class FunctionContext; +} // namespace doris + +namespace doris::vectorized { + +class FunctionCompress : public IFunction { +public: + static constexpr auto name = "compress"; + static FunctionPtr create() { return std::make_shared<FunctionCompress>(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return make_nullable(std::make_shared<DataTypeString>()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const override { + // LOG(INFO) << "Executing FunctionCompress with " << input_rows_count + // << " rows."; // Log the number of rows being processed + + // Get the compression algorithm object + BlockCompressionCodec* compression_codec; + RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB, + &compression_codec)); + + const auto& arg_column = + assert_cast<const ColumnString&>(*block.get_by_position(arguments[0]).column); + auto result_column = ColumnString::create(); + + auto& col_data = result_column->get_chars(); + auto& col_offset = result_column->get_offsets(); + col_offset.resize(input_rows_count); + + auto null_column = ColumnUInt8::create(input_rows_count); + auto& 
null_map = null_column->get_data(); + + faststring compressed_str; + Slice data; + for (int row = 0; row < input_rows_count; row++) { + null_map[row] = false; + const auto& str = arg_column.get_data_at(row); + data = Slice(str.data, str.size); + + // Print the original string (before compression) + // LOG(INFO) << "Original string at row " << row << ": " + // << std::string(str.data, str.size); + + auto st = compression_codec->compress(data, &compressed_str); + + if (!st.ok()) { + // LOG(INFO) << "Compression failed at row " << row + // << ", skipping this row."; // Log failure + col_offset[row] = col_offset[row - 1]; + null_map[row] = true; + continue; + } + + size_t idx = col_data.size(); + if (!str.size) { // null -> 0x + col_data.resize(col_data.size() + 2); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + col_offset[row] = col_offset[row - 1] + 2; + continue; + } + + // first ten digits represent the length of the uncompressed string + int value = (int)str.size; + col_data.resize(col_data.size() + 10); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + for (int i = 0; i < 4; i++) { + unsigned char byte = (value >> (i * 8)) & 0xFF; + col_data[idx + 2 + i * 2] = "0123456789ABCDEF"[byte >> 4]; // 高4位 + col_data[idx + 3 + i * 2] = "0123456789ABCDEF"[byte & 0x0F]; // 低4位 + } + idx += 10; + + col_data.resize(col_data.size() + 2 * compressed_str.size()); + // memcpy(col_data.data() + col_data.size(), compressed_str.data(), compressed_str.size()); + + unsigned char* src = compressed_str.data(); + { + auto transform = [](char ch) -> unsigned char { + char x; + if (ch < 10) { + x = ch + '0'; + } else { + x = ch - 10 + 'A'; + } + // LOG(INFO) << "transform" << (int)x << "->" << x; + return x; + }; + for (int i = 0; i < compressed_str.size(); i++) { + col_data[idx] = transform(((*src) >> 4) & 0x0F); + col_data[idx + 1] = transform(*src & 0x0F); + LOG(INFO) << (unsigned int)(*src) << " -> " << (unsigned int)col_data[idx] + << " and " << (unsigned int)col_data[idx + 1]; + 
idx += 2; + src++; + } + + // Print the compressed string (after compression) + // LOG(INFO) << "Compressed string at row " << row << ": " + // << std::string(reinterpret_cast<const char*>(col_data.data())); + col_offset[row] = col_offset[row - 1] + 10 + compressed_str.size() * 2; + } + } + + block.replace_by_position( + result, ColumnNullable::create(std::move(result_column), std::move(null_column))); + return Status::OK(); + } +}; + +class FunctionUncompress : public IFunction { +public: + static constexpr auto name = "uncompress"; + static FunctionPtr create() { return std::make_shared<FunctionUncompress>(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return make_nullable(std::make_shared<DataTypeString>()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const override { + LOG(INFO) << "Executing FunctionUncompress with " << input_rows_count + << " rows."; // Log the number of rows being processed + + // Get the compression algorithm object + BlockCompressionCodec* compression_codec; + RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB, + &compression_codec)); + + const auto& arg_column = + assert_cast<const ColumnString&>(*block.get_by_position(arguments[0]).column); + + auto result_column = ColumnString::create(); + auto& col_data = result_column->get_chars(); + auto& col_offset = result_column->get_offsets(); + col_offset.resize(input_rows_count); + + auto null_column = ColumnUInt8::create(input_rows_count); + auto& null_map = null_column->get_data(); + + std::string uncompressed; + Slice data; + Slice uncompressed_slice; + for (int row = 0; row < input_rows_count; row++) { + auto check = [](char x) { + if (x >= '0' && x <= '9') return true; + if (x >= 'a' && x <= 'f') return 
true; + if (x >= 'A' && x <= 'F') return true; + return false; + }; + auto trans = [](char x) { Review Comment: just use `from_chars` and `to_chars` to replace your user implemented lambdas ########## be/src/vec/functions/function_compress.cpp: ########## @@ -0,0 +1,299 @@ +#include <glog/logging.h> + +#include <cctype> +#include <cstddef> +#include <cstring> +#include <memory> +#include <string> +#include <utility> + +#include "common/status.h" +#include "util/block_compression.h" +#include "util/faststring.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/core/block.h" +#include "vec/core/column_numbers.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/functions/function.h" +#include "vec/functions/simple_function_factory.h" + +namespace doris { +class FunctionContext; +} // namespace doris + +namespace doris::vectorized { + +class FunctionCompress : public IFunction { +public: + static constexpr auto name = "compress"; + static FunctionPtr create() { return std::make_shared<FunctionCompress>(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return make_nullable(std::make_shared<DataTypeString>()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const override { + // LOG(INFO) << "Executing FunctionCompress with " << input_rows_count + // 
<< " rows."; // Log the number of rows being processed + + // Get the compression algorithm object + BlockCompressionCodec* compression_codec; + RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB, + &compression_codec)); + + const auto& arg_column = + assert_cast<const ColumnString&>(*block.get_by_position(arguments[0]).column); + auto result_column = ColumnString::create(); + + auto& col_data = result_column->get_chars(); + auto& col_offset = result_column->get_offsets(); + col_offset.resize(input_rows_count); + + auto null_column = ColumnUInt8::create(input_rows_count); + auto& null_map = null_column->get_data(); + + faststring compressed_str; + Slice data; + for (int row = 0; row < input_rows_count; row++) { + null_map[row] = false; + const auto& str = arg_column.get_data_at(row); + data = Slice(str.data, str.size); + + // Print the original string (before compression) + // LOG(INFO) << "Original string at row " << row << ": " + // << std::string(str.data, str.size); + + auto st = compression_codec->compress(data, &compressed_str); + + if (!st.ok()) { + // LOG(INFO) << "Compression failed at row " << row + // << ", skipping this row."; // Log failure + col_offset[row] = col_offset[row - 1]; + null_map[row] = true; + continue; + } + + size_t idx = col_data.size(); + if (!str.size) { // null -> 0x + col_data.resize(col_data.size() + 2); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + col_offset[row] = col_offset[row - 1] + 2; + continue; + } + + // first ten digits represent the length of the uncompressed string + int value = (int)str.size; + col_data.resize(col_data.size() + 10); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + for (int i = 0; i < 4; i++) { + unsigned char byte = (value >> (i * 8)) & 0xFF; + col_data[idx + 2 + i * 2] = "0123456789ABCDEF"[byte >> 4]; // 高4位 + col_data[idx + 3 + i * 2] = "0123456789ABCDEF"[byte & 0x0F]; // 低4位 + } + idx += 10; + + col_data.resize(col_data.size() + 2 * compressed_str.size()); + // 
memcpy(col_data.data() + col_data.size(), compressed_str.data(), compressed_str.size()); + + unsigned char* src = compressed_str.data(); + { + auto transform = [](char ch) -> unsigned char { + char x; + if (ch < 10) { + x = ch + '0'; + } else { + x = ch - 10 + 'A'; + } + // LOG(INFO) << "transform" << (int)x << "->" << x; + return x; + }; + for (int i = 0; i < compressed_str.size(); i++) { + col_data[idx] = transform(((*src) >> 4) & 0x0F); + col_data[idx + 1] = transform(*src & 0x0F); + LOG(INFO) << (unsigned int)(*src) << " -> " << (unsigned int)col_data[idx] + << " and " << (unsigned int)col_data[idx + 1]; + idx += 2; + src++; + } + + // Print the compressed string (after compression) + // LOG(INFO) << "Compressed string at row " << row << ": " + // << std::string(reinterpret_cast<const char*>(col_data.data())); + col_offset[row] = col_offset[row - 1] + 10 + compressed_str.size() * 2; + } + } + + block.replace_by_position( + result, ColumnNullable::create(std::move(result_column), std::move(null_column))); + return Status::OK(); + } +}; + +class FunctionUncompress : public IFunction { +public: + static constexpr auto name = "uncompress"; + static FunctionPtr create() { return std::make_shared<FunctionUncompress>(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return make_nullable(std::make_shared<DataTypeString>()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const override { + LOG(INFO) << "Executing FunctionUncompress with " << input_rows_count + << " rows."; // Log the number of rows being processed + + // Get the compression algorithm object + BlockCompressionCodec* compression_codec; + RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB, + &compression_codec)); + + const 
auto& arg_column = + assert_cast<const ColumnString&>(*block.get_by_position(arguments[0]).column); + + auto result_column = ColumnString::create(); + auto& col_data = result_column->get_chars(); + auto& col_offset = result_column->get_offsets(); + col_offset.resize(input_rows_count); + + auto null_column = ColumnUInt8::create(input_rows_count); + auto& null_map = null_column->get_data(); + + std::string uncompressed; + Slice data; + Slice uncompressed_slice; + for (int row = 0; row < input_rows_count; row++) { Review Comment: use `size_t`, not `int` ########## be/src/vec/functions/function_compress.cpp: ########## @@ -0,0 +1,299 @@ +#include <glog/logging.h> + +#include <cctype> +#include <cstddef> +#include <cstring> +#include <memory> +#include <string> +#include <utility> + +#include "common/status.h" +#include "util/block_compression.h" +#include "util/faststring.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/core/block.h" +#include "vec/core/column_numbers.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/functions/function.h" +#include "vec/functions/simple_function_factory.h" + +namespace doris { +class FunctionContext; +} // namespace doris + +namespace doris::vectorized { + +class FunctionCompress : public IFunction { +public: + static constexpr auto name = "compress"; + static FunctionPtr create() { return std::make_shared<FunctionCompress>(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + 
DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return make_nullable(std::make_shared<DataTypeString>()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const override { + // LOG(INFO) << "Executing FunctionCompress with " << input_rows_count + // << " rows."; // Log the number of rows being processed + + // Get the compression algorithm object + BlockCompressionCodec* compression_codec; + RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB, + &compression_codec)); + + const auto& arg_column = + assert_cast<const ColumnString&>(*block.get_by_position(arguments[0]).column); + auto result_column = ColumnString::create(); + + auto& col_data = result_column->get_chars(); + auto& col_offset = result_column->get_offsets(); + col_offset.resize(input_rows_count); + + auto null_column = ColumnUInt8::create(input_rows_count); + auto& null_map = null_column->get_data(); + + faststring compressed_str; + Slice data; + for (int row = 0; row < input_rows_count; row++) { + null_map[row] = false; + const auto& str = arg_column.get_data_at(row); + data = Slice(str.data, str.size); + + // Print the original string (before compression) + // LOG(INFO) << "Original string at row " << row << ": " + // << std::string(str.data, str.size); + + auto st = compression_codec->compress(data, &compressed_str); + + if (!st.ok()) { + // LOG(INFO) << "Compression failed at row " << row + // << ", skipping this row."; // Log failure + col_offset[row] = col_offset[row - 1]; + null_map[row] = true; + continue; + } + + size_t idx = col_data.size(); + if (!str.size) { // null -> 0x + col_data.resize(col_data.size() + 2); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + col_offset[row] = col_offset[row - 1] + 2; + continue; + } + + // first ten digits represent the length of the uncompressed string + int value = (int)str.size; + 
+ col_data.resize(col_data.size() + 10); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + for (int i = 0; i < 4; i++) { + unsigned char byte = (value >> (i * 8)) & 0xFF; + col_data[idx + 2 + i * 2] = "0123456789ABCDEF"[byte >> 4]; // 高4位 Review Comment: and make the magic values named constants -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org