Copilot commented on code in PR #60540:
URL: https://github.com/apache/doris/pull/60540#discussion_r2772578172


##########
be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h:
##########
@@ -0,0 +1,352 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/object_pool.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/common/sort/sorter.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vslot_ref.h"
+#include "vec/sink/writer/iceberg/viceberg_partition_writer.h"
+#include "vec/sink/writer/iceberg/vpartition_writer_base.h"
+#include "vec/spill/spill_stream.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris {
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+class VIcebergSortWriter : public IPartitionWriterBase {
+public:
+    using CreateWriterLambda = std::function<std::shared_ptr<VIcebergPartitionWriter>(
+            const std::string* file_name, int file_name_index)>;
+
+    VIcebergSortWriter(std::shared_ptr<VIcebergPartitionWriter> partition_writer,
+                       TSortInfo sort_info, int64_t target_file_size_bytes,
+                       CreateWriterLambda create_writer_lambda = nullptr)
+            : _sort_info(std::move(sort_info)),
+              _iceberg_partition_writer(std::move(partition_writer)),
+              _create_writer_lambda(std::move(create_writer_lambda)),
+              _target_file_size_bytes(target_file_size_bytes) {}
+
+    Status open(RuntimeState* state, RuntimeProfile* profile,
+                const RowDescriptor* row_desc) override {
+        DCHECK(row_desc != nullptr);
+        _runtime_state = state;
+        _profile = profile;
+        _row_desc = row_desc;
+
+        RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool));
+        RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc));
+        RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
+
+        _sorter = vectorized::FullSorter::create_unique(
+                _vsort_exec_exprs, -1, 0, &_pool, _sort_info.is_asc_order, _sort_info.nulls_first,
+                *row_desc, state, _profile);
+        _sorter->init_profile(_profile);
+        _sorter->set_enable_spill();
+        _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", TUnit::UNIT);
+        RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc));
+        return Status::OK();
+    }
+
+    Status write(vectorized::Block& block) override {
+        RETURN_IF_ERROR(_sorter->append_block(&block));
+        _update_spill_block_batch_row_count(block);
+        // sort in memory and write directly to Parquet file

Review Comment:
   The comment on line 79 mentions "Parquet file" specifically, but this writer 
handles both Parquet and ORC formats based on the table configuration. The 
comment should be format-agnostic.
   ```suggestion
           // sort in memory and write directly to the output data file
   ```



##########
regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_stats2.out:
##########
@@ -0,0 +1,33 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql_0 --
+false  22      222     2.2     2.2     2222    8765.4321       8765.432100     
987654321098765432.987654321099 bbb     2023-06-15      2023-06-15T23:45:01
+true   11      111     1.1     1.1     1111    1234.5678       1234.567890     
123456789012345678.123456789012 aaa     2023-01-01      2023-01-01T12:34:56
+
+-- !sql_1 --
+0      PARQUET 2       {1:2, 2:2, 3:2, 4:2, 5:2, 6:2, 7:2, 8:2, 9:2, 10:2, 
11:2, 12:2} {1:0, 2:0, 3:0, 4:0, 5:0, 6:0, 7:0, 8:0, 9:0, 10:0, 11:0, 12:0} 
{1:0x00, 2:0x0B000000, 3:0x6F00000000000000, 4:0xCDCC8C3F, 
5:0x9A9999999999F13F, 6:0x00000457, 7:0x00BC614E, 8:0x00000000499602D2, 
9:0x000000018EE90FF6C373E0393713FA14, 10:0x616161, 11:0x9E4B0000, 
12:0x005CE70F33F10500}     {1:0x01, 2:0x16000000, 3:0xDE00000000000000, 
4:0xCDCC0C40, 5:0x9A99999999990140, 6:0x000008AE, 7:0x05397FB1, 
8:0x000000020A75E124, 9:0x0000000C7748819DFFB62505316873CB, 10:0x626262, 
11:0x434C0000, 12:0x40D91FA833FE0500}
+
+-- !sql_2 --
+{"bigint_col":{"column_size":118, "value_count":2, "null_value_count":0, 
"nan_value_count":null, "lower_bound":111, "upper_bound":222}, 
"boolean_col":{"column_size":49, "value_count":2, "null_value_count":0, 
"nan_value_count":null, "lower_bound":0, "upper_bound":1}, 
"date_col":{"column_size":94, "value_count":2, "null_value_count":0, 
"nan_value_count":null, "lower_bound":"2023-01-01", 
"upper_bound":"2023-06-15"}, "datetime_col1":{"column_size":118, 
"value_count":2, "null_value_count":0, "nan_value_count":null, 
"lower_bound":"2023-01-01 20:34:56.000000", "upper_bound":"2023-06-16 
07:45:01.000000"}, "decimal_col1":{"column_size":94, "value_count":2, 
"null_value_count":0, "nan_value_count":null, "lower_bound":1111, 
"upper_bound":2222}, "decimal_col2":{"column_size":94, "value_count":2, 
"null_value_count":0, "nan_value_count":null, "lower_bound":1234.5678, 
"upper_bound":8765.4321}, "decimal_col3":{"column_size":118, "value_count":2, 
"null_value_count":0, "nan_value_count":null, "lower_b
 ound":1234.567890, "upper_bound":8765.432100}, 
"decimal_col4":{"column_size":166, "value_count":2, "null_value_count":0, 
"nan_value_count":null, "lower_bound":123456789012345678.123456789012, 
"upper_bound":987654321098765432.987654321099}, 
"double_col":{"column_size":118, "value_count":2, "null_value_count":0, 
"nan_value_count":null, "lower_bound":1.1, "upper_bound":2.2}, 
"float_col":{"column_size":94, "value_count":2, "null_value_count":0, 
"nan_value_count":null, "lower_bound":1.1, "upper_bound":2.2}, 
"int_col":{"column_size":94, "value_count":2, "null_value_count":0, 
"nan_value_count":null, "lower_bound":11, "upper_bound":22}, 
"string_col":{"column_size":86, "value_count":2, "null_value_count":0, 
"nan_value_count":null, "lower_bound":"aaa", "upper_bound":"bbb"}}
+
+-- !sql_3 --
+false  22      222     2.2     2.2     2222    8765.4321       8765.432100     
987654321098765432.987654321099 bbb     2023-06-15      2023-06-15T23:45:01
+true   11      111     1.1     1.1     1111    1234.5678       1234.567890     
123456789012345678.123456789012 aaa     2023-01-01      2023-01-01T12:34:56
+
+-- !sql_4 --
+0      ORC     2       {1:2, 2:2, 3:2, 4:2, 5:2, 6:2, 7:2, 8:2, 9:2, 10:2, 
11:2, 12:2} {}      {1:0x00, 2:0x0B00000000000000, 3:0x6F00000000000000, 
4:0xCDCC8C3F, 5:0x9A9999999999F13F, 6:0x0457, 7:0x00BC614E, 8:0x075BCD15, 
9:0x018EE90FF6C373E0393713FA14, 10:0x616161, 11:0x9E4B0000, 
12:0x005CE70F33F10500}       {1:0x01, 2:0x1600000000000000, 
3:0xDE00000000000000, 4:0xCDCC0C40, 5:0x9A99999999990140, 6:0x08AE, 
7:0x05397FB1, 8:0x05397FB1, 9:0x0C7748819DFFB62505316873CB, 10:0x626262, 
11:0x434C0000, 12:0x40D91FA833FE0500}
+
+-- !sql_5 --
+{"bigint_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":111, "upper_bound":222}, 
"boolean_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":0, "upper_bound":1}, 
"date_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":"2023-01-01", 
"upper_bound":"2023-06-15"}, "datetime_col1":{"column_size":null, 
"value_count":2, "null_value_count":null, "nan_value_count":null, 
"lower_bound":"2023-01-01 20:34:56.000000", "upper_bound":"2023-06-16 
07:45:01.000000"}, "decimal_col1":{"column_size":null, "value_count":2, 
"null_value_count":null, "nan_value_count":null, "lower_bound":1111, 
"upper_bound":2222}, "decimal_col2":{"column_size":null, "value_count":2, 
"null_value_count":null, "nan_value_count":null, "lower_bound":1234.5678, 
"upper_bound":8765.4321}, "decimal_col3":{"column_size":null, "value_count":2, 
"null_value_count":null, 
 "nan_value_count":null, "lower_bound":123.456789, "upper_bound":87.654321}, 
"decimal_col4":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":123456789012345678.123456789012, 
"upper_bound":987654321098765432.987654321099}, 
"double_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":1.1, "upper_bound":2.2}, 
"float_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":1.1, "upper_bound":2.2}, 
"int_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":11, "upper_bound":22}, 
"string_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":"aaa", "upper_bound":"bbb"}}

Review Comment:
   The test output shows that the decimal_col3 bounds are incorrect for the ORC format. In sql_5 (ORC readable_metrics), decimal_col3 reports lower_bound 123.456789 and upper_bound 87.654321; these are the actual data values (1234.567890 and 8765.432100) shifted down by a power of ten, and the upper bound ends up smaller than the lower bound, which is impossible. This points to a bug in the ORC decimal statistics collection, likely in the _decimal_to_bytes or _collect_column_bounds methods. The Parquet version (sql_2) shows the correct bounds.
   ```suggestion
   {"bigint_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":111, "upper_bound":222}, 
"boolean_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":0, "upper_bound":1}, 
"date_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":"2023-01-01", 
"upper_bound":"2023-06-15"}, "datetime_col1":{"column_size":null, 
"value_count":2, "null_value_count":null, "nan_value_count":null, 
"lower_bound":"2023-01-01 20:34:56.000000", "upper_bound":"2023-06-16 
07:45:01.000000"}, "decimal_col1":{"column_size":null, "value_count":2, 
"null_value_count":null, "nan_value_count":null, "lower_bound":1111, 
"upper_bound":2222}, "decimal_col2":{"column_size":null, "value_count":2, 
"null_value_count":null, "nan_value_count":null, "lower_bound":1234.5678, 
"upper_bound":8765.4321}, "decimal_col3":{"column_size":null, "value_count":2, 
"null_value_count":null
 , "nan_value_count":null, "lower_bound":1234.567890, 
"upper_bound":8765.432100}, "decimal_col4":{"column_size":null, 
"value_count":2, "null_value_count":null, "nan_value_count":null, 
"lower_bound":123456789012345678.123456789012, 
"upper_bound":987654321098765432.987654321099}, 
"double_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":1.1, "upper_bound":2.2}, 
"float_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":1.1, "upper_bound":2.2}, 
"int_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":11, "upper_bound":22}, 
"string_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":"aaa", "upper_bound":"bbb"}}
   ```
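   One plausible reading of the power-of-ten shift (an assumption, not something verified against the PR): ORC's decimal statistics may normalize trailing zeros away, so the statistic's scale no longer matches the Iceberg column's declared scale. If _decimal_to_bytes serializes only the unscaled value, Iceberg decodes it at the column scale and the bound shrinks by exactly the dropped digits. The self-contained sketch below (rescale_unscaled is a hypothetical helper, not Doris code) shows the rescaling that would reproduce the expected bounds:

```cpp
// Illustrative only (not the PR's code). Iceberg decimal bounds are the big-endian
// two's complement of the unscaled value at the column's declared scale. If the ORC
// statistic carries a smaller scale (trailing zeros normalized away), the unscaled
// value must be rescaled first, otherwise the decoded bound is off by a power of ten,
// matching 1234.567890 -> 123.456789 and 8765.432100 -> 87.654321 above.
#include <cassert>
#include <cstdint>

// Rescale an unscaled decimal value from the statistic's scale up to the column's scale.
int64_t rescale_unscaled(int64_t unscaled, int32_t from_scale, int32_t to_scale) {
    while (from_scale < to_scale) {
        unscaled *= 10; // sketch: ignores overflow, fine for these magnitudes
        ++from_scale;
    }
    return unscaled;
}

int main() {
    // 1234.567890 normalized to 1234.56789 has unscaled 123456789 at scale 5;
    // decoded at the column scale 6 without rescaling it reads as 123.456789.
    assert(rescale_unscaled(123456789, 5, 6) == 1234567890LL);
    // 8765.432100 normalized to 8765.4321 has unscaled 87654321 at scale 4;
    // decoded at scale 6 without rescaling it reads as 87.654321.
    assert(rescale_unscaled(87654321, 4, 6) == 8765432100LL);
    return 0;
}
```

   Whatever the actual root cause turns out to be, the observed numbers line up with a scale mismatch rather than a byte-order problem.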



##########
be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h:
##########
@@ -0,0 +1,352 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/object_pool.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/common/sort/sorter.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vslot_ref.h"
+#include "vec/sink/writer/iceberg/viceberg_partition_writer.h"
+#include "vec/sink/writer/iceberg/vpartition_writer_base.h"
+#include "vec/spill/spill_stream.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris {
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+class VIcebergSortWriter : public IPartitionWriterBase {
+public:
+    using CreateWriterLambda = std::function<std::shared_ptr<VIcebergPartitionWriter>(
+            const std::string* file_name, int file_name_index)>;
+
+    VIcebergSortWriter(std::shared_ptr<VIcebergPartitionWriter> partition_writer,
+                       TSortInfo sort_info, int64_t target_file_size_bytes,
+                       CreateWriterLambda create_writer_lambda = nullptr)
+            : _sort_info(std::move(sort_info)),
+              _iceberg_partition_writer(std::move(partition_writer)),
+              _create_writer_lambda(std::move(create_writer_lambda)),
+              _target_file_size_bytes(target_file_size_bytes) {}
+
+    Status open(RuntimeState* state, RuntimeProfile* profile,
+                const RowDescriptor* row_desc) override {
+        DCHECK(row_desc != nullptr);
+        _runtime_state = state;
+        _profile = profile;
+        _row_desc = row_desc;
+
+        RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool));
+        RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc));
+        RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
+
+        _sorter = vectorized::FullSorter::create_unique(
+                _vsort_exec_exprs, -1, 0, &_pool, _sort_info.is_asc_order, _sort_info.nulls_first,
+                *row_desc, state, _profile);
+        _sorter->init_profile(_profile);
+        _sorter->set_enable_spill();
+        _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", TUnit::UNIT);
+        RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc));
+        return Status::OK();
+    }
+
+    Status write(vectorized::Block& block) override {
+        RETURN_IF_ERROR(_sorter->append_block(&block));
+        _update_spill_block_batch_row_count(block);
+        // sort in memory and write directly to Parquet file
+        if (_sorter->data_size() >= _target_file_size_bytes) {
+            return _flush_to_file();
+        }
+        // trigger_spill() will be called by memory management system
+        return Status::OK();
+    }
+
+    Status close(const Status& status) override {
+        Defer defer {[&]() {
+            Status st = _iceberg_partition_writer->close(status);
+            if (!st.ok()) {
+                LOG(WARNING) << fmt::format("_iceberg_partition_writer close failed, reason: {}",
+                                            st.to_string());
+            }
+            _cleanup_spill_streams();
+        }};
+
+        if (!status.ok() || _runtime_state->is_cancelled()) {
+            return status;
+        }
+
+        if (_sorter == nullptr) {
+            return Status::OK();
+        }
+
+        if (!_sorter->merge_sort_state()->unsorted_block()->empty() ||
+            !_sorter->merge_sort_state()->get_sorted_block().empty()) {
+            if (_sorted_streams.empty()) {
+                // data remaining in memory
+                RETURN_IF_ERROR(_sorter->_do_sort());
+                RETURN_IF_ERROR(_sorter->prepare_for_read(false));
+                RETURN_IF_ERROR(_write_sorted_data());
+                return Status::OK();
+            }
+
+            // spill remaining data
+            RETURN_IF_ERROR(_do_spill());
+        }
+
+        // Merge all spilled streams and output final sorted data
+        if (!_sorted_streams.empty()) {
+            RETURN_IF_ERROR(_combine_files_output());
+        }
+
+        return Status::OK();
+    }
+
+    inline const std::string& file_name() const override {
+        return _iceberg_partition_writer->file_name();
+    }
+
+    inline int file_name_index() const override {
+        return _iceberg_partition_writer->file_name_index();
+    }
+
+    inline size_t written_len() const override { return _iceberg_partition_writer->written_len(); }
+
+    auto sorter() const { return _sorter.get(); }
+
+    Status trigger_spill() { return _do_spill(); }
+
+private:
+    // how many rows need in spill block batch
+    void _update_spill_block_batch_row_count(const vectorized::Block& block) {
+        auto rows = block.rows();
+        if (rows > 0 && 0 == _avg_row_bytes) {
+            _avg_row_bytes = std::max(1UL, block.bytes() / rows);
+            int64_t spill_batch_bytes = _runtime_state->spill_sort_batch_bytes(); // default 8MB
+            _spill_block_batch_row_count =
+                    (spill_batch_bytes + _avg_row_bytes - 1) / _avg_row_bytes;
+        }
+    }
+
+    // have enought data, flush in-memory sorted data to file

Review Comment:
   There's a spelling error in the comment. "enought" should be "enough".
   ```suggestion
       // have enough data, flush in-memory sorted data to file
   ```



##########
be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h:
##########
@@ -0,0 +1,352 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/object_pool.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/common/sort/sorter.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vslot_ref.h"
+#include "vec/sink/writer/iceberg/viceberg_partition_writer.h"
+#include "vec/sink/writer/iceberg/vpartition_writer_base.h"
+#include "vec/spill/spill_stream.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris {
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+class VIcebergSortWriter : public IPartitionWriterBase {
+public:
+    using CreateWriterLambda = std::function<std::shared_ptr<VIcebergPartitionWriter>(
+            const std::string* file_name, int file_name_index)>;
+
+    VIcebergSortWriter(std::shared_ptr<VIcebergPartitionWriter> partition_writer,
+                       TSortInfo sort_info, int64_t target_file_size_bytes,
+                       CreateWriterLambda create_writer_lambda = nullptr)
+            : _sort_info(std::move(sort_info)),
+              _iceberg_partition_writer(std::move(partition_writer)),
+              _create_writer_lambda(std::move(create_writer_lambda)),
+              _target_file_size_bytes(target_file_size_bytes) {}
+
+    Status open(RuntimeState* state, RuntimeProfile* profile,
+                const RowDescriptor* row_desc) override {
+        DCHECK(row_desc != nullptr);
+        _runtime_state = state;
+        _profile = profile;
+        _row_desc = row_desc;
+
+        RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool));
+        RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc));
+        RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
+
+        _sorter = vectorized::FullSorter::create_unique(
+                _vsort_exec_exprs, -1, 0, &_pool, _sort_info.is_asc_order, _sort_info.nulls_first,
+                *row_desc, state, _profile);
+        _sorter->init_profile(_profile);
+        _sorter->set_enable_spill();
+        _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", TUnit::UNIT);
+        RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc));
+        return Status::OK();
+    }
+
+    Status write(vectorized::Block& block) override {
+        RETURN_IF_ERROR(_sorter->append_block(&block));
+        _update_spill_block_batch_row_count(block);
+        // sort in memory and write directly to Parquet file
+        if (_sorter->data_size() >= _target_file_size_bytes) {
+            return _flush_to_file();
+        }
+        // trigger_spill() will be called by memory management system
+        return Status::OK();
+    }
+
+    Status close(const Status& status) override {
+        Defer defer {[&]() {
+            Status st = _iceberg_partition_writer->close(status);
+            if (!st.ok()) {
+                LOG(WARNING) << fmt::format("_iceberg_partition_writer close failed, reason: {}",
+                                            st.to_string());
+            }
+            _cleanup_spill_streams();
+        }};
+
+        if (!status.ok() || _runtime_state->is_cancelled()) {
+            return status;
+        }
+
+        if (_sorter == nullptr) {
+            return Status::OK();
+        }
+
+        if (!_sorter->merge_sort_state()->unsorted_block()->empty() ||
+            !_sorter->merge_sort_state()->get_sorted_block().empty()) {
+            if (_sorted_streams.empty()) {
+                // data remaining in memory
+                RETURN_IF_ERROR(_sorter->_do_sort());
+                RETURN_IF_ERROR(_sorter->prepare_for_read(false));
+                RETURN_IF_ERROR(_write_sorted_data());
+                return Status::OK();
+            }
+
+            // spill remaining data
+            RETURN_IF_ERROR(_do_spill());
+        }
+
+        // Merge all spilled streams and output final sorted data
+        if (!_sorted_streams.empty()) {
+            RETURN_IF_ERROR(_combine_files_output());
+        }
+
+        return Status::OK();
+    }
+
+    inline const std::string& file_name() const override {
+        return _iceberg_partition_writer->file_name();
+    }
+
+    inline int file_name_index() const override {
+        return _iceberg_partition_writer->file_name_index();
+    }
+
+    inline size_t written_len() const override { return _iceberg_partition_writer->written_len(); }
+
+    auto sorter() const { return _sorter.get(); }
+
+    Status trigger_spill() { return _do_spill(); }
+
+private:
+    // how many rows need in spill block batch
+    void _update_spill_block_batch_row_count(const vectorized::Block& block) {
+        auto rows = block.rows();
+        if (rows > 0 && 0 == _avg_row_bytes) {
+            _avg_row_bytes = std::max(1UL, block.bytes() / rows);
+            int64_t spill_batch_bytes = _runtime_state->spill_sort_batch_bytes(); // default 8MB
+            _spill_block_batch_row_count =
+                    (spill_batch_bytes + _avg_row_bytes - 1) / _avg_row_bytes;
+        }
+    }
+
+    // have enought data, flush in-memory sorted data to file
+    Status _flush_to_file() {
+        RETURN_IF_ERROR(_sorter->_do_sort());
+        RETURN_IF_ERROR(_sorter->prepare_for_read(false));
+        RETURN_IF_ERROR(_write_sorted_data());
+        RETURN_IF_ERROR(_close_current_writer_and_open_next());
+        _sorter->reset();
+        return Status::OK();
+    }
+
+    // write data into file
+    Status _write_sorted_data() {
+        bool eos = false;
+        Block block;
+        while (!eos && !_runtime_state->is_cancelled()) {
+            RETURN_IF_ERROR(_sorter->get_next(_runtime_state, &block, &eos));
+            RETURN_IF_ERROR(_iceberg_partition_writer->write(block));
+            block.clear_column_data();
+        }
+        return Status::OK();
+    }
+
+    // close current writer and open a new one with incremented file index
+    Status _close_current_writer_and_open_next() {
+        std::string current_file_name = _iceberg_partition_writer->file_name();
+        int current_file_index = _iceberg_partition_writer->file_name_index();
+        RETURN_IF_ERROR(_iceberg_partition_writer->close(Status::OK()));
+
+        _iceberg_partition_writer =
+                _create_writer_lambda(&current_file_name, current_file_index + 1);
+        if (!_iceberg_partition_writer) {
+            return Status::InternalError("Failed to create new partition writer");
+        }
+
+        RETURN_IF_ERROR(_iceberg_partition_writer->open(_runtime_state, _profile, _row_desc));
+        return Status::OK();
+    }
+
+    // batch size max is int32_t max
+    int32_t _get_spill_batch_size() const {
+        if (_spill_block_batch_row_count > std::numeric_limits<int32_t>::max()) {
+            return std::numeric_limits<int32_t>::max();
+        }
+        return static_cast<int32_t>(_spill_block_batch_row_count);
+    }
+
+    Status _do_spill() {
+        COUNTER_UPDATE(_do_spill_count_counter, 1);
+        RETURN_IF_ERROR(_sorter->prepare_for_read(true));
+        int32_t batch_size = _get_spill_batch_size();
+
+        SpillStreamSPtr spilling_stream;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+                _runtime_state, spilling_stream, print_id(_runtime_state->query_id()),
+                "iceberg-sort", 1 /* node_id */, batch_size,
+                _runtime_state->spill_sort_batch_bytes(), _profile));
+        _sorted_streams.emplace_back(spilling_stream);
+
+        // spill sorted data to stream
+        bool eos = false;
+        Block block;
+        while (!eos && !_runtime_state->is_cancelled()) {
+            RETURN_IF_ERROR(_sorter->merge_sort_read_for_spill(
+                    _runtime_state, &block, (int)_spill_block_batch_row_count, &eos));
+            RETURN_IF_ERROR(spilling_stream->spill_block(_runtime_state, block, eos));
+            block.clear_column_data();
+        }
+        _sorter->reset();
+        return Status::OK();
+    }
+
+    // merge spilled streams and output sorted data to Parquet files

Review Comment:
   Similar to line 79, the comment on line 224 mentions "Parquet files" 
specifically but should be format-agnostic since the code handles both Parquet 
and ORC formats.
   ```suggestion
       // merge spilled streams and output sorted data to files
   ```



##########
be/src/vec/sink/writer/iceberg/viceberg_table_writer.h:
##########
@@ -51,14 +49,19 @@ class VIcebergTableWriter final : public AsyncResultWriter {
 
     ~VIcebergTableWriter() = default;
 
-    Status init_properties(ObjectPool* pool);
+    Status init_properties(ObjectPool* pool, const RowDescriptor& row_desc) {
+        _row_desc = &row_desc;
+        return Status::OK();
+    }

Review Comment:
   The signature of init_properties() has been changed to accept a 
RowDescriptor parameter, which breaks API consistency with other table writers 
like VHiveTableWriter (which only takes ObjectPool*). This inconsistency could 
cause confusion. Consider either keeping the signature consistent across all 
table writers or documenting why this particular writer needs the RowDescriptor.
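   For illustration only, a minimal stand-in sketch (toy types, not the Doris classes or their real signatures) of the alternative being suggested: keep init_properties(ObjectPool*) identical across table writers and let this writer pick up the RowDescriptor when it is opened.

```cpp
// Self-contained design sketch with stand-in types; how the real RuntimeState
// exposes the row descriptor in Doris is an assumption, not a claim.
#include <iostream>

struct ObjectPool {};
struct RowDescriptor {};
struct RuntimeState { RowDescriptor row_desc; };

class WriterBase {
public:
    virtual ~WriterBase() = default;
    // Same narrow signature for every table writer, mirroring VHiveTableWriter.
    virtual void init_properties(ObjectPool* pool) { _pool = pool; }
    virtual void open(RuntimeState* state) = 0;

protected:
    ObjectPool* _pool = nullptr;
};

class IcebergSortingWriter : public WriterBase {
public:
    void open(RuntimeState* state) override {
        // Capture the descriptor at open() time instead of widening init_properties().
        _row_desc = &state->row_desc;
        std::cout << "opened with row descriptor\n";
    }

private:
    const RowDescriptor* _row_desc = nullptr;
};

int main() {
    ObjectPool pool;
    RuntimeState state;
    IcebergSortingWriter writer;
    writer.init_properties(&pool);
    writer.open(&state);
    return 0;
}
```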



##########
be/src/vec/runtime/vorc_transformer.cpp:
##########
@@ -343,6 +347,209 @@ Status VOrcTransformer::close() {
     return Status::OK();
 }
 
+Status VOrcTransformer::collect_file_statistics_after_close(TIcebergColumnStats* stats) {
+    if (stats == nullptr || _iceberg_schema == nullptr || _fs == nullptr) {
+        return Status::OK();
+    }
+
+    try {
+        // orc writer do not provide api to get column statistics
+    // so we do not implement it now, we could implement it in future if really needed
+        // maybe at the close of orc writer, we could do statistics by hands
+    // eg: https://github.com/trinodb/trino/blob/master/lib/trino-orc/src/main/java/io/trino/orc/OrcWriter.java
+        io::FileReaderSPtr file_reader;
+        io::FileReaderOptions reader_options;
+        RETURN_IF_ERROR(_fs->open_file(_file_writer->path(), &file_reader, &reader_options));
+        auto input_stream = std::make_unique<ORCFileInputStream>(
+                _file_writer->path().native(), file_reader, nullptr, nullptr, 8L * 1024L * 1024L,
+                1L * 1024L * 1024L);
+        std::unique_ptr<orc::Reader> reader =
+                orc::createReader(std::move(input_stream), orc::ReaderOptions());
+        std::unique_ptr<orc::Statistics> file_stats = reader->getStatistics();
+
+        if (file_stats == nullptr) {
+            return Status::OK();
+        }
+
+        std::map<int32_t, int64_t> value_counts;
+        std::map<int32_t, int64_t> null_value_counts;
+        std::map<int32_t, std::string> lower_bounds;
+        std::map<int32_t, std::string> upper_bounds;
+        bool has_any_null_count = false;
+        bool has_any_min_max = false;
+
+        const iceberg::StructType& root_struct = _iceberg_schema->root_struct();
+        const auto& nested_fields = root_struct.fields();
+        for (uint32_t i = 0; i < nested_fields.size(); i++) {
+            uint32_t orc_col_id = i + 1; // skip root struct
+            if (orc_col_id >= file_stats->getNumberOfColumns()) {
+                continue;
+            }
+
+            const orc::ColumnStatistics* col_stats = file_stats->getColumnStatistics(orc_col_id);
+            if (col_stats == nullptr) {
+                continue;
+            }
+
+            int32_t field_id = nested_fields[i].field_id();
+            int64_t non_null_count = col_stats->getNumberOfValues();
+            value_counts[field_id] = non_null_count;
+            if (col_stats->hasNull()) {
+                has_any_null_count = true;
+                int64_t null_count = _cur_written_rows - non_null_count;
+                null_value_counts[field_id] = null_count;
+                value_counts[field_id] += null_count;
+            }
+
+            if (_collect_column_bounds(col_stats, field_id,
+                                       _output_vexpr_ctxs[i]->root()->data_type(), &lower_bounds,
+                                       &upper_bounds)) {
+                has_any_min_max = true;
+            }
+        }
+
+        stats->__set_value_counts(value_counts);
+        if (has_any_null_count) {
+            stats->__set_null_value_counts(null_value_counts);
+        }
+        if (has_any_min_max) {
+            stats->__set_lower_bounds(lower_bounds);
+            stats->__set_upper_bounds(upper_bounds);
+        }
+        return Status::OK();
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Failed to collect ORC file statistics: " << e.what();
+        return Status::OK();
+    }
+}
+
+bool VOrcTransformer::_collect_column_bounds(const orc::ColumnStatistics* col_stats,
+                                             int32_t field_id, const DataTypePtr& data_type,
+                                             std::map<int32_t, std::string>* lower_bounds,
+                                             std::map<int32_t, std::string>* upper_bounds) {
+    bool has_bounds = false;
+    auto primitive_type = remove_nullable(data_type)->get_primitive_type();
+    if (const auto* bool_stats = dynamic_cast<const orc::BooleanColumnStatistics*>(col_stats)) {
+        if (bool_stats->hasCount()) {
+            uint64_t true_count = bool_stats->getTrueCount();
+            uint64_t false_count = bool_stats->getFalseCount();
+            if (true_count > 0 || false_count > 0) {
+                has_bounds = true;
+                bool min_val = (false_count == 0);
+                bool max_val = (true_count > 0);
+                (*lower_bounds)[field_id] =
+                        std::string(reinterpret_cast<const char*>(&min_val), sizeof(bool));
+                (*upper_bounds)[field_id] =
+                        std::string(reinterpret_cast<const char*>(&max_val), sizeof(bool));
+            }
+        }
+    } else if (const auto* int_stats =
+                       dynamic_cast<const orc::IntegerColumnStatistics*>(col_stats)) {
+        if (int_stats->hasMinimum() && int_stats->hasMaximum()) {
+            has_bounds = true;
+            int64_t min_val = int_stats->getMinimum();
+            int64_t max_val = int_stats->getMaximum();
+            (*lower_bounds)[field_id] =
+                    std::string(reinterpret_cast<const char*>(&min_val), sizeof(int64_t));
+            (*upper_bounds)[field_id] =
+                    std::string(reinterpret_cast<const char*>(&max_val), sizeof(int64_t));
+        }
+    } else if (const auto* double_stats =
+                       dynamic_cast<const orc::DoubleColumnStatistics*>(col_stats)) {
+        if (double_stats->hasMinimum() && double_stats->hasMaximum()) {
+            has_bounds = true;
+            if (primitive_type == TYPE_FLOAT) {
+                auto min_val = static_cast<float>(double_stats->getMinimum());
+                auto max_val = static_cast<float>(double_stats->getMaximum());
+                (*lower_bounds)[field_id] =
+                        std::string(reinterpret_cast<const char*>(&min_val), sizeof(float));
+                (*upper_bounds)[field_id] =
+                        std::string(reinterpret_cast<const char*>(&max_val), sizeof(float));
+            } else {
+                double min_val = double_stats->getMinimum();
+                double max_val = double_stats->getMaximum();
+                (*lower_bounds)[field_id] =
+                        std::string(reinterpret_cast<const char*>(&min_val), sizeof(double));
+                (*upper_bounds)[field_id] =
+                        std::string(reinterpret_cast<const char*>(&max_val), sizeof(double));
+            }
+        }
+    } else if (const auto* string_stats =
+                       dynamic_cast<const orc::StringColumnStatistics*>(col_stats)) {
+        if (string_stats->hasMinimum() && string_stats->hasMaximum()) {
+            has_bounds = true;
+            (*lower_bounds)[field_id] = string_stats->getMinimum();
+            (*upper_bounds)[field_id] = string_stats->getMaximum();
+        }
+    } else if (const auto* date_stats = dynamic_cast<const orc::DateColumnStatistics*>(col_stats)) {
+        if (date_stats->hasMinimum() && date_stats->hasMaximum()) {
+            has_bounds = true;
+            int32_t min_val = date_stats->getMinimum();
+            int32_t max_val = date_stats->getMaximum();
+            (*lower_bounds)[field_id] =
+                    std::string(reinterpret_cast<const char*>(&min_val), sizeof(int32_t));
+            (*upper_bounds)[field_id] =
+                    std::string(reinterpret_cast<const char*>(&max_val), sizeof(int32_t));
+        }
+    } else if (const auto* ts_stats =
+                       dynamic_cast<const orc::TimestampColumnStatistics*>(col_stats)) {
+        if (ts_stats->hasMinimum() && ts_stats->hasMaximum()) {
+            has_bounds = true;
+            int64_t min_val = ts_stats->getMinimum() * 1000;
+            int64_t max_val = ts_stats->getMaximum() * 1000;
+            (*lower_bounds)[field_id] =
+                    std::string(reinterpret_cast<const char*>(&min_val), sizeof(int64_t));
+            (*upper_bounds)[field_id] =
+                    std::string(reinterpret_cast<const char*>(&max_val), sizeof(int64_t));
+        }
+    } else if (const auto* decimal_stats =
+                       dynamic_cast<const orc::DecimalColumnStatistics*>(col_stats)) {
+        if (decimal_stats->hasMinimum() && decimal_stats->hasMaximum()) {
+            has_bounds = true;
+            (*lower_bounds)[field_id] = _decimal_to_bytes(decimal_stats->getMinimum());
+            (*upper_bounds)[field_id] = _decimal_to_bytes(decimal_stats->getMaximum());
+        }
+    }
+
+    return has_bounds;
+}
+
+std::string VOrcTransformer::_decimal_to_bytes(const orc::Decimal& decimal) {
+    orc::Int128 val = decimal.value;
+    if (val == 0) {
+        char zero = 0;
+        return std::string(&zero, 1);
+    }
+
+    // Convert Int128 -> signed big-endian minimal bytes
+    bool negative = val < 0;
+    auto high = static_cast<uint64_t>(val.getHighBits());
+    auto low = val.getLowBits();
+
+    // If negative, convert to two's complement explicitly
+    if (negative) {
+        // two's complement for 128-bit
+        low = ~low + 1;
+        high = ~high + (low == 0 ? 1 : 0);
+    }
+

Review Comment:
   The two's complement conversion for negative decimals has a potential bug. orc::Int128 most likely already stores values in two's complement form, so getHighBits() and getLowBits() already hold the correct bit pattern for a negative number; applying a manual two's complement on top of that negates the value and produces the wrong bytes. Verify whether the high/low words are already the two's complement representation and, if so, serialize them directly and drop the manual negation step.
   ```suggestion
       // Convert Int128 (already stored in two's complement) -> signed big-endian minimal bytes
       bool negative = val < 0;
       auto high = static_cast<uint64_t>(val.getHighBits());
       auto low = val.getLowBits();
   ```
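   For reference, a self-contained sketch of serializing a 128-bit value that is already stored in two's complement (as the high/low words read from orc::Int128 are assumed to be) into a minimal big-endian byte string, with no second negation. This is illustrative only, not a drop-in replacement for _decimal_to_bytes:

```cpp
// Illustrative sketch: emit all 16 bytes big-endian from the two's complement
// high/low words, then trim redundant leading sign bytes so the encoding stays
// minimal while the sign bit of the remaining leading byte is preserved.
#include <cstddef>
#include <cstdint>
#include <string>

std::string int128_to_min_big_endian(uint64_t high, uint64_t low) {
    unsigned char bytes[16];
    for (int i = 0; i < 8; ++i) {
        bytes[i] = static_cast<unsigned char>(high >> (56 - 8 * i));
        bytes[8 + i] = static_cast<unsigned char>(low >> (56 - 8 * i));
    }
    const bool negative = (bytes[0] & 0x80) != 0;
    const unsigned char pad = negative ? 0xFF : 0x00;
    // Drop leading pad bytes while the next byte's sign bit still matches,
    // always keeping at least one byte.
    std::size_t start = 0;
    while (start < 15 && bytes[start] == pad &&
           (((bytes[start + 1] & 0x80) != 0) == negative)) {
        ++start;
    }
    return std::string(reinterpret_cast<const char*>(bytes + start), 16 - start);
}
```

   With this trimming rule, positive values never gain a spurious 0x00 sign byte and negative values never lose the 0xFF byte that keeps them negative.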



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

