Copilot commented on code in PR #60540: URL: https://github.com/apache/doris/pull/60540#discussion_r2772578172
########## be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h: ########## @@ -0,0 +1,352 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstdint> +#include <limits> +#include <utility> +#include <vector> + +#include "common/config.h" +#include "common/object_pool.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/common/sort/sorter.h" +#include "vec/core/block.h" +#include "vec/exprs/vslot_ref.h" +#include "vec/sink/writer/iceberg/viceberg_partition_writer.h" +#include "vec/sink/writer/iceberg/vpartition_writer_base.h" +#include "vec/spill/spill_stream.h" +#include "vec/spill/spill_stream_manager.h" + +namespace doris { +class RuntimeState; +class RuntimeProfile; + +namespace vectorized { +class VIcebergSortWriter : public IPartitionWriterBase { +public: + using CreateWriterLambda = std::function<std::shared_ptr<VIcebergPartitionWriter>( + const std::string* file_name, int file_name_index)>; + + VIcebergSortWriter(std::shared_ptr<VIcebergPartitionWriter> partition_writer, + TSortInfo sort_info, int64_t target_file_size_bytes, + CreateWriterLambda create_writer_lambda = nullptr) + : _sort_info(std::move(sort_info)), + _iceberg_partition_writer(std::move(partition_writer)), + _create_writer_lambda(std::move(create_writer_lambda)), + _target_file_size_bytes(target_file_size_bytes) {} + + Status open(RuntimeState* state, RuntimeProfile* profile, + const RowDescriptor* row_desc) override { + DCHECK(row_desc != nullptr); + _runtime_state = state; + _profile = profile; + _row_desc = row_desc; + + RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool)); + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc)); + RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); + + _sorter = vectorized::FullSorter::create_unique( + _vsort_exec_exprs, -1, 0, &_pool, _sort_info.is_asc_order, _sort_info.nulls_first, + *row_desc, state, _profile); + _sorter->init_profile(_profile); + _sorter->set_enable_spill(); + _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", TUnit::UNIT); + RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc)); + return Status::OK(); + } + + Status write(vectorized::Block& block) override { + RETURN_IF_ERROR(_sorter->append_block(&block)); + _update_spill_block_batch_row_count(block); + // sort in memory and write directly to Parquet file Review Comment: The comment on line 79 mentions "Parquet file" specifically, but this writer handles both Parquet and ORC formats based on the table configuration. The comment should be format-agnostic. 
```suggestion // sort in memory and write directly to the output data file ``` ########## regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_stats2.out: ########## @@ -0,0 +1,33 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql_0 -- +false 22 222 2.2 2.2 2222 8765.4321 8765.432100 987654321098765432.987654321099 bbb 2023-06-15 2023-06-15T23:45:01 +true 11 111 1.1 1.1 1111 1234.5678 1234.567890 123456789012345678.123456789012 aaa 2023-01-01 2023-01-01T12:34:56 + +-- !sql_1 -- +0 PARQUET 2 {1:2, 2:2, 3:2, 4:2, 5:2, 6:2, 7:2, 8:2, 9:2, 10:2, 11:2, 12:2} {1:0, 2:0, 3:0, 4:0, 5:0, 6:0, 7:0, 8:0, 9:0, 10:0, 11:0, 12:0} {1:0x00, 2:0x0B000000, 3:0x6F00000000000000, 4:0xCDCC8C3F, 5:0x9A9999999999F13F, 6:0x00000457, 7:0x00BC614E, 8:0x00000000499602D2, 9:0x000000018EE90FF6C373E0393713FA14, 10:0x616161, 11:0x9E4B0000, 12:0x005CE70F33F10500} {1:0x01, 2:0x16000000, 3:0xDE00000000000000, 4:0xCDCC0C40, 5:0x9A99999999990140, 6:0x000008AE, 7:0x05397FB1, 8:0x000000020A75E124, 9:0x0000000C7748819DFFB62505316873CB, 10:0x626262, 11:0x434C0000, 12:0x40D91FA833FE0500} + +-- !sql_2 -- +{"bigint_col":{"column_size":118, "value_count":2, "null_value_count":0, "nan_value_count":null, "lower_bound":111, "upper_bound":222}, "boolean_col":{"column_size":49, "value_count":2, "null_value_count":0, "nan_value_count":null, "lower_bound":0, "upper_bound":1}, "date_col":{"column_size":94, "value_count":2, "null_value_count":0, "nan_value_count":null, "lower_bound":"2023-01-01", "upper_bound":"2023-06-15"}, "datetime_col1":{"column_size":118, "value_count":2, "null_value_count":0, "nan_value_count":null, "lower_bound":"2023-01-01 20:34:56.000000", "upper_bound":"2023-06-16 07:45:01.000000"}, "decimal_col1":{"column_size":94, "value_count":2, "null_value_count":0, "nan_value_count":null, "lower_bound":1111, "upper_bound":2222}, "decimal_col2":{"column_size":94, "value_count":2, "null_value_count":0, "nan_value_count":null, "lower_bound":1234.5678, "upper_bound":8765.4321}, "decimal_col3":{"column_size":118, "value_count":2, "null_value_count":0, "nan_value_count":null, "lower_b ound":1234.567890, "upper_bound":8765.432100}, "decimal_col4":{"column_size":166, "value_count":2, "null_value_count":0, "nan_value_count":null, "lower_bound":123456789012345678.123456789012, "upper_bound":987654321098765432.987654321099}, "double_col":{"column_size":118, "value_count":2, "null_value_count":0, "nan_value_count":null, "lower_bound":1.1, "upper_bound":2.2}, "float_col":{"column_size":94, "value_count":2, "null_value_count":0, "nan_value_count":null, "lower_bound":1.1, "upper_bound":2.2}, "int_col":{"column_size":94, "value_count":2, "null_value_count":0, "nan_value_count":null, "lower_bound":11, "upper_bound":22}, "string_col":{"column_size":86, "value_count":2, "null_value_count":0, "nan_value_count":null, "lower_bound":"aaa", "upper_bound":"bbb"}} + +-- !sql_3 -- +false 22 222 2.2 2.2 2222 8765.4321 8765.432100 987654321098765432.987654321099 bbb 2023-06-15 2023-06-15T23:45:01 +true 11 111 1.1 1.1 1111 1234.5678 1234.567890 123456789012345678.123456789012 aaa 2023-01-01 2023-01-01T12:34:56 + +-- !sql_4 -- +0 ORC 2 {1:2, 2:2, 3:2, 4:2, 5:2, 6:2, 7:2, 8:2, 9:2, 10:2, 11:2, 12:2} {} {1:0x00, 2:0x0B00000000000000, 3:0x6F00000000000000, 4:0xCDCC8C3F, 5:0x9A9999999999F13F, 6:0x0457, 7:0x00BC614E, 8:0x075BCD15, 9:0x018EE90FF6C373E0393713FA14, 10:0x616161, 11:0x9E4B0000, 12:0x005CE70F33F10500} {1:0x01, 2:0x1600000000000000, 3:0xDE00000000000000, 4:0xCDCC0C40, 
5:0x9A99999999990140, 6:0x08AE, 7:0x05397FB1, 8:0x05397FB1, 9:0x0C7748819DFFB62505316873CB, 10:0x626262, 11:0x434C0000, 12:0x40D91FA833FE0500} + +-- !sql_5 -- +{"bigint_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":111, "upper_bound":222}, "boolean_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":0, "upper_bound":1}, "date_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":"2023-01-01", "upper_bound":"2023-06-15"}, "datetime_col1":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":"2023-01-01 20:34:56.000000", "upper_bound":"2023-06-16 07:45:01.000000"}, "decimal_col1":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":1111, "upper_bound":2222}, "decimal_col2":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":1234.5678, "upper_bound":8765.4321}, "decimal_col3":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":123.456789, "upper_bound":87.654321}, "decimal_col4":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":123456789012345678.123456789012, "upper_bound":987654321098765432.987654321099}, "double_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":1.1, "upper_bound":2.2}, "float_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":1.1, "upper_bound":2.2}, "int_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":11, "upper_bound":22}, "string_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":"aaa", "upper_bound":"bbb"}} Review Comment: The test output shows that decimal_col3 bounds are incorrect for ORC format. In sql_5 (ORC readable_metrics), decimal_col3 shows lower_bound:123.456789 and upper_bound:87.654321, but these values are far smaller than the actual data (the lower bound is 10x smaller than 1234.567890 and the upper bound is 100x smaller than 8765.432100), and the upper bound is even less than the lower bound, which is impossible. This indicates a bug in the ORC decimal statistics collection, likely in the _decimal_to_bytes or _collect_column_bounds methods. The Parquet version (sql_2) shows the correct bounds.
```suggestion {"bigint_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":111, "upper_bound":222}, "boolean_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":0, "upper_bound":1}, "date_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":"2023-01-01", "upper_bound":"2023-06-15"}, "datetime_col1":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":"2023-01-01 20:34:56.000000", "upper_bound":"2023-06-16 07:45:01.000000"}, "decimal_col1":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":1111, "upper_bound":2222}, "decimal_col2":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":1234.5678, "upper_bound":8765.4321}, "decimal_col3":{"column_size":null, "value_count":2, "null_value_count":null , "nan_value_count":null, "lower_bound":1234.567890, "upper_bound":8765.432100}, "decimal_col4":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":123456789012345678.123456789012, "upper_bound":987654321098765432.987654321099}, "double_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":1.1, "upper_bound":2.2}, "float_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":1.1, "upper_bound":2.2}, "int_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":11, "upper_bound":22}, "string_col":{"column_size":null, "value_count":2, "null_value_count":null, "nan_value_count":null, "lower_bound":"aaa", "upper_bound":"bbb"}} ``` ########## be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h: ########## @@ -0,0 +1,352 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <cstdint> +#include <limits> +#include <utility> +#include <vector> + +#include "common/config.h" +#include "common/object_pool.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/common/sort/sorter.h" +#include "vec/core/block.h" +#include "vec/exprs/vslot_ref.h" +#include "vec/sink/writer/iceberg/viceberg_partition_writer.h" +#include "vec/sink/writer/iceberg/vpartition_writer_base.h" +#include "vec/spill/spill_stream.h" +#include "vec/spill/spill_stream_manager.h" + +namespace doris { +class RuntimeState; +class RuntimeProfile; + +namespace vectorized { +class VIcebergSortWriter : public IPartitionWriterBase { +public: + using CreateWriterLambda = std::function<std::shared_ptr<VIcebergPartitionWriter>( + const std::string* file_name, int file_name_index)>; + + VIcebergSortWriter(std::shared_ptr<VIcebergPartitionWriter> partition_writer, + TSortInfo sort_info, int64_t target_file_size_bytes, + CreateWriterLambda create_writer_lambda = nullptr) + : _sort_info(std::move(sort_info)), + _iceberg_partition_writer(std::move(partition_writer)), + _create_writer_lambda(std::move(create_writer_lambda)), + _target_file_size_bytes(target_file_size_bytes) {} + + Status open(RuntimeState* state, RuntimeProfile* profile, + const RowDescriptor* row_desc) override { + DCHECK(row_desc != nullptr); + _runtime_state = state; + _profile = profile; + _row_desc = row_desc; + + RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool)); + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc)); + RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); + + _sorter = vectorized::FullSorter::create_unique( + _vsort_exec_exprs, -1, 0, &_pool, _sort_info.is_asc_order, _sort_info.nulls_first, + *row_desc, state, _profile); + _sorter->init_profile(_profile); + _sorter->set_enable_spill(); + _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", TUnit::UNIT); + RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc)); + return Status::OK(); + } + + Status write(vectorized::Block& block) override { + RETURN_IF_ERROR(_sorter->append_block(&block)); + _update_spill_block_batch_row_count(block); + // sort in memory and write directly to Parquet file + if (_sorter->data_size() >= _target_file_size_bytes) { + return _flush_to_file(); + } + // trigger_spill() will be called by memory management system + return Status::OK(); + } + + Status close(const Status& status) override { + Defer defer {[&]() { + Status st = _iceberg_partition_writer->close(status); + if (!st.ok()) { + LOG(WARNING) << fmt::format("_iceberg_partition_writer close failed, reason: {}", + st.to_string()); + } + _cleanup_spill_streams(); + }}; + + if (!status.ok() || _runtime_state->is_cancelled()) { + return status; + } + + if (_sorter == nullptr) { + return Status::OK(); + } + + if (!_sorter->merge_sort_state()->unsorted_block()->empty() || + !_sorter->merge_sort_state()->get_sorted_block().empty()) { + if (_sorted_streams.empty()) { + // data remaining in memory + RETURN_IF_ERROR(_sorter->_do_sort()); + RETURN_IF_ERROR(_sorter->prepare_for_read(false)); + RETURN_IF_ERROR(_write_sorted_data()); + return Status::OK(); + } + + // spill remaining data + RETURN_IF_ERROR(_do_spill()); + } + + // Merge all spilled streams and output final sorted data + if (!_sorted_streams.empty()) { + RETURN_IF_ERROR(_combine_files_output()); + } + + return Status::OK(); + } + + inline const std::string& file_name() const override { + return 
_iceberg_partition_writer->file_name(); + } + + inline int file_name_index() const override { + return _iceberg_partition_writer->file_name_index(); + } + + inline size_t written_len() const override { return _iceberg_partition_writer->written_len(); } + + auto sorter() const { return _sorter.get(); } + + Status trigger_spill() { return _do_spill(); } + +private: + // how many rows need in spill block batch + void _update_spill_block_batch_row_count(const vectorized::Block& block) { + auto rows = block.rows(); + if (rows > 0 && 0 == _avg_row_bytes) { + _avg_row_bytes = std::max(1UL, block.bytes() / rows); + int64_t spill_batch_bytes = _runtime_state->spill_sort_batch_bytes(); // default 8MB + _spill_block_batch_row_count = + (spill_batch_bytes + _avg_row_bytes - 1) / _avg_row_bytes; + } + } + + // have enought data, flush in-memory sorted data to file Review Comment: There's a spelling error in the comment. "enought" should be "enough". ```suggestion // have enough data, flush in-memory sorted data to file ``` ########## be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h: ########## @@ -0,0 +1,352 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <cstdint> +#include <limits> +#include <utility> +#include <vector> + +#include "common/config.h" +#include "common/object_pool.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/common/sort/sorter.h" +#include "vec/core/block.h" +#include "vec/exprs/vslot_ref.h" +#include "vec/sink/writer/iceberg/viceberg_partition_writer.h" +#include "vec/sink/writer/iceberg/vpartition_writer_base.h" +#include "vec/spill/spill_stream.h" +#include "vec/spill/spill_stream_manager.h" + +namespace doris { +class RuntimeState; +class RuntimeProfile; + +namespace vectorized { +class VIcebergSortWriter : public IPartitionWriterBase { +public: + using CreateWriterLambda = std::function<std::shared_ptr<VIcebergPartitionWriter>( + const std::string* file_name, int file_name_index)>; + + VIcebergSortWriter(std::shared_ptr<VIcebergPartitionWriter> partition_writer, + TSortInfo sort_info, int64_t target_file_size_bytes, + CreateWriterLambda create_writer_lambda = nullptr) + : _sort_info(std::move(sort_info)), + _iceberg_partition_writer(std::move(partition_writer)), + _create_writer_lambda(std::move(create_writer_lambda)), + _target_file_size_bytes(target_file_size_bytes) {} + + Status open(RuntimeState* state, RuntimeProfile* profile, + const RowDescriptor* row_desc) override { + DCHECK(row_desc != nullptr); + _runtime_state = state; + _profile = profile; + _row_desc = row_desc; + + RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool)); + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc)); + RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); + + _sorter = vectorized::FullSorter::create_unique( + _vsort_exec_exprs, -1, 0, &_pool, _sort_info.is_asc_order, _sort_info.nulls_first, + *row_desc, state, _profile); + _sorter->init_profile(_profile); + _sorter->set_enable_spill(); + _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", TUnit::UNIT); + RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc)); + return Status::OK(); + } + + Status write(vectorized::Block& block) override { + RETURN_IF_ERROR(_sorter->append_block(&block)); + _update_spill_block_batch_row_count(block); + // sort in memory and write directly to Parquet file + if (_sorter->data_size() >= _target_file_size_bytes) { + return _flush_to_file(); + } + // trigger_spill() will be called by memory management system + return Status::OK(); + } + + Status close(const Status& status) override { + Defer defer {[&]() { + Status st = _iceberg_partition_writer->close(status); + if (!st.ok()) { + LOG(WARNING) << fmt::format("_iceberg_partition_writer close failed, reason: {}", + st.to_string()); + } + _cleanup_spill_streams(); + }}; + + if (!status.ok() || _runtime_state->is_cancelled()) { + return status; + } + + if (_sorter == nullptr) { + return Status::OK(); + } + + if (!_sorter->merge_sort_state()->unsorted_block()->empty() || + !_sorter->merge_sort_state()->get_sorted_block().empty()) { + if (_sorted_streams.empty()) { + // data remaining in memory + RETURN_IF_ERROR(_sorter->_do_sort()); + RETURN_IF_ERROR(_sorter->prepare_for_read(false)); + RETURN_IF_ERROR(_write_sorted_data()); + return Status::OK(); + } + + // spill remaining data + RETURN_IF_ERROR(_do_spill()); + } + + // Merge all spilled streams and output final sorted data + if (!_sorted_streams.empty()) { + RETURN_IF_ERROR(_combine_files_output()); + } + + return Status::OK(); + } + + inline const std::string& file_name() const override { + return 
_iceberg_partition_writer->file_name(); + } + + inline int file_name_index() const override { + return _iceberg_partition_writer->file_name_index(); + } + + inline size_t written_len() const override { return _iceberg_partition_writer->written_len(); } + + auto sorter() const { return _sorter.get(); } + + Status trigger_spill() { return _do_spill(); } + +private: + // how many rows need in spill block batch + void _update_spill_block_batch_row_count(const vectorized::Block& block) { + auto rows = block.rows(); + if (rows > 0 && 0 == _avg_row_bytes) { + _avg_row_bytes = std::max(1UL, block.bytes() / rows); + int64_t spill_batch_bytes = _runtime_state->spill_sort_batch_bytes(); // default 8MB + _spill_block_batch_row_count = + (spill_batch_bytes + _avg_row_bytes - 1) / _avg_row_bytes; + } + } + + // have enought data, flush in-memory sorted data to file + Status _flush_to_file() { + RETURN_IF_ERROR(_sorter->_do_sort()); + RETURN_IF_ERROR(_sorter->prepare_for_read(false)); + RETURN_IF_ERROR(_write_sorted_data()); + RETURN_IF_ERROR(_close_current_writer_and_open_next()); + _sorter->reset(); + return Status::OK(); + } + + // write data into file + Status _write_sorted_data() { + bool eos = false; + Block block; + while (!eos && !_runtime_state->is_cancelled()) { + RETURN_IF_ERROR(_sorter->get_next(_runtime_state, &block, &eos)); + RETURN_IF_ERROR(_iceberg_partition_writer->write(block)); + block.clear_column_data(); + } + return Status::OK(); + } + + // close current writer and open a new one with incremented file index + Status _close_current_writer_and_open_next() { + std::string current_file_name = _iceberg_partition_writer->file_name(); + int current_file_index = _iceberg_partition_writer->file_name_index(); + RETURN_IF_ERROR(_iceberg_partition_writer->close(Status::OK())); + + _iceberg_partition_writer = + _create_writer_lambda(¤t_file_name, current_file_index + 1); + if (!_iceberg_partition_writer) { + return Status::InternalError("Failed to create new partition writer"); + } + + RETURN_IF_ERROR(_iceberg_partition_writer->open(_runtime_state, _profile, _row_desc)); + return Status::OK(); + } + + // batch size max is int32_t max + int32_t _get_spill_batch_size() const { + if (_spill_block_batch_row_count > std::numeric_limits<int32_t>::max()) { + return std::numeric_limits<int32_t>::max(); + } + return static_cast<int32_t>(_spill_block_batch_row_count); + } + + Status _do_spill() { + COUNTER_UPDATE(_do_spill_count_counter, 1); + RETURN_IF_ERROR(_sorter->prepare_for_read(true)); + int32_t batch_size = _get_spill_batch_size(); + + SpillStreamSPtr spilling_stream; + RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( + _runtime_state, spilling_stream, print_id(_runtime_state->query_id()), + "iceberg-sort", 1 /* node_id */, batch_size, + _runtime_state->spill_sort_batch_bytes(), _profile)); + _sorted_streams.emplace_back(spilling_stream); + + // spill sorted data to stream + bool eos = false; + Block block; + while (!eos && !_runtime_state->is_cancelled()) { + RETURN_IF_ERROR(_sorter->merge_sort_read_for_spill( + _runtime_state, &block, (int)_spill_block_batch_row_count, &eos)); + RETURN_IF_ERROR(spilling_stream->spill_block(_runtime_state, block, eos)); + block.clear_column_data(); + } + _sorter->reset(); + return Status::OK(); + } + + // merge spilled streams and output sorted data to Parquet files Review Comment: Similar to line 79, the comment on line 224 mentions "Parquet files" specifically but should be format-agnostic since the code handles both Parquet 
and ORC formats. ```suggestion // merge spilled streams and output sorted data to files ``` ########## be/src/vec/sink/writer/iceberg/viceberg_table_writer.h: ########## @@ -51,14 +49,19 @@ class VIcebergTableWriter final : public AsyncResultWriter { ~VIcebergTableWriter() = default; - Status init_properties(ObjectPool* pool); + Status init_properties(ObjectPool* pool, const RowDescriptor& row_desc) { + _row_desc = &row_desc; + return Status::OK(); + } Review Comment: The signature of init_properties() has been changed to accept a RowDescriptor parameter, which breaks API consistency with other table writers like VHiveTableWriter (which only takes ObjectPool*). This inconsistency could cause confusion. Consider either keeping the signature consistent across all table writers or documenting why this particular writer needs the RowDescriptor. ########## be/src/vec/runtime/vorc_transformer.cpp: ########## @@ -343,6 +347,209 @@ Status VOrcTransformer::close() { return Status::OK(); } +Status VOrcTransformer::collect_file_statistics_after_close(TIcebergColumnStats* stats) { + if (stats == nullptr || _iceberg_schema == nullptr || _fs == nullptr) { + return Status::OK(); + } + + try { + // orc writer do not provide api to get column statistics + // so we do not implement it now, we could implement it in future if really needed + // maybe at the close of orc writer, we could do statistics by hands + // eg: https://github.com/trinodb/trino/blob/master/lib/trino-orc/src/main/java/io/trino/orc/OrcWriter.java + io::FileReaderSPtr file_reader; + io::FileReaderOptions reader_options; + RETURN_IF_ERROR(_fs->open_file(_file_writer->path(), &file_reader, &reader_options)); + auto input_stream = std::make_unique<ORCFileInputStream>( + _file_writer->path().native(), file_reader, nullptr, nullptr, 8L * 1024L * 1024L, + 1L * 1024L * 1024L); + std::unique_ptr<orc::Reader> reader = + orc::createReader(std::move(input_stream), orc::ReaderOptions()); + std::unique_ptr<orc::Statistics> file_stats = reader->getStatistics(); + + if (file_stats == nullptr) { + return Status::OK(); + } + + std::map<int32_t, int64_t> value_counts; + std::map<int32_t, int64_t> null_value_counts; + std::map<int32_t, std::string> lower_bounds; + std::map<int32_t, std::string> upper_bounds; + bool has_any_null_count = false; + bool has_any_min_max = false; + + const iceberg::StructType& root_struct = _iceberg_schema->root_struct(); + const auto& nested_fields = root_struct.fields(); + for (uint32_t i = 0; i < nested_fields.size(); i++) { + uint32_t orc_col_id = i + 1; // skip root struct + if (orc_col_id >= file_stats->getNumberOfColumns()) { + continue; + } + + const orc::ColumnStatistics* col_stats = file_stats->getColumnStatistics(orc_col_id); + if (col_stats == nullptr) { + continue; + } + + int32_t field_id = nested_fields[i].field_id(); + int64_t non_null_count = col_stats->getNumberOfValues(); + value_counts[field_id] = non_null_count; + if (col_stats->hasNull()) { + has_any_null_count = true; + int64_t null_count = _cur_written_rows - non_null_count; + null_value_counts[field_id] = null_count; + value_counts[field_id] += null_count; + } + + if (_collect_column_bounds(col_stats, field_id, + _output_vexpr_ctxs[i]->root()->data_type(), &lower_bounds, + &upper_bounds)) { + has_any_min_max = true; + } + } + + stats->__set_value_counts(value_counts); + if (has_any_null_count) { + stats->__set_null_value_counts(null_value_counts); + } + if (has_any_min_max) { + stats->__set_lower_bounds(lower_bounds); + 
stats->__set_upper_bounds(upper_bounds); + } + return Status::OK(); + } catch (const std::exception& e) { + LOG(WARNING) << "Failed to collect ORC file statistics: " << e.what(); + return Status::OK(); + } +} + +bool VOrcTransformer::_collect_column_bounds(const orc::ColumnStatistics* col_stats, + int32_t field_id, const DataTypePtr& data_type, + std::map<int32_t, std::string>* lower_bounds, + std::map<int32_t, std::string>* upper_bounds) { + bool has_bounds = false; + auto primitive_type = remove_nullable(data_type)->get_primitive_type(); + if (const auto* bool_stats = dynamic_cast<const orc::BooleanColumnStatistics*>(col_stats)) { + if (bool_stats->hasCount()) { + uint64_t true_count = bool_stats->getTrueCount(); + uint64_t false_count = bool_stats->getFalseCount(); + if (true_count > 0 || false_count > 0) { + has_bounds = true; + bool min_val = (false_count == 0); + bool max_val = (true_count > 0); + (*lower_bounds)[field_id] = + std::string(reinterpret_cast<const char*>(&min_val), sizeof(bool)); + (*upper_bounds)[field_id] = + std::string(reinterpret_cast<const char*>(&max_val), sizeof(bool)); + } + } + } else if (const auto* int_stats = + dynamic_cast<const orc::IntegerColumnStatistics*>(col_stats)) { + if (int_stats->hasMinimum() && int_stats->hasMaximum()) { + has_bounds = true; + int64_t min_val = int_stats->getMinimum(); + int64_t max_val = int_stats->getMaximum(); + (*lower_bounds)[field_id] = + std::string(reinterpret_cast<const char*>(&min_val), sizeof(int64_t)); + (*upper_bounds)[field_id] = + std::string(reinterpret_cast<const char*>(&max_val), sizeof(int64_t)); + } + } else if (const auto* double_stats = + dynamic_cast<const orc::DoubleColumnStatistics*>(col_stats)) { + if (double_stats->hasMinimum() && double_stats->hasMaximum()) { + has_bounds = true; + if (primitive_type == TYPE_FLOAT) { + auto min_val = static_cast<float>(double_stats->getMinimum()); + auto max_val = static_cast<float>(double_stats->getMaximum()); + (*lower_bounds)[field_id] = + std::string(reinterpret_cast<const char*>(&min_val), sizeof(float)); + (*upper_bounds)[field_id] = + std::string(reinterpret_cast<const char*>(&max_val), sizeof(float)); + } else { + double min_val = double_stats->getMinimum(); + double max_val = double_stats->getMaximum(); + (*lower_bounds)[field_id] = + std::string(reinterpret_cast<const char*>(&min_val), sizeof(double)); + (*upper_bounds)[field_id] = + std::string(reinterpret_cast<const char*>(&max_val), sizeof(double)); + } + } + } else if (const auto* string_stats = + dynamic_cast<const orc::StringColumnStatistics*>(col_stats)) { + if (string_stats->hasMinimum() && string_stats->hasMaximum()) { + has_bounds = true; + (*lower_bounds)[field_id] = string_stats->getMinimum(); + (*upper_bounds)[field_id] = string_stats->getMaximum(); + } + } else if (const auto* date_stats = dynamic_cast<const orc::DateColumnStatistics*>(col_stats)) { + if (date_stats->hasMinimum() && date_stats->hasMaximum()) { + has_bounds = true; + int32_t min_val = date_stats->getMinimum(); + int32_t max_val = date_stats->getMaximum(); + (*lower_bounds)[field_id] = + std::string(reinterpret_cast<const char*>(&min_val), sizeof(int32_t)); + (*upper_bounds)[field_id] = + std::string(reinterpret_cast<const char*>(&max_val), sizeof(int32_t)); + } + } else if (const auto* ts_stats = + dynamic_cast<const orc::TimestampColumnStatistics*>(col_stats)) { + if (ts_stats->hasMinimum() && ts_stats->hasMaximum()) { + has_bounds = true; + int64_t min_val = ts_stats->getMinimum() * 1000; + int64_t max_val = 
ts_stats->getMaximum() * 1000; + (*lower_bounds)[field_id] = + std::string(reinterpret_cast<const char*>(&min_val), sizeof(int64_t)); + (*upper_bounds)[field_id] = + std::string(reinterpret_cast<const char*>(&max_val), sizeof(int64_t)); + } + } else if (const auto* decimal_stats = + dynamic_cast<const orc::DecimalColumnStatistics*>(col_stats)) { + if (decimal_stats->hasMinimum() && decimal_stats->hasMaximum()) { + has_bounds = true; + (*lower_bounds)[field_id] = _decimal_to_bytes(decimal_stats->getMinimum()); + (*upper_bounds)[field_id] = _decimal_to_bytes(decimal_stats->getMaximum()); + } + } + + return has_bounds; +} + +std::string VOrcTransformer::_decimal_to_bytes(const orc::Decimal& decimal) { + orc::Int128 val = decimal.value; + if (val == 0) { + char zero = 0; + return std::string(&zero, 1); + } + + // Convert Int128 -> signed big-endian minimal bytes + bool negative = val < 0; + auto high = static_cast<uint64_t>(val.getHighBits()); + auto low = val.getLowBits(); + + // If negative, convert to two's complement explicitly + if (negative) { + // two's complement for 128-bit + low = ~low + 1; + high = ~high + (low == 0 ? 1 : 0); + } + Review Comment: The two's complement conversion for negative decimals has a potential bug. When converting a negative Int128 to two's complement, the code applies two's complement to the already-negative value. However, orc::Int128 likely already stores values in two's complement format. Applying two's complement again to a negative number would give an incorrect result. The code should check if val.getHighBits() and val.getLowBits() already provide the correct two's complement representation or use val directly for conversion without the manual two's complement step. ```suggestion // Convert Int128 (already stored in two's complement) -> signed big-endian minimal bytes bool negative = val < 0; auto high = static_cast<uint64_t>(val.getHighBits()); auto low = val.getLowBits(); ```
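For illustration only (not code from this PR; the helper name and signature are hypothetical): a minimal sketch of how the unscaled value could be serialized once the extra negation is dropped, assuming getHighBits()/getLowBits() already return the raw two's-complement words of the 128-bit value. Writing the 16 bytes big-endian and trimming redundant leading sign bytes yields the minimum-length two's-complement encoding that Iceberg's single-value serialization expects for decimal bounds.

```cpp
#include <cstdint>
#include <string>

// Hypothetical helper: serialize a 128-bit value, given as its raw
// two's-complement (high, low) words, into minimal big-endian
// two's-complement bytes. No manual negation of negative values.
std::string int128_to_min_twos_complement_be(uint64_t high, uint64_t low) {
    unsigned char bytes[16];
    for (int i = 0; i < 8; ++i) {
        bytes[i] = static_cast<unsigned char>(high >> (56 - 8 * i));     // high word, big-endian
        bytes[8 + i] = static_cast<unsigned char>(low >> (56 - 8 * i));  // low word, big-endian
    }
    const bool negative = (bytes[0] & 0x80) != 0;
    const unsigned char pad = negative ? 0xFF : 0x00;
    // Drop redundant leading sign bytes, but keep at least one byte and keep
    // the sign bit of the first remaining byte consistent with the value's sign.
    int start = 0;
    while (start < 15 && bytes[start] == pad &&
           ((bytes[start + 1] & 0x80) != 0) == negative) {
        ++start;
    }
    return std::string(reinterpret_cast<const char*>(bytes + start), 16 - start);
}

// Illustrative call site, mirroring the reviewed code's variables:
//   return int128_to_min_twos_complement_be(
//           static_cast<uint64_t>(val.getHighBits()), val.getLowBits());
// Zero naturally serializes to a single 0x00 byte, so the explicit zero
// special case would no longer be needed.
```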
