This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a396f34e380 [feature](iceberg) support insert into iceberg table with 
sort-order (#60540)
a396f34e380 is described below

commit a396f34e380545534643c071ed142876a76fa7f0
Author: zhangstar333 <[email protected]>
AuthorDate: Sun Feb 22 08:19:24 2026 +0800

    [feature](iceberg) support insert into iceberg table with sort-order 
(#60540)
    
    ### What problem does this PR solve?
    Problem Summary:
    
    Support writing to an Iceberg table with a sort order. The written data
    is sorted locally, and lower/upper_bounds metadata is added, so the
    Iceberg planner can use it to prune data files.
    **Note**: this is only a local sort, not a global sort. If the Iceberg
    writer runs with higher parallelism, you may see overlapping
    lower/upper_bounds between files.
    If you need a global sort, you could add an ORDER BY clause to the
    insert SQL.
    
    You can create a table, and then alter the table, e.g.:
    ```
    CREATE TABLE test_table2 (
        id INT,
        name STRING,
        score DOUBLE,
        create_time datetime
    )
    ORDER BY (
        id ASC NULLS FIRST,
        score DESC NULLS LAST)
    PROPERTIES (
      'write-format'='ORC'
    );
    ```
---
 .../pipeline/exec/iceberg_table_sink_operator.cpp  |   2 +-
 be/src/pipeline/exec/operator.cpp                  |   4 +
 .../exec/spill_iceberg_table_sink_operator.cpp     | 196 ++++++++++++
 .../exec/spill_iceberg_table_sink_operator.h       |  93 ++++++
 be/src/pipeline/pipeline_fragment_context.cpp      |  12 +-
 be/src/vec/common/sort/sorter.cpp                  |   6 +-
 be/src/vec/common/sort/sorter.h                    |   6 +-
 be/src/vec/exec/format/table/parquet_utils.cpp     |  33 ++
 be/src/vec/exec/format/table/parquet_utils.h       |   5 +
 be/src/vec/runtime/vorc_transformer.cpp            | 209 +++++++++++-
 be/src/vec/runtime/vorc_transformer.h              |  12 +-
 be/src/vec/runtime/vparquet_transformer.cpp        |  64 ++++
 be/src/vec/runtime/vparquet_transformer.h          |   2 +
 .../writer/iceberg/viceberg_partition_writer.cpp   |  56 ++--
 .../writer/iceberg/viceberg_partition_writer.h     |  29 +-
 .../vec/sink/writer/iceberg/viceberg_sort_writer.h | 352 +++++++++++++++++++++
 .../sink/writer/iceberg/viceberg_table_writer.cpp  |  68 ++--
 .../sink/writer/iceberg/viceberg_table_writer.h    |  21 +-
 .../sink/writer/iceberg/vpartition_writer_base.h   |  64 ++++
 be/test/vec/exec/sort/full_sort_test.cpp           |   4 +-
 .../create_preinstalled_scripts/iceberg/run26.sql  |  73 +++++
 .../datasource/iceberg/IcebergTransaction.java     |  12 +-
 .../iceberg/helper/IcebergWriterHelper.java        |  56 +++-
 .../glue/translator/PhysicalPlanTranslator.java    |   4 +
 .../org/apache/doris/planner/IcebergTableSink.java |  40 ++-
 gensrc/thrift/DataSinks.thrift                     |  11 +
 .../iceberg/write/test_iceberg_write_stats2.out    |  33 ++
 .../iceberg/write/test_iceberg_write_stats2.groovy | 118 +++++++
 28 files changed, 1465 insertions(+), 120 deletions(-)

diff --git a/be/src/pipeline/exec/iceberg_table_sink_operator.cpp 
b/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
index 9f59ff040a5..ab76beac655 100644
--- a/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
@@ -26,7 +26,7 @@ Status IcebergTableSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo&
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<Parent>();
-    RETURN_IF_ERROR(_writer->init_properties(p._pool));
+    RETURN_IF_ERROR(_writer->init_properties(p._pool, p._row_desc));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index ff92b598b79..13d9b5a5851 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -76,6 +76,7 @@
 #include "pipeline/exec/set_source_operator.h"
 #include "pipeline/exec/sort_sink_operator.h"
 #include "pipeline/exec/sort_source_operator.h"
+#include "pipeline/exec/spill_iceberg_table_sink_operator.h"
 #include "pipeline/exec/spill_sort_sink_operator.h"
 #include "pipeline/exec/spill_sort_source_operator.h"
 #include "pipeline/exec/streaming_aggregation_operator.h"
@@ -820,6 +821,7 @@ DECLARE_OPERATOR(OlapTableSinkV2LocalState)
 DECLARE_OPERATOR(HiveTableSinkLocalState)
 DECLARE_OPERATOR(TVFTableSinkLocalState)
 DECLARE_OPERATOR(IcebergTableSinkLocalState)
+DECLARE_OPERATOR(SpillIcebergTableSinkLocalState)
 DECLARE_OPERATOR(AnalyticSinkLocalState)
 DECLARE_OPERATOR(BlackholeSinkLocalState)
 DECLARE_OPERATOR(SortSinkLocalState)
@@ -938,6 +940,8 @@ template class 
AsyncWriterSink<doris::vectorized::VTabletWriter, OlapTableSinkOp
 template class AsyncWriterSink<doris::vectorized::VTabletWriterV2, 
OlapTableSinkV2OperatorX>;
 template class AsyncWriterSink<doris::vectorized::VHiveTableWriter, 
HiveTableSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VIcebergTableWriter, 
IcebergTableSinkOperatorX>;
+template class AsyncWriterSink<doris::vectorized::VIcebergTableWriter,
+                               SpillIcebergTableSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VTVFTableWriter, 
TVFTableSinkOperatorX>;
 
 #ifdef BE_TEST
diff --git a/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp 
b/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
new file mode 100644
index 00000000000..d8f577af648
--- /dev/null
+++ b/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "spill_iceberg_table_sink_operator.h"
+
+#include "common/status.h"
+#include "pipeline/exec/iceberg_table_sink_operator.h"
+#include "pipeline/exec/spill_utils.h"
+#include "vec/sink/writer/iceberg/viceberg_sort_writer.h"
+#include "vec/sink/writer/iceberg/viceberg_table_writer.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+SpillIcebergTableSinkLocalState::SpillIcebergTableSinkLocalState(DataSinkOperatorXBase*
 parent,
+                                                                 RuntimeState* 
state)
+        : Base(parent, state) {}
+
+Status SpillIcebergTableSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
+
+    _init_spill_counters();
+
+    auto& p = _parent->cast<Parent>();
+    RETURN_IF_ERROR(_writer->init_properties(p._pool, p._row_desc));
+    return Status::OK();
+}
+
+Status SpillIcebergTableSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(Base::exec_time_counter());
+    SCOPED_TIMER(Base::_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+    return Status::OK();
+}
+
+bool SpillIcebergTableSinkLocalState::is_blockable() const {
+    return true;
+}
+
+size_t SpillIcebergTableSinkLocalState::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
+    if (!_writer || !_writer->_current_writer) {
+        return 0;
+    }
+
+    auto* sort_writer =
+            
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
+    if (!sort_writer || !sort_writer->sorter()) {
+        return 0;
+    }
+
+    return sort_writer->sorter()->get_reserve_mem_size(state, eos);
+}
+
+size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState* 
state) const {
+    if (!_writer || !_writer->_current_writer) {
+        return 0;
+    }
+
+    auto* sort_writer =
+            
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
+    if (!sort_writer || !sort_writer->sorter()) {
+        return 0;
+    }
+
+    return sort_writer->sorter()->data_size();
+}
+
+Status SpillIcebergTableSinkLocalState::revoke_memory(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
+    if (!_writer || !_writer->_current_writer) {
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
+        return Status::OK();
+    }
+
+    auto* sort_writer =
+            
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
+
+    if (!sort_writer || !sort_writer->sorter()) {
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
+        return Status::OK();
+    }
+
+    auto exception_catch_func = [sort_writer]() {
+        auto status = [&]() {
+            RETURN_IF_CATCH_EXCEPTION({ return sort_writer->trigger_spill(); 
});
+        }();
+        return status;
+    };
+
+    
state->get_query_ctx()->resource_ctx()->task_controller()->increase_revoking_tasks_count();
+    auto status =
+            SpillSinkRunnable(state, spill_context, operator_profile(), 
exception_catch_func).run();
+    if (!status.ok()) {
+        
state->get_query_ctx()->resource_ctx()->task_controller()->decrease_revoking_tasks_count();
+    }
+    return status;
+}
+
+SpillIcebergTableSinkOperatorX::SpillIcebergTableSinkOperatorX(
+        ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
+        const std::vector<TExpr>& t_output_expr)
+        : Base(operator_id, 0, 0), _row_desc(row_desc), 
_t_output_expr(t_output_expr), _pool(pool) {
+    _spillable = true;
+}
+
+Status SpillIcebergTableSinkOperatorX::init(const TDataSink& thrift_sink) {
+    RETURN_IF_ERROR(Base::init(thrift_sink));
+    _name = "SPILL_ICEBERG_TABLE_SINK_OPERATOR";
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, 
_output_vexpr_ctxs));
+    return Status::OK();
+}
+
+Status SpillIcebergTableSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::prepare(state));
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
+    return vectorized::VExpr::open(_output_vexpr_ctxs, state);
+}
+
+Status SpillIcebergTableSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* in_block,
+                                            bool eos) {
+    auto& local_state = get_local_state(state);
+    SCOPED_TIMER(local_state.exec_time_counter());
+    COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
+    return local_state.sink(state, in_block, eos);
+}
+
+size_t SpillIcebergTableSinkOperatorX::get_reserve_mem_size(RuntimeState* 
state, bool eos) {
+    auto& local_state = get_local_state(state);
+    return local_state.get_reserve_mem_size(state, eos);
+}
+
+size_t SpillIcebergTableSinkOperatorX::revocable_mem_size(RuntimeState* state) 
const {
+    auto& local_state = get_local_state(state);
+    return local_state.get_revocable_mem_size(state);
+}
+
+Status SpillIcebergTableSinkOperatorX::revoke_memory(
+        RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
+    auto& local_state = get_local_state(state);
+    return local_state.revoke_memory(state, spill_context);
+}
+
+void SpillIcebergTableSinkLocalState::_init_spill_counters() {
+    auto* profile = custom_profile();
+    //seems init_spill_write_counters()
+    ADD_TIMER_WITH_LEVEL(profile, "SpillWriteTime", 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteTaskWaitInQueueCount", 
TUnit::UNIT, 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteTaskCount", TUnit::UNIT, 1);
+    ADD_TIMER_WITH_LEVEL(profile, "SpillWriteTaskWaitInQueueTime", 1);
+    ADD_TIMER_WITH_LEVEL(profile, "SpillWriteFileTime", 1);
+    ADD_TIMER_WITH_LEVEL(profile, "SpillWriteSerializeBlockTime", 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteBlockCount", TUnit::UNIT, 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteBlockBytes", TUnit::BYTES, 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileBytes", TUnit::BYTES, 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteRows", TUnit::UNIT, 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileTotalCount", TUnit::UNIT, 
1);
+
+    //seems init_spill_read_counters()
+    ADD_TIMER_WITH_LEVEL(profile, "SpillTotalTime", 1);
+    ADD_TIMER_WITH_LEVEL(profile, "SpillRecoverTime", 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillReadTaskWaitInQueueCount", 
TUnit::UNIT, 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillReadTaskCount", TUnit::UNIT, 1);
+    ADD_TIMER_WITH_LEVEL(profile, "SpillReadTaskWaitInQueueTime", 1);
+    ADD_TIMER_WITH_LEVEL(profile, "SpillReadFileTime", 1);
+    ADD_TIMER_WITH_LEVEL(profile, "SpillReadDerializeBlockTime", 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillReadBlockCount", TUnit::UNIT, 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillReadBlockBytes", TUnit::BYTES, 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillReadFileBytes", TUnit::BYTES, 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillReadRows", TUnit::UNIT, 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillReadFileCount", TUnit::UNIT, 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentBytes", 
TUnit::BYTES, 1);
+    ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentCount", TUnit::UNIT, 
1);
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/spill_iceberg_table_sink_operator.h 
b/be/src/pipeline/exec/spill_iceberg_table_sink_operator.h
new file mode 100644
index 00000000000..fb0e50ee794
--- /dev/null
+++ b/be/src/pipeline/exec/spill_iceberg_table_sink_operator.h
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "operator.h"
+#include "vec/sink/writer/iceberg/viceberg_table_writer.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+class SpillIcebergTableSinkLocalState;
+class SpillIcebergTableSinkOperatorX;
+
+class SpillIcebergTableSinkLocalState final
+        : public AsyncWriterSink<vectorized::VIcebergTableWriter, 
SpillIcebergTableSinkOperatorX> {
+public:
+    using Base = AsyncWriterSink<vectorized::VIcebergTableWriter, 
SpillIcebergTableSinkOperatorX>;
+    using Parent = SpillIcebergTableSinkOperatorX;
+    ENABLE_FACTORY_CREATOR(SpillIcebergTableSinkLocalState);
+
+    SpillIcebergTableSinkLocalState(DataSinkOperatorXBase* parent, 
RuntimeState* state);
+    ~SpillIcebergTableSinkLocalState() override = default;
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
+
+    bool is_blockable() const override;
+    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
+    Status revoke_memory(RuntimeState* state, const 
std::shared_ptr<SpillContext>& spill_context);
+    size_t get_revocable_mem_size(RuntimeState* state) const;
+
+private:
+    void _init_spill_counters();
+    friend class SpillIcebergTableSinkOperatorX;
+};
+
+class SpillIcebergTableSinkOperatorX final
+        : public DataSinkOperatorX<SpillIcebergTableSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<SpillIcebergTableSinkLocalState>;
+    using LocalStateType = SpillIcebergTableSinkLocalState;
+
+    SpillIcebergTableSinkOperatorX(ObjectPool* pool, int operator_id, const 
RowDescriptor& row_desc,
+                                   const std::vector<TExpr>& t_output_expr);
+
+    Status init(const TDataSink& thrift_sink) override;
+
+    Status prepare(RuntimeState* state) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
+
+    size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
+
+    size_t revocable_mem_size(RuntimeState* state) const override;
+
+    Status revoke_memory(RuntimeState* state,
+                         const std::shared_ptr<SpillContext>& spill_context) 
override;
+
+    using DataSinkOperatorX<LocalStateType>::node_id;
+    using DataSinkOperatorX<LocalStateType>::operator_id;
+    using DataSinkOperatorX<LocalStateType>::get_local_state;
+
+private:
+    friend class SpillIcebergTableSinkLocalState;
+    template <typename Writer, typename Parent>
+        requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
+    friend class AsyncWriterSink;
+
+    const RowDescriptor& _row_desc;
+    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+    const std::vector<TExpr>& _t_output_expr;
+    ObjectPool* _pool = nullptr;
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 4dfa58e45a0..98c79c4e492 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -96,6 +96,7 @@
 #include "pipeline/exec/set_source_operator.h"
 #include "pipeline/exec/sort_sink_operator.h"
 #include "pipeline/exec/sort_source_operator.h"
+#include "pipeline/exec/spill_iceberg_table_sink_operator.h"
 #include "pipeline/exec/spill_sort_sink_operator.h"
 #include "pipeline/exec/spill_sort_source_operator.h"
 #include "pipeline/exec/streaming_aggregation_operator.h"
@@ -1074,10 +1075,15 @@ Status 
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
     }
     case TDataSinkType::ICEBERG_TABLE_SINK: {
         if (!thrift_sink.__isset.iceberg_table_sink) {
-            return Status::InternalError("Missing hive table sink.");
+            return Status::InternalError("Missing iceberg table sink.");
+        }
+        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
+            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, 
next_sink_operator_id(),
+                                                                     row_desc, 
output_exprs);
+        } else {
+            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, 
next_sink_operator_id(),
+                                                                row_desc, 
output_exprs);
         }
-        _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, 
next_sink_operator_id(), row_desc,
-                                                            output_exprs);
         break;
     }
     case TDataSinkType::JDBC_TABLE_SINK: {
diff --git a/be/src/vec/common/sort/sorter.cpp 
b/be/src/vec/common/sort/sorter.cpp
index 0e588b119f0..6f909acd916 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -235,7 +235,7 @@ Status FullSorter::append_block(Block* block) {
 
     // iff have reach limit and the unsorted block capacity can't hold the 
block data size
     if (_reach_limit() && !has_enough_capacity(block, 
_state->unsorted_block().get())) {
-        RETURN_IF_ERROR(_do_sort());
+        RETURN_IF_ERROR(do_sort());
     }
 
     {
@@ -268,7 +268,7 @@ Status FullSorter::prepare_for_read(bool is_spill) {
         _state->ignore_offset();
     }
     if (_state->unsorted_block()->rows() > 0) {
-        RETURN_IF_ERROR(_do_sort());
+        RETURN_IF_ERROR(do_sort());
     }
     return _state->build_merge_tree(_sort_description);
 }
@@ -282,7 +282,7 @@ Status FullSorter::merge_sort_read_for_spill(RuntimeState* 
state, doris::vectori
     return _state->merge_sort_read(block, batch_size, eos);
 }
 
-Status FullSorter::_do_sort() {
+Status FullSorter::do_sort() {
     Block* src_block = _state->unsorted_block().get();
     Block desc_block = src_block->clone_without_columns();
     COUNTER_UPDATE(_partial_sort_counter, 1);
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index bd5ae9f9726..d1d1229d908 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -202,6 +202,10 @@ public:
         _max_buffered_block_bytes = max_buffered_block_bytes;
     }
 
+    auto merge_sort_state() { return _state.get(); }
+
+    Status do_sort();
+
 private:
     bool _reach_limit() {
         return _state->unsorted_block()->allocated_bytes() >= 
_max_buffered_block_bytes;
@@ -209,8 +213,6 @@ private:
 
     bool has_enough_capacity(Block* input_block, Block* unsorted_block) const;
 
-    Status _do_sort();
-
     std::unique_ptr<MergeSorterState> _state;
 
     static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 * 1024 * 1024;
diff --git a/be/src/vec/exec/format/table/parquet_utils.cpp 
b/be/src/vec/exec/format/table/parquet_utils.cpp
index d5aee2079a5..4892516f573 100644
--- a/be/src/vec/exec/format/table/parquet_utils.cpp
+++ b/be/src/vec/exec/format/table/parquet_utils.cpp
@@ -433,4 +433,37 @@ void build_path_map(const FieldSchema& field, const 
std::string& prefix,
     }
 }
 
+#define MERGE_STATS_CASE(ParquetType)                                          
           \
+    case ParquetType: {                                                        
           \
+        auto typed_left_stat = std::static_pointer_cast<                       
           \
+                
::parquet::TypedStatistics<::parquet::PhysicalType<ParquetType>>>(left);  \
+        auto typed_right_stat = std::static_pointer_cast<                      
           \
+                
::parquet::TypedStatistics<::parquet::PhysicalType<ParquetType>>>(right); \
+        typed_left_stat->Merge(*typed_right_stat);                             
           \
+        return;                                                                
           \
+    }
+
+void merge_stats(const std::shared_ptr<::parquet::Statistics>& left,
+                 const std::shared_ptr<::parquet::Statistics>& right) {
+    if (left == nullptr || right == nullptr) {
+        return;
+    }
+    DCHECK(left->physical_type() == right->physical_type());
+
+    switch (left->physical_type()) {
+        MERGE_STATS_CASE(::parquet::Type::BOOLEAN);
+        MERGE_STATS_CASE(::parquet::Type::INT32);
+        MERGE_STATS_CASE(::parquet::Type::INT64);
+        MERGE_STATS_CASE(::parquet::Type::INT96);
+        MERGE_STATS_CASE(::parquet::Type::FLOAT);
+        MERGE_STATS_CASE(::parquet::Type::DOUBLE);
+        MERGE_STATS_CASE(::parquet::Type::BYTE_ARRAY);
+        MERGE_STATS_CASE(::parquet::Type::FIXED_LEN_BYTE_ARRAY);
+    default:
+        LOG(WARNING) << "Unsupported parquet type for statistics merge: "
+                     << static_cast<int>(left->physical_type());
+        break;
+    }
+}
+
 } // namespace doris::vectorized::parquet_utils
diff --git a/be/src/vec/exec/format/table/parquet_utils.h 
b/be/src/vec/exec/format/table/parquet_utils.h
index 0f966ab0676..b79dee1cb39 100644
--- a/be/src/vec/exec/format/table/parquet_utils.h
+++ b/be/src/vec/exec/format/table/parquet_utils.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <parquet/statistics.h>
+
 #include <array>
 #include <string>
 #include <unordered_map>
@@ -176,4 +178,7 @@ std::string decode_statistics_value(const FieldSchema* 
schema_field,
 void build_path_map(const FieldSchema& field, const std::string& prefix,
                     std::unordered_map<std::string, const FieldSchema*>* map);
 
+void merge_stats(const std::shared_ptr<::parquet::Statistics>& left,
+                 const std::shared_ptr<::parquet::Statistics>& right);
+
 } // namespace doris::vectorized::parquet_utils
diff --git a/be/src/vec/runtime/vorc_transformer.cpp 
b/be/src/vec/runtime/vorc_transformer.cpp
index a42a8eb84d4..39c226a971b 100644
--- a/be/src/vec/runtime/vorc_transformer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -23,6 +23,7 @@
 
 #include <exception>
 #include <ostream>
+#include <sstream>
 
 #include "common/cast_set.h"
 #include "common/status.h"
@@ -55,6 +56,7 @@
 #include "vec/data_types/data_type_array.h"
 #include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_struct.h"
+#include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 #include "vec/runtime/vdatetime_value.h"
@@ -113,8 +115,10 @@ VOrcTransformer::VOrcTransformer(RuntimeState* state, 
doris::io::FileWriter* fil
                                  const VExprContextSPtrs& output_vexpr_ctxs, 
std::string schema,
                                  std::vector<std::string> column_names, bool 
output_object_data,
                                  TFileCompressType::type compress_type,
-                                 const iceberg::Schema* iceberg_schema)
+                                 const iceberg::Schema* iceberg_schema,
+                                 std::shared_ptr<io::FileSystem> fs)
         : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
+          _fs(fs),
           _file_writer(file_writer),
           _column_names(std::move(column_names)),
           _write_options(new orc::WriterOptions()),
@@ -343,6 +347,209 @@ Status VOrcTransformer::close() {
     return Status::OK();
 }
 
+Status 
VOrcTransformer::collect_file_statistics_after_close(TIcebergColumnStats* 
stats) {
+    if (stats == nullptr || _iceberg_schema == nullptr || _fs == nullptr) {
+        return Status::OK();
+    }
+
+    try {
+        // orc writer do not provide api to get column statistics
+        // so we do not implement it now, we could implement it in future if 
really needed
+        // maybe at the close of orc writer, we could do statistics by hands
+        // eg: 
https://github.com/trinodb/trino/blob/master/lib/trino-orc/src/main/java/io/trino/orc/OrcWriter.java
+        io::FileReaderSPtr file_reader;
+        io::FileReaderOptions reader_options;
+        RETURN_IF_ERROR(_fs->open_file(_file_writer->path(), &file_reader, 
&reader_options));
+        auto input_stream = std::make_unique<ORCFileInputStream>(
+                _file_writer->path().native(), file_reader, nullptr, nullptr, 
8L * 1024L * 1024L,
+                1L * 1024L * 1024L);
+        std::unique_ptr<orc::Reader> reader =
+                orc::createReader(std::move(input_stream), 
orc::ReaderOptions());
+        std::unique_ptr<orc::Statistics> file_stats = reader->getStatistics();
+
+        if (file_stats == nullptr) {
+            return Status::OK();
+        }
+
+        std::map<int32_t, int64_t> value_counts;
+        std::map<int32_t, int64_t> null_value_counts;
+        std::map<int32_t, std::string> lower_bounds;
+        std::map<int32_t, std::string> upper_bounds;
+        bool has_any_null_count = false;
+        bool has_any_min_max = false;
+
+        const iceberg::StructType& root_struct = 
_iceberg_schema->root_struct();
+        const auto& nested_fields = root_struct.fields();
+        for (uint32_t i = 0; i < nested_fields.size(); i++) {
+            uint32_t orc_col_id = i + 1; // skip root struct
+            if (orc_col_id >= file_stats->getNumberOfColumns()) {
+                continue;
+            }
+
+            const orc::ColumnStatistics* col_stats = 
file_stats->getColumnStatistics(orc_col_id);
+            if (col_stats == nullptr) {
+                continue;
+            }
+
+            int32_t field_id = nested_fields[i].field_id();
+            int64_t non_null_count = col_stats->getNumberOfValues();
+            value_counts[field_id] = non_null_count;
+            if (col_stats->hasNull()) {
+                has_any_null_count = true;
+                int64_t null_count = _cur_written_rows - non_null_count;
+                null_value_counts[field_id] = null_count;
+                value_counts[field_id] += null_count;
+            }
+
+            if (_collect_column_bounds(col_stats, field_id,
+                                       
_output_vexpr_ctxs[i]->root()->data_type(), &lower_bounds,
+                                       &upper_bounds)) {
+                has_any_min_max = true;
+            }
+        }
+
+        stats->__set_value_counts(value_counts);
+        if (has_any_null_count) {
+            stats->__set_null_value_counts(null_value_counts);
+        }
+        if (has_any_min_max) {
+            stats->__set_lower_bounds(lower_bounds);
+            stats->__set_upper_bounds(upper_bounds);
+        }
+        return Status::OK();
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Failed to collect ORC file statistics: " << e.what();
+        return Status::OK();
+    }
+}
+
+bool VOrcTransformer::_collect_column_bounds(const orc::ColumnStatistics* 
col_stats,
+                                             int32_t field_id, const 
DataTypePtr& data_type,
+                                             std::map<int32_t, std::string>* 
lower_bounds,
+                                             std::map<int32_t, std::string>* 
upper_bounds) {
+    bool has_bounds = false;
+    auto primitive_type = remove_nullable(data_type)->get_primitive_type();
+    if (const auto* bool_stats = dynamic_cast<const 
orc::BooleanColumnStatistics*>(col_stats)) {
+        if (bool_stats->hasCount()) {
+            uint64_t true_count = bool_stats->getTrueCount();
+            uint64_t false_count = bool_stats->getFalseCount();
+            if (true_count > 0 || false_count > 0) {
+                has_bounds = true;
+                bool min_val = (false_count == 0);
+                bool max_val = (true_count > 0);
+                (*lower_bounds)[field_id] =
+                        std::string(reinterpret_cast<const char*>(&min_val), 
sizeof(bool));
+                (*upper_bounds)[field_id] =
+                        std::string(reinterpret_cast<const char*>(&max_val), 
sizeof(bool));
+            }
+        }
+    } else if (const auto* int_stats =
+                       dynamic_cast<const 
orc::IntegerColumnStatistics*>(col_stats)) {
+        if (int_stats->hasMinimum() && int_stats->hasMaximum()) {
+            has_bounds = true;
+            int64_t min_val = int_stats->getMinimum();
+            int64_t max_val = int_stats->getMaximum();
+            (*lower_bounds)[field_id] =
+                    std::string(reinterpret_cast<const char*>(&min_val), 
sizeof(int64_t));
+            (*upper_bounds)[field_id] =
+                    std::string(reinterpret_cast<const char*>(&max_val), 
sizeof(int64_t));
+        }
+    } else if (const auto* double_stats =
+                       dynamic_cast<const 
orc::DoubleColumnStatistics*>(col_stats)) {
+        if (double_stats->hasMinimum() && double_stats->hasMaximum()) {
+            has_bounds = true;
+            if (primitive_type == TYPE_FLOAT) {
+                auto min_val = static_cast<float>(double_stats->getMinimum());
+                auto max_val = static_cast<float>(double_stats->getMaximum());
+                (*lower_bounds)[field_id] =
+                        std::string(reinterpret_cast<const char*>(&min_val), 
sizeof(float));
+                (*upper_bounds)[field_id] =
+                        std::string(reinterpret_cast<const char*>(&max_val), 
sizeof(float));
+            } else {
+                double min_val = double_stats->getMinimum();
+                double max_val = double_stats->getMaximum();
+                (*lower_bounds)[field_id] =
+                        std::string(reinterpret_cast<const char*>(&min_val), 
sizeof(double));
+                (*upper_bounds)[field_id] =
+                        std::string(reinterpret_cast<const char*>(&max_val), 
sizeof(double));
+            }
+        }
+    } else if (const auto* string_stats =
+                       dynamic_cast<const 
orc::StringColumnStatistics*>(col_stats)) {
+        if (string_stats->hasMinimum() && string_stats->hasMaximum()) {
+            has_bounds = true;
+            (*lower_bounds)[field_id] = string_stats->getMinimum();
+            (*upper_bounds)[field_id] = string_stats->getMaximum();
+        }
+    } else if (const auto* date_stats = dynamic_cast<const 
orc::DateColumnStatistics*>(col_stats)) {
+        if (date_stats->hasMinimum() && date_stats->hasMaximum()) {
+            has_bounds = true;
+            int32_t min_val = date_stats->getMinimum();
+            int32_t max_val = date_stats->getMaximum();
+            (*lower_bounds)[field_id] =
+                    std::string(reinterpret_cast<const char*>(&min_val), 
sizeof(int32_t));
+            (*upper_bounds)[field_id] =
+                    std::string(reinterpret_cast<const char*>(&max_val), 
sizeof(int32_t));
+        }
+    } else if (const auto* ts_stats =
+                       dynamic_cast<const 
orc::TimestampColumnStatistics*>(col_stats)) {
+        if (ts_stats->hasMinimum() && ts_stats->hasMaximum()) {
+            has_bounds = true;
+            int64_t min_val = ts_stats->getMinimum() * 1000;
+            int64_t max_val = ts_stats->getMaximum() * 1000;
+            (*lower_bounds)[field_id] =
+                    std::string(reinterpret_cast<const char*>(&min_val), 
sizeof(int64_t));
+            (*upper_bounds)[field_id] =
+                    std::string(reinterpret_cast<const char*>(&max_val), 
sizeof(int64_t));
+        }
+    } else if (const auto* decimal_stats =
+                       dynamic_cast<const 
orc::DecimalColumnStatistics*>(col_stats)) {
+        if (decimal_stats->hasMinimum() && decimal_stats->hasMaximum()) {
+            has_bounds = true;
+            (*lower_bounds)[field_id] = 
_decimal_to_bytes(decimal_stats->getMinimum());
+            (*upper_bounds)[field_id] = 
_decimal_to_bytes(decimal_stats->getMaximum());
+        }
+    }
+
+    return has_bounds;
+}
+
+// Serializes the unscaled value of an ORC decimal in the layout Iceberg uses for
+// decimal bounds: two's-complement, big-endian, using the minimum number of bytes.
+//
+// @param decimal  ORC decimal whose 128-bit unscaled `value` is encoded (the scale
+//                 is not part of the encoding).
+// @return         1 to 16 bytes; zero is encoded as the single byte 0x00 and -1 as
+//                 the single byte 0xFF.
+std::string VOrcTransformer::_decimal_to_bytes(const orc::Decimal& decimal) {
+    orc::Int128 val = decimal.value;
+
+    // orc::Int128 already holds the number in two's complement (signed high word,
+    // unsigned low word), so both words are emitted unchanged. Negating negative
+    // values here would serialize their magnitude and turn e.g. -1 into +1.
+    auto high = static_cast<uint64_t>(val.getHighBits());
+    auto low = val.getLowBits();
+
+    // Serialize to big-endian bytes: high word first, most significant byte first.
+    uint8_t buf[16];
+    for (int i = 0; i < 8; ++i) {
+        buf[i] = static_cast<uint8_t>(high >> (56 - i * 8));
+        buf[i + 8] = static_cast<uint8_t>(low >> (56 - i * 8));
+    }
+
+    // Strip redundant sign-extension bytes. A leading byte may only be dropped when
+    // it is pure sign extension AND the following byte still carries the same sign
+    // bit, otherwise the shortened value would change sign. At least one byte is kept.
+    const uint8_t sign_byte = (buf[0] & 0x80) != 0 ? 0xFF : 0x00;
+    int start = 0;
+    while (start < 15 && buf[start] == sign_byte &&
+           ((buf[start + 1] & 0x80) == (sign_byte & 0x80))) {
+        ++start;
+    }
+    return std::string(reinterpret_cast<const char*>(buf + start), 16 - start);
+}
+
 Status VOrcTransformer::write(const Block& block) {
     if (block.rows() == 0) {
         return Status::OK();
diff --git a/be/src/vec/runtime/vorc_transformer.h 
b/be/src/vec/runtime/vorc_transformer.h
index 3523c6b14af..d3876c7b5c6 100644
--- a/be/src/vec/runtime/vorc_transformer.h
+++ b/be/src/vec/runtime/vorc_transformer.h
@@ -83,7 +83,8 @@ public:
                     const VExprContextSPtrs& output_vexpr_ctxs, std::string 
schema,
                     std::vector<std::string> column_names, bool 
output_object_data,
                     TFileCompressType::type compression,
-                    const iceberg::Schema* iceberg_schema = nullptr);
+                    const iceberg::Schema* iceberg_schema = nullptr,
+                    std::shared_ptr<io::FileSystem> fs = nullptr);
 
     ~VOrcTransformer() = default;
 
@@ -95,6 +96,8 @@ public:
 
     int64_t written_len() override;
 
+    Status collect_file_statistics_after_close(TIcebergColumnStats* stats);
+
 private:
     void set_compression_type(const TFileCompressType::type& compress_type);
     std::unique_ptr<orc::Type> _build_orc_type(const DataTypePtr& type,
@@ -106,7 +109,12 @@ private:
     // so we need to resize the subtype of a complex type
     Status _resize_row_batch(const DataTypePtr& type, const IColumn& column,
                              orc::ColumnVectorBatch* orc_col_batch);
-
+    bool _collect_column_bounds(const orc::ColumnStatistics* col_stats, 
int32_t field_id,
+                                const DataTypePtr& data_type,
+                                std::map<int32_t, std::string>* lower_bounds,
+                                std::map<int32_t, std::string>* upper_bounds);
+    std::string _decimal_to_bytes(const orc::Decimal& decimal);
+    std::shared_ptr<io::FileSystem> _fs = nullptr;
     doris::io::FileWriter* _file_writer = nullptr;
     std::vector<std::string> _column_names;
     std::unique_ptr<orc::OutputStream> _output_stream;
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp 
b/be/src/vec/runtime/vparquet_transformer.cpp
index d75513b0b0a..1563e092810 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -21,6 +21,7 @@
 #include <arrow/table.h>
 #include <arrow/util/key_value_metadata.h>
 #include <glog/logging.h>
+#include <parquet/api/reader.h>
 #include <parquet/column_writer.h>
 #include <parquet/platform.h>
 #include <parquet/schema.h>
@@ -42,6 +43,7 @@
 #include "util/arrow/utils.h"
 #include "util/debug_util.h"
 #include "vec/exec/format/table/iceberg/arrow_schema_util.h"
+#include "vec/exec/format/table/parquet_utils.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 
@@ -309,7 +311,69 @@ Status VParquetTransformer::close() {
         LOG(WARNING) << "Parquet writer close error: " << e.what();
         return Status::IOError(e.what());
     }
+
     return Status::OK();
 }
 
+// Builds the per-column Iceberg statistics of the file that was just written from
+// the Parquet footer exposed by the writer.
+//
+// For every leaf column (keyed by its schema field id) this fills:
+//   - column_sizes:      sum of the compressed chunk sizes over all row groups
+//   - value_counts:      merged num_values(), plus the null count when it is known
+//   - null_value_counts: merged null count (only set if any column reports one)
+//   - lower/upper_bounds: encoded min / max (only set if any column reports them)
+//
+// @param stats  output struct; only the fields listed above are touched.
+// @return       InternalError when the writer exposes no file metadata, OK otherwise.
+Status VParquetTransformer::collect_file_statistics_after_close(TIcebergColumnStats* stats) {
+    std::shared_ptr<parquet::FileMetaData> footer = _writer->metadata();
+    if (footer == nullptr) {
+        return Status::InternalError("File metadata is not available");
+    }
+
+    std::map<int, int64_t> column_sizes;
+    std::map<int, std::shared_ptr<parquet::Statistics>> stats_by_field;
+
+    // Pass 1: walk every (column, row group) chunk, summing sizes and folding the
+    // chunk statistics of one column into a single Statistics object.
+    const int row_group_total = footer->num_row_groups();
+    const int column_total = footer->num_columns();
+    for (int col = 0; col < column_total; ++col) {
+        auto field_id = footer->schema()->Column(col)->schema_node()->field_id();
+
+        for (int rg = 0; rg < row_group_total; ++rg) {
+            auto row_group = footer->RowGroup(rg);
+            auto chunk = row_group->ColumnChunk(col);
+            column_sizes[field_id] += chunk->total_compressed_size();
+
+            if (!chunk->is_stats_set()) {
+                continue;
+            }
+            auto chunk_stats = chunk->statistics();
+            // The first chunk with statistics seeds the entry, later ones are merged in.
+            auto [slot, inserted] = stats_by_field.try_emplace(field_id, chunk_stats);
+            if (!inserted) {
+                parquet_utils::merge_stats(slot->second, chunk_stats);
+            }
+        }
+    }
+
+    // Pass 2: translate the merged statistics into the Iceberg maps.
+    std::map<int, int64_t> value_counts;
+    std::map<int, int64_t> null_value_counts;
+    std::map<int, std::string> lower_bounds;
+    std::map<int, std::string> upper_bounds;
+    for (const auto& [field_id, merged] : stats_by_field) {
+        int64_t total_values = merged->num_values();
+        if (merged->HasNullCount()) {
+            const int64_t nulls = merged->null_count();
+            null_value_counts[field_id] = nulls;
+            // the reported value count is num_values() plus the nulls
+            total_values += nulls;
+        }
+        value_counts[field_id] = total_values;
+
+        if (merged->HasMinMax()) {
+            lower_bounds[field_id] = merged->EncodeMin();
+            upper_bounds[field_id] = merged->EncodeMax();
+        }
+    }
+
+    stats->__set_column_sizes(column_sizes);
+    stats->__set_value_counts(value_counts);
+    // Optional fields are only marked as set when at least one column contributed.
+    if (!null_value_counts.empty()) {
+        stats->__set_null_value_counts(null_value_counts);
+    }
+    if (!lower_bounds.empty()) {
+        stats->__set_lower_bounds(lower_bounds);
+        stats->__set_upper_bounds(upper_bounds);
+    }
+    return Status::OK();
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vparquet_transformer.h 
b/be/src/vec/runtime/vparquet_transformer.h
index b654b331414..228a31c55a3 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -111,6 +111,8 @@ public:
 
     int64_t written_len() override;
 
+    Status collect_file_statistics_after_close(TIcebergColumnStats* stats);
+
 private:
     Status _parse_properties();
     Status _parse_schema();
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index 933c8d3f562..f5c4ed1dd54 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -17,6 +17,9 @@
 
 #include "viceberg_partition_writer.h"
 
+#include <memory>
+#include <sstream>
+
 #include "io/file_factory.h"
 #include "runtime/runtime_state.h"
 #include "vec/columns/column_map.h"
@@ -47,9 +50,9 @@ VIcebergPartitionWriter::VIcebergPartitionWriter(
           _compress_type(compress_type),
           _hadoop_conf(hadoop_conf) {}
 
-Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* 
profile) {
+Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* 
profile,
+                                     const RowDescriptor* row_desc) {
     _state = state;
-
     io::FSPropertiesRef fs_properties(_write_info.file_type);
     fs_properties.properties = &_hadoop_conf;
     if (!_write_info.broker_addresses.empty()) {
@@ -83,17 +86,19 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, 
RuntimeProfile* profil
                                          to_string(_compress_type));
         }
         }
-        ParquetFileOptions parquet_options = {parquet_compression_type,
-                                              TParquetVersion::PARQUET_1_0, 
false, false};
-        _file_format_transformer.reset(new VParquetTransformer(
+        ParquetFileOptions parquet_options = {.compression_type = 
parquet_compression_type,
+                                              .parquet_version = 
TParquetVersion::PARQUET_1_0,
+                                              .parquet_disable_dictionary = 
false,
+                                              .enable_int96_timestamps = 
false};
+        _file_format_transformer = std::make_unique<VParquetTransformer>(
                 state, _file_writer.get(), _write_output_expr_ctxs, 
_write_column_names, false,
-                parquet_options, _iceberg_schema_json, &_schema));
+                parquet_options, _iceberg_schema_json, &_schema);
         return _file_format_transformer->open();
     }
     case TFileFormatType::FORMAT_ORC: {
-        _file_format_transformer.reset(
-                new VOrcTransformer(state, _file_writer.get(), 
_write_output_expr_ctxs, "",
-                                    _write_column_names, false, 
_compress_type, &_schema));
+        _file_format_transformer = std::make_unique<VOrcTransformer>(
+                state, _file_writer.get(), _write_output_expr_ctxs, "", 
_write_column_names, false,
+                _compress_type, &_schema, _fs);
         return _file_format_transformer->open();
     }
     default: {
@@ -121,7 +126,8 @@ Status VIcebergPartitionWriter::close(const Status& status) 
{
         }
     }
     if (status_ok) {
-        auto commit_data = _build_iceberg_commit_data();
+        TIcebergCommitData commit_data;
+        RETURN_IF_ERROR(_build_iceberg_commit_data(&commit_data));
         _state->add_iceberg_commit_datas(commit_data);
     }
     return result_status;
@@ -133,16 +139,28 @@ Status VIcebergPartitionWriter::write(vectorized::Block& 
block) {
     return Status::OK();
 }
 
-TIcebergCommitData VIcebergPartitionWriter::_build_iceberg_commit_data() {
-    TIcebergCommitData iceberg_commit_data;
-    iceberg_commit_data.__set_file_path(
-            fmt::format("{}/{}", _write_info.original_write_path, 
_get_target_file_name()));
-    iceberg_commit_data.__set_row_count(_row_count);
+Status VIcebergPartitionWriter::_build_iceberg_commit_data(TIcebergCommitData* 
commit_data) {
+    DCHECK(commit_data != nullptr);
     DCHECK(_file_format_transformer != nullptr);
-    
iceberg_commit_data.__set_file_size(_file_format_transformer->written_len());
-    iceberg_commit_data.__set_file_content(TFileContent::DATA);
-    iceberg_commit_data.__set_partition_values(_partition_values);
-    return iceberg_commit_data;
+
+    commit_data->__set_file_path(
+            fmt::format("{}/{}", _write_info.original_write_path, 
_get_target_file_name()));
+    commit_data->__set_row_count(_row_count);
+    commit_data->__set_file_size(_file_format_transformer->written_len());
+    commit_data->__set_file_content(TFileContent::DATA);
+    commit_data->__set_partition_values(_partition_values);
+    if (_file_format_type == TFileFormatType::FORMAT_PARQUET) {
+        TIcebergColumnStats column_stats;
+        
RETURN_IF_ERROR(static_cast<VParquetTransformer*>(_file_format_transformer.get())
+                                
->collect_file_statistics_after_close(&column_stats));
+        commit_data->__set_column_stats(column_stats);
+    } else if (_file_format_type == TFileFormatType::FORMAT_ORC) {
+        TIcebergColumnStats column_stats;
+        
RETURN_IF_ERROR(static_cast<VOrcTransformer*>(_file_format_transformer.get())
+                                
->collect_file_statistics_after_close(&column_stats));
+        commit_data->__set_column_stats(column_stats);
+    }
+    return Status::OK();
 }
 
 std::string VIcebergPartitionWriter::_get_file_extension(
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h 
b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
index 28605f80426..b4db050dd71 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
@@ -20,10 +20,10 @@
 #include <gen_cpp/DataSinks_types.h>
 
 #include "io/fs/file_writer.h"
-#include "vec/columns/column.h"
 #include "vec/exec/format/table/iceberg/schema.h"
 #include "vec/exprs/vexpr_fwd.h"
 #include "vec/runtime/vfile_format_transformer.h"
+#include "vec/sink/writer/iceberg/vpartition_writer_base.h"
 
 namespace doris {
 namespace io {
@@ -43,16 +43,8 @@ namespace vectorized {
 class Block;
 class VFileFormatTransformer;
 
-class VIcebergPartitionWriter {
+class VIcebergPartitionWriter : public IPartitionWriterBase {
 public:
-    struct WriteInfo {
-        std::string write_path;
-        std::string original_write_path;
-        std::string target_path;
-        TFileType::type file_type;
-        std::vector<TNetworkAddress> broker_addresses;
-    };
-
     VIcebergPartitionWriter(const TDataSink& t_sink, std::vector<std::string> 
partition_values,
                             const VExprContextSPtrs& write_output_expr_ctxs,
                             const doris::iceberg::Schema& schema,
@@ -63,24 +55,23 @@ public:
                             TFileCompressType::type compress_type,
                             const std::map<std::string, std::string>& 
hadoop_conf);
 
-    Status init_properties(ObjectPool* pool) { return Status::OK(); }
-
-    Status open(RuntimeState* state, RuntimeProfile* profile);
+    Status open(RuntimeState* state, RuntimeProfile* profile,
+                const RowDescriptor* row_desc) override;
 
-    Status write(vectorized::Block& block);
+    Status write(vectorized::Block& block) override;
 
-    Status close(const Status& status);
+    Status close(const Status& status) override;
 
-    inline const std::string& file_name() const { return _file_name; }
+    inline const std::string& file_name() const override { return _file_name; }
 
-    inline int file_name_index() const { return _file_name_index; }
+    inline int file_name_index() const override { return _file_name_index; }
 
-    inline size_t written_len() { return 
_file_format_transformer->written_len(); }
+    inline size_t written_len() const override { return 
_file_format_transformer->written_len(); }
 
 private:
     std::string _get_target_file_name();
 
-    TIcebergCommitData _build_iceberg_commit_data();
+    Status _build_iceberg_commit_data(TIcebergCommitData* commit_data);
 
     std::string _get_file_extension(TFileFormatType::type file_format_type,
                                     TFileCompressType::type 
write_compress_type);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h 
b/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h
new file mode 100644
index 00000000000..f9b91021f2b
--- /dev/null
+++ b/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h
@@ -0,0 +1,352 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/object_pool.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/common/sort/sorter.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vslot_ref.h"
+#include "vec/sink/writer/iceberg/viceberg_partition_writer.h"
+#include "vec/sink/writer/iceberg/vpartition_writer_base.h"
+#include "vec/spill/spill_stream.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris {
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+// Partition writer that locally sorts the incoming rows by the table's sort order
+// before handing them to a VIcebergPartitionWriter.
+//
+// Rows are buffered in a FullSorter. Once the buffered data reaches the target file
+// size it is sorted and written as one data file, and a fresh partition writer is
+// created for the next file. trigger_spill() writes the buffered data as a sorted
+// run to a spill stream instead; close() then merges all spilled runs and writes
+// the merged output, rolling over to a new file whenever the target size is exceeded.
+class VIcebergSortWriter : public IPartitionWriterBase {
+public:
+    // Factory used to create the writer for the next data file after a roll-over.
+    // It receives the previous file name and the index to use for the new file.
+    using CreateWriterLambda = std::function<std::shared_ptr<VIcebergPartitionWriter>(
+            const std::string* file_name, int file_name_index)>;
+
+    // @param partition_writer        writer that receives the sorted blocks
+    // @param sort_info               sort keys / directions / null ordering
+    // @param target_file_size_bytes  size threshold that triggers a file roll-over
+    // @param create_writer_lambda    factory for follow-up writers; when left unset,
+    //                                any roll-over fails with an InternalError
+    VIcebergSortWriter(std::shared_ptr<VIcebergPartitionWriter> partition_writer,
+                       TSortInfo sort_info, int64_t target_file_size_bytes,
+                       CreateWriterLambda create_writer_lambda = nullptr)
+            : _sort_info(std::move(sort_info)),
+              _iceberg_partition_writer(std::move(partition_writer)),
+              _create_writer_lambda(std::move(create_writer_lambda)),
+              _target_file_size_bytes(target_file_size_bytes) {}
+
+    // Prepares the sort expressions and the sorter, then opens the wrapped writer.
+    // `row_desc` must not be null; the pointer is kept to open follow-up writers.
+    Status open(RuntimeState* state, RuntimeProfile* profile,
+                const RowDescriptor* row_desc) override {
+        DCHECK(row_desc != nullptr);
+        _runtime_state = state;
+        _profile = profile;
+        _row_desc = row_desc;
+
+        RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool));
+        RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc));
+        RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
+
+        _sorter = vectorized::FullSorter::create_unique(
+                _vsort_exec_exprs, -1, 0, &_pool, _sort_info.is_asc_order, _sort_info.nulls_first,
+                *row_desc, state, _profile);
+        _sorter->init_profile(_profile);
+        _sorter->set_enable_spill();
+        _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount", TUnit::UNIT);
+        RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile, row_desc));
+        return Status::OK();
+    }
+
+    // Buffers `block` in the sorter. When the buffered data reaches the target file
+    // size, it is sorted in memory and written out as one data file.
+    Status write(vectorized::Block& block) override {
+        RETURN_IF_ERROR(_sorter->append_block(&block));
+        _update_spill_block_batch_row_count(block);
+        // enough data for a whole file: sort in memory and write it directly
+        if (_sorter->data_size() >= _target_file_size_bytes) {
+            return _flush_to_file();
+        }
+        // trigger_spill() will be called by memory management system
+        return Status::OK();
+    }
+
+    // Writes out whatever is still buffered or spilled, then closes the wrapped
+    // writer and releases all spill streams.
+    //
+    // @param status  outcome of the upstream pipeline; when it is an error (or the
+    //                query is cancelled) nothing more is written.
+    // @return        `status` in the early-exit case, otherwise the result of the
+    //                final flush. A failure of the wrapped writer's own close() is
+    //                only logged.
+    Status close(const Status& status) override {
+        // Status handed to the wrapped writer. It must reflect the outcome of the
+        // final flush as well, so that a failed flush is not closed as a success.
+        Status writer_close_status = status;
+        Defer defer {[&]() {
+            // The writer is null when creating the follow-up writer failed.
+            if (_iceberg_partition_writer != nullptr) {
+                Status st = _iceberg_partition_writer->close(writer_close_status);
+                if (!st.ok()) {
+                    LOG(WARNING) << fmt::format("_iceberg_partition_writer close failed, reason: {}",
+                                                st.to_string());
+                }
+            }
+            _cleanup_spill_streams();
+        }};
+
+        if (!status.ok() || _runtime_state->is_cancelled()) {
+            return status;
+        }
+
+        if (_sorter == nullptr) {
+            return Status::OK();
+        }
+
+        Status flush_status = _flush_remaining_data();
+        // keep a separate copy for the deferred close; `flush_status` is returned
+        writer_close_status = flush_status;
+        return flush_status;
+    }
+
+    inline const std::string& file_name() const override {
+        return _iceberg_partition_writer->file_name();
+    }
+
+    inline int file_name_index() const override {
+        return _iceberg_partition_writer->file_name_index();
+    }
+
+    inline size_t written_len() const override { return _iceberg_partition_writer->written_len(); }
+
+    // Non-owning access to the sorter (null before open()).
+    auto sorter() const { return _sorter.get(); }
+
+    // Spills the data currently buffered in the sorter to a new spill stream.
+    Status trigger_spill() { return _do_spill(); }
+
+private:
+    // Derives, once, how many rows a spill batch should hold: the average row size
+    // is taken from the first non-empty block and the spill batch byte budget is
+    // divided by it (rounded up).
+    void _update_spill_block_batch_row_count(const vectorized::Block& block) {
+        auto rows = block.rows();
+        if (rows > 0 && 0 == _avg_row_bytes) {
+            // at least 1 so the division below is well defined
+            _avg_row_bytes = std::max<size_t>(1, block.bytes() / rows);
+            int64_t spill_batch_bytes = _runtime_state->spill_sort_batch_bytes(); // default 8MB
+            _spill_block_batch_row_count =
+                    (spill_batch_bytes + _avg_row_bytes - 1) / _avg_row_bytes;
+        }
+    }
+
+    // Writes out data that was left when close() is called: data that is only in
+    // memory is sorted and written directly; if earlier runs were spilled, the
+    // in-memory rest is spilled too and all runs are merged into the output.
+    Status _flush_remaining_data() {
+        if (!_sorter->merge_sort_state()->unsorted_block()->empty() ||
+            !_sorter->merge_sort_state()->get_sorted_block().empty()) {
+            if (_sorted_streams.empty()) {
+                // data remaining in memory
+                RETURN_IF_ERROR(_sorter->do_sort());
+                RETURN_IF_ERROR(_sorter->prepare_for_read(false));
+                RETURN_IF_ERROR(_write_sorted_data());
+                return Status::OK();
+            }
+
+            // spill remaining data
+            RETURN_IF_ERROR(_do_spill());
+        }
+
+        // Merge all spilled streams and output final sorted data
+        if (!_sorted_streams.empty()) {
+            RETURN_IF_ERROR(_combine_files_output());
+        }
+
+        return Status::OK();
+    }
+
+    // Enough data is buffered: sort it in memory, write it as one file, roll over
+    // to a new writer and reset the sorter.
+    Status _flush_to_file() {
+        RETURN_IF_ERROR(_sorter->do_sort());
+        RETURN_IF_ERROR(_sorter->prepare_for_read(false));
+        RETURN_IF_ERROR(_write_sorted_data());
+        RETURN_IF_ERROR(_close_current_writer_and_open_next());
+        _sorter->reset();
+        return Status::OK();
+    }
+
+    // Drains the (already sorted) sorter into the current partition writer.
+    // The loop also stops, without an error, once the query is cancelled.
+    Status _write_sorted_data() {
+        bool eos = false;
+        Block block;
+        while (!eos && !_runtime_state->is_cancelled()) {
+            RETURN_IF_ERROR(_sorter->get_next(_runtime_state, &block, &eos));
+            RETURN_IF_ERROR(_iceberg_partition_writer->write(block));
+            block.clear_column_data();
+        }
+        return Status::OK();
+    }
+
+    // Closes the current writer and opens a new one with an incremented file index.
+    // Fails without touching the current writer when no writer factory was given.
+    Status _close_current_writer_and_open_next() {
+        // The factory defaults to nullptr in the constructor; calling an empty
+        // std::function would throw std::bad_function_call.
+        if (!_create_writer_lambda) {
+            return Status::InternalError("Failed to create new partition writer");
+        }
+
+        std::string current_file_name = _iceberg_partition_writer->file_name();
+        int current_file_index = _iceberg_partition_writer->file_name_index();
+        RETURN_IF_ERROR(_iceberg_partition_writer->close(Status::OK()));
+
+        _iceberg_partition_writer =
+                _create_writer_lambda(&current_file_name, current_file_index + 1);
+        if (!_iceberg_partition_writer) {
+            return Status::InternalError("Failed to create new partition writer");
+        }
+
+        RETURN_IF_ERROR(_iceberg_partition_writer->open(_runtime_state, _profile, _row_desc));
+        return Status::OK();
+    }
+
+    // Spill batch row count clamped to the int32_t range expected by the spill APIs.
+    int32_t _get_spill_batch_size() const {
+        if (_spill_block_batch_row_count > std::numeric_limits<int32_t>::max()) {
+            return std::numeric_limits<int32_t>::max();
+        }
+        return static_cast<int32_t>(_spill_block_batch_row_count);
+    }
+
+    // Writes the data buffered in the sorter, in sorted order, to a newly registered
+    // spill stream (appended to _sorted_streams) and resets the sorter.
+    Status _do_spill() {
+        COUNTER_UPDATE(_do_spill_count_counter, 1);
+        RETURN_IF_ERROR(_sorter->prepare_for_read(true));
+        int32_t batch_size = _get_spill_batch_size();
+
+        SpillStreamSPtr spilling_stream;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+                _runtime_state, spilling_stream, print_id(_runtime_state->query_id()),
+                "iceberg-sort", 1 /* node_id */, batch_size,
+                _runtime_state->spill_sort_batch_bytes(), _profile));
+        _sorted_streams.emplace_back(spilling_stream);
+
+        // spill sorted data to stream
+        bool eos = false;
+        Block block;
+        while (!eos && !_runtime_state->is_cancelled()) {
+            // read with the same clamped batch size the stream was registered with
+            RETURN_IF_ERROR(
+                    _sorter->merge_sort_read_for_spill(_runtime_state, &block, batch_size, &eos));
+            RETURN_IF_ERROR(spilling_stream->spill_block(_runtime_state, block, eos));
+            block.clear_column_data();
+        }
+        _sorter->reset();
+        return Status::OK();
+    }
+
+    // Merges the spilled streams and writes the sorted result to data files,
+    // starting a new file each time the bytes written exceed the target size.
+    Status _combine_files_output() {
+        // merge until all streams can be merged in one pass
+        while (_sorted_streams.size() > static_cast<size_t>(_calc_max_merge_streams())) {
+            RETURN_IF_ERROR(_do_intermediate_merge());
+        }
+        RETURN_IF_ERROR(_create_final_merger());
+
+        bool eos = false;
+        Block output_block;
+        size_t current_file_bytes = _iceberg_partition_writer->written_len();
+        while (!eos && !_runtime_state->is_cancelled()) {
+            RETURN_IF_ERROR(_merger->get_next(&output_block, &eos));
+            if (output_block.rows() > 0) {
+                size_t block_bytes = output_block.bytes();
+                RETURN_IF_ERROR(_iceberg_partition_writer->write(output_block));
+                current_file_bytes += block_bytes;
+                if (current_file_bytes > _target_file_size_bytes) {
+                    // close current writer and commit to file
+                    RETURN_IF_ERROR(_close_current_writer_and_open_next());
+                    current_file_bytes = 0;
+                }
+            }
+            output_block.clear_column_data();
+        }
+        return Status::OK();
+    }
+
+    // Merges up to _calc_max_merge_streams() streams from the front of
+    // _sorted_streams into one new spill stream appended at the back, then deletes
+    // the streams that were merged.
+    Status _do_intermediate_merge() {
+        int max_stream_count = _calc_max_merge_streams();
+        RETURN_IF_ERROR(_create_merger(false, _spill_block_batch_row_count, max_stream_count));
+
+        // register new spill stream for merged output
+        int32_t batch_size = _get_spill_batch_size();
+        SpillStreamSPtr tmp_stream;
+        RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+                _runtime_state, tmp_stream, print_id(_runtime_state->query_id()),
+                "iceberg-sort-merge", 1 /* node_id */, batch_size,
+                _runtime_state->spill_sort_batch_bytes(), _profile));
+
+        _sorted_streams.emplace_back(tmp_stream);
+
+        // merge current streams and write to new spill stream
+        bool eos = false;
+        Block merge_sorted_block;
+        while (!eos && !_runtime_state->is_cancelled()) {
+            merge_sorted_block.clear_column_data();
+            RETURN_IF_ERROR(_merger->get_next(&merge_sorted_block, &eos));
+            RETURN_IF_ERROR(tmp_stream->spill_block(_runtime_state, merge_sorted_block, eos));
+        }
+
+        // clean up merged streams
+        for (auto& stream : _current_merging_streams) {
+            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+        }
+        _current_merging_streams.clear();
+        return Status::OK();
+    }
+
+    // Number of streams merged in one pass: spill memory limit divided by the spill
+    // batch size, clamped to [2, INT_MAX].
+    int _calc_max_merge_streams() const {
+        auto count =
+                _runtime_state->spill_sort_mem_limit() / _runtime_state->spill_sort_batch_bytes();
+        if (count > std::numeric_limits<int>::max()) {
+            return std::numeric_limits<int>::max();
+        }
+        return std::max(2, static_cast<int>(count));
+    }
+
+    // Creates _merger over streams taken from the front of _sorted_streams: all of
+    // them for the final merge, otherwise at most `num_streams`. The taken streams
+    // are moved to _current_merging_streams.
+    Status _create_merger(bool is_final_merge, size_t batch_size, int num_streams) {
+        std::vector<vectorized::BlockSupplier> child_block_suppliers;
+        _merger = std::make_unique<vectorized::VSortedRunMerger>(_sorter->get_sort_description(),
+                                                                 batch_size, -1, 0, _profile);
+        _current_merging_streams.clear();
+        size_t streams_to_merge = is_final_merge ? _sorted_streams.size() : num_streams;
+
+        for (size_t i = 0; i < streams_to_merge && !_sorted_streams.empty(); ++i) {
+            auto stream = _sorted_streams.front();
+            stream->set_read_counters(_profile);
+            _current_merging_streams.emplace_back(stream);
+            // the supplier keeps its own reference to the stream it reads from
+            child_block_suppliers.emplace_back([stream](vectorized::Block* block, bool* eos) {
+                return stream->read_next_block_sync(block, eos);
+            });
+            _sorted_streams.pop_front();
+        }
+
+        RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
+        return Status::OK();
+    }
+
+    // Final merge: covers every remaining stream (the stream count argument is
+    // ignored in that mode) and emits blocks of the runtime batch size.
+    Status _create_final_merger() { return _create_merger(true, _runtime_state->batch_size(), 1); }
+
+    // Deletes every spill stream still referenced by this writer.
+    void _cleanup_spill_streams() {
+        for (auto& stream : _sorted_streams) {
+            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+        }
+        _sorted_streams.clear();
+
+        for (auto& stream : _current_merging_streams) {
+            ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+        }
+        _current_merging_streams.clear();
+    }
+
+    RuntimeState* _runtime_state = nullptr;
+    RuntimeProfile* _profile = nullptr;
+    const RowDescriptor* _row_desc = nullptr;
+    ObjectPool _pool;
+    TSortInfo _sort_info;
+    VSortExecExprs _vsort_exec_exprs;
+    std::shared_ptr<VIcebergPartitionWriter> _iceberg_partition_writer;
+    CreateWriterLambda _create_writer_lambda; // creates the next writer after a roll-over
+
+    // Sorter and merger
+    std::unique_ptr<vectorized::FullSorter> _sorter;
+    std::unique_ptr<vectorized::VSortedRunMerger> _merger;
+    // sorted runs waiting to be merged (consumed from the front)
+    std::deque<vectorized::SpillStreamSPtr> _sorted_streams;
+    // runs currently feeding _merger
+    std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
+
+    int64_t _target_file_size_bytes = 0; //config::iceberg_sink_max_file_size default 1GB
+    size_t _avg_row_bytes = 0;
+    size_t _spill_block_batch_row_count = 4096;
+
+    RuntimeProfile::Counter* _do_spill_count_counter = nullptr;
+};
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 6f4400418eb..9b495f093a5 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -29,6 +29,7 @@
 #include "vec/exprs/vexpr_context.h"
 #include "vec/sink/writer/iceberg/partition_transformers.h"
 #include "vec/sink/writer/iceberg/viceberg_partition_writer.h"
+#include "vec/sink/writer/iceberg/viceberg_sort_writer.h"
 #include "vec/sink/writer/vhive_utils.h"
 
 namespace doris {
@@ -43,10 +44,6 @@ VIcebergTableWriter::VIcebergTableWriter(const TDataSink& 
t_sink,
     DCHECK(_t_sink.__isset.iceberg_table_sink);
 }
 
-Status VIcebergTableWriter::init_properties(ObjectPool* pool) {
-    return Status::OK();
-}
-
 Status VIcebergTableWriter::open(RuntimeState* state, RuntimeProfile* profile) 
{
     _state = state;
 
@@ -193,12 +190,12 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
             _vec_output_expr_ctxs, block, &output_block, false));
     materialize_block_inplace(output_block);
 
-    std::unordered_map<std::shared_ptr<VIcebergPartitionWriter>, IColumn::Filter> writer_positions;
+    std::unordered_map<std::shared_ptr<IPartitionWriterBase>, IColumn::Filter> writer_positions;
     _row_count += output_block.rows();
 
     // Case 1: Full static partition - all data goes to a single partition
     if (_is_full_static_partition) {
-        std::shared_ptr<VIcebergPartitionWriter> writer;
+        std::shared_ptr<IPartitionWriterBase> writer;
         {
             SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
             auto writer_iter = 
_partitions_to_writers.find(_static_partition_path);
@@ -209,14 +206,14 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
                     return e.to_status();
                 }
                 _partitions_to_writers.insert({_static_partition_path, 
writer});
-                RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+                RETURN_IF_ERROR(writer->open(_state, _operator_profile, 
_row_desc));
             } else {
                 if (writer_iter->second->written_len() > 
_target_file_size_bytes) {
                     std::string file_name(writer_iter->second->file_name());
                     int file_name_index = 
writer_iter->second->file_name_index();
                     {
                         SCOPED_RAW_TIMER(&_close_ns);
-                        
static_cast<void>(writer_iter->second->close(Status::OK()));
+                        
RETURN_IF_ERROR(writer_iter->second->close(Status::OK()));
                     }
                     _partitions_to_writers.erase(writer_iter);
                     try {
@@ -226,7 +223,7 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
                         return e.to_status();
                     }
                     _partitions_to_writers.insert({_static_partition_path, 
writer});
-                    RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+                    RETURN_IF_ERROR(writer->open(_state, _operator_profile, 
_row_desc));
                 } else {
                     writer = writer_iter->second;
                 }
@@ -235,12 +232,13 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
         SCOPED_RAW_TIMER(&_partition_writers_write_ns);
         output_block.erase(_non_write_columns_indices);
         RETURN_IF_ERROR(writer->write(output_block));
+        _current_writer = writer;
         return Status::OK();
     }
 
     // Case 2: Non-partitioned table
     if (_iceberg_partition_columns.empty()) {
-        std::shared_ptr<VIcebergPartitionWriter> writer;
+        std::shared_ptr<IPartitionWriterBase> writer;
         {
             SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
             auto writer_iter = _partitions_to_writers.find("");
@@ -251,14 +249,14 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
                     return e.to_status();
                 }
                 _partitions_to_writers.insert({"", writer});
-                RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+                RETURN_IF_ERROR(writer->open(_state, _operator_profile, 
_row_desc));
             } else {
                 if (writer_iter->second->written_len() > 
_target_file_size_bytes) {
                     std::string file_name(writer_iter->second->file_name());
                     int file_name_index = 
writer_iter->second->file_name_index();
                     {
                         SCOPED_RAW_TIMER(&_close_ns);
-                        
static_cast<void>(writer_iter->second->close(Status::OK()));
+                        
RETURN_IF_ERROR(writer_iter->second->close(Status::OK()));
                     }
                     _partitions_to_writers.erase(writer_iter);
                     try {
@@ -268,7 +266,7 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
                         return e.to_status();
                     }
                     _partitions_to_writers.insert({"", writer});
-                    RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+                    RETURN_IF_ERROR(writer->open(_state, _operator_profile, 
_row_desc));
                 } else {
                     writer = writer_iter->second;
                 }
@@ -277,9 +275,11 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
         SCOPED_RAW_TIMER(&_partition_writers_write_ns);
         output_block.erase(_non_write_columns_indices);
         RETURN_IF_ERROR(writer->write(output_block));
+        _current_writer = writer;
         return Status::OK();
     }
 
+    // Case 3: Partitioned table - handle multiple partitions
     {
         Block transformed_block;
         SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
@@ -319,11 +319,11 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
             auto create_and_open_writer =
                     [&](const std::string& partition_name, int position,
                         const std::string* file_name, int file_name_index,
-                        std::shared_ptr<VIcebergPartitionWriter>& writer_ptr) 
-> Status {
+                        std::shared_ptr<IPartitionWriterBase>& writer_ptr) -> 
Status {
                 try {
                     auto writer = _create_partition_writer(&transformed_block, 
position, file_name,
                                                            file_name_index);
-                    RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+                    RETURN_IF_ERROR(writer->open(_state, _operator_profile, 
_row_desc));
                     IColumn::Filter filter(output_block.rows(), 0);
                     filter[position] = 1;
                     writer_positions.insert({writer, std::move(filter)});
@@ -337,7 +337,7 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
 
             auto writer_iter = _partitions_to_writers.find(partition_name);
             if (writer_iter == _partitions_to_writers.end()) {
-                std::shared_ptr<VIcebergPartitionWriter> writer;
+                std::shared_ptr<IPartitionWriterBase> writer;
                 if (_partitions_to_writers.size() + 1 >
                     
config::table_sink_partition_write_max_partition_nums_per_writer) {
                     return Status::InternalError(
@@ -346,13 +346,13 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
                 }
                 RETURN_IF_ERROR(create_and_open_writer(partition_name, i, 
nullptr, 0, writer));
             } else {
-                std::shared_ptr<VIcebergPartitionWriter> writer;
+                std::shared_ptr<IPartitionWriterBase> writer;
                 if (writer_iter->second->written_len() > 
_target_file_size_bytes) {
                     std::string file_name(writer_iter->second->file_name());
                     int file_name_index = 
writer_iter->second->file_name_index();
                     {
                         SCOPED_RAW_TIMER(&_close_ns);
-                        
static_cast<void>(writer_iter->second->close(Status::OK()));
+                        
RETURN_IF_ERROR(writer_iter->second->close(Status::OK()));
                     }
                     writer_positions.erase(writer_iter->second);
                     _partitions_to_writers.erase(writer_iter);
@@ -378,6 +378,7 @@ Status VIcebergTableWriter::write(RuntimeState* state, 
vectorized::Block& block)
         Block filtered_block;
         RETURN_IF_ERROR(_filter_block(output_block, &it->second, 
&filtered_block));
         RETURN_IF_ERROR(it->first->write(filtered_block));
+        _current_writer = it->first;
     }
     return Status::OK();
 }
@@ -491,7 +492,7 @@ std::vector<std::string> 
VIcebergTableWriter::_partition_values(
     return partition_values;
 }
 
-std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition_writer(
+std::shared_ptr<IPartitionWriterBase> VIcebergTableWriter::_create_partition_writer(
         vectorized::Block* transformed_block, int position, const std::string* 
file_name,
         int file_name_index) {
     auto& iceberg_table_sink = _t_sink.iceberg_table_sink;
@@ -531,7 +532,7 @@ std::shared_ptr<VIcebergPartitionWriter> 
VIcebergTableWriter::_create_partition_
         write_path = output_path;
     }
 
-    VIcebergPartitionWriter::WriteInfo write_info = {
+    IPartitionWriterBase::WriteInfo write_info = {
             .write_path = std::move(write_path),
             .original_write_path = std::move(original_write_path),
             .target_path = std::move(target_path),
@@ -542,7 +543,6 @@ std::shared_ptr<VIcebergPartitionWriter> 
VIcebergTableWriter::_create_partition_
                                            
iceberg_table_sink.broker_addresses.end());
     }
 
-    _write_file_count++;
     std::vector<std::string> column_names;
     column_names.reserve(_write_output_vexpr_ctxs.size());
     for (int i = 0; i < _schema->columns().size(); i++) {
@@ -550,12 +550,26 @@ std::shared_ptr<VIcebergPartitionWriter> 
VIcebergTableWriter::_create_partition_
             column_names.emplace_back(_schema->columns()[i].field_name());
         }
     }
-    return std::make_shared<VIcebergPartitionWriter>(
-            _t_sink, std::move(partition_values), _write_output_vexpr_ctxs, 
*_schema,
-            &_t_sink.iceberg_table_sink.schema_json, std::move(column_names), 
std::move(write_info),
-            (file_name == nullptr) ? _compute_file_name() : *file_name, 
file_name_index,
-            iceberg_table_sink.file_format, 
iceberg_table_sink.compression_type,
-            iceberg_table_sink.hadoop_config);
+
+    auto create_writer_lambda =
+            [this, partition_values, column_names, write_info](
+                    const std::string* file_name,
+                    int file_name_index) -> std::shared_ptr<VIcebergPartitionWriter> {
+        auto& iceberg_table_sink = _t_sink.iceberg_table_sink;
+        _write_file_count++;
+        return std::make_shared<VIcebergPartitionWriter>(
+                _t_sink, partition_values, _write_output_vexpr_ctxs, *_schema,
+                &_t_sink.iceberg_table_sink.schema_json, column_names, write_info,
+                (file_name == nullptr) ? _compute_file_name() : *file_name, file_name_index,
+                iceberg_table_sink.file_format, iceberg_table_sink.compression_type,
+                iceberg_table_sink.hadoop_config);
+    };
+    auto partition_write = create_writer_lambda(file_name, file_name_index);
+    if (iceberg_table_sink.__isset.sort_info) {
+        return std::make_shared<VIcebergSortWriter>(partition_write, iceberg_table_sink.sort_info,
+                                                    _target_file_size_bytes, create_writer_lambda);
+    }
+    return partition_write;
 }
 
 PartitionData VIcebergTableWriter::_get_partition_data(vectorized::Block* 
transformed_block,
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index b6e3c4edc57..842f962713f 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -19,11 +19,8 @@
 
 #include <gen_cpp/DataSinks_types.h>
 
-#include <optional>
-
 #include "util/runtime_profile.h"
 #include "vec/columns/column.h"
-#include "vec/common/string_ref.h"
 #include "vec/core/block.h"
 #include "vec/exec/format/table/iceberg/partition_spec_parser.h"
 #include "vec/exec/format/table/iceberg/schema_parser.h"
@@ -40,7 +37,8 @@ class RuntimeState;
 namespace vectorized {
 
 class IColumn;
-class VIcebergPartitionWriter;
+class IPartitionWriterBase;
+class VIcebergSortWriter;
 struct ColumnWithTypeAndName;
 
 class VIcebergTableWriter final : public AsyncResultWriter {
@@ -51,7 +49,10 @@ public:
 
     ~VIcebergTableWriter() = default;
 
-    Status init_properties(ObjectPool* pool);
+    Status init_properties(ObjectPool* pool, const RowDescriptor& row_desc) {
+        _row_desc = &row_desc;
+        return Status::OK();
+    }
 
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
@@ -59,6 +60,8 @@ public:
 
     Status close(Status) override;
 
+    std::shared_ptr<IPartitionWriterBase> _current_writer;
+
 private:
     class IcebergPartitionColumn {
     public:
@@ -102,7 +105,7 @@ private:
     // Build static partition path from static partition values
     std::string _build_static_partition_path();
 
-    std::shared_ptr<VIcebergPartitionWriter> _create_partition_writer(
+    std::shared_ptr<IPartitionWriterBase> _create_partition_writer(
             vectorized::Block* transformed_block, int position,
             const std::string* file_name = nullptr, int file_name_index = 0);
 
@@ -146,12 +149,10 @@ private:
     // Pre-computed static partition value list (for full static mode only)
     std::vector<std::string> _static_partition_value_list;
 
-    std::unordered_map<std::string, std::shared_ptr<VIcebergPartitionWriter>>
-            _partitions_to_writers;
-
+    std::unordered_map<std::string, std::shared_ptr<IPartitionWriterBase>> _partitions_to_writers;
     VExprContextSPtrs _write_output_vexpr_ctxs;
-
     size_t _row_count = 0;
+    const RowDescriptor* _row_desc = nullptr;
 
     // profile counters
     int64_t _send_data_ns = 0;
diff --git a/be/src/vec/sink/writer/iceberg/vpartition_writer_base.h b/be/src/vec/sink/writer/iceberg/vpartition_writer_base.h
new file mode 100644
index 00000000000..1afd75f40dd
--- /dev/null
+++ b/be/src/vec/sink/writer/iceberg/vpartition_writer_base.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "common/status.h"
+#include "runtime/descriptors.h"
+namespace doris {
+
+class ObjectPool;
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+
+// IPartitionWriterBase
+//     ├── VIcebergPartitionWriter
+//     └── VIcebergSortWriter
+//             └── wraps VIcebergPartitionWriter
+class IPartitionWriterBase {
+public:
+    struct WriteInfo {
+        std::string write_path;
+        std::string original_write_path;
+        std::string target_path;
+        TFileType::type file_type;
+        std::vector<TNetworkAddress> broker_addresses;
+    };
+
+    IPartitionWriterBase() = default;
+
+    virtual ~IPartitionWriterBase() = default;
+
+    virtual Status open(RuntimeState* state, RuntimeProfile* profile,
+                        const RowDescriptor* row_desc) = 0;
+
+    virtual Status write(vectorized::Block& block) = 0;
+
+    virtual Status close(const Status& status) = 0;
+
+    virtual const std::string& file_name() const = 0;
+
+    virtual int file_name_index() const = 0;
+
+    virtual size_t written_len() const = 0;
+};
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/test/vec/exec/sort/full_sort_test.cpp b/be/test/vec/exec/sort/full_sort_test.cpp
index 8add8b7e9f5..9d59890017b 100644
--- a/be/test/vec/exec/sort/full_sort_test.cpp
+++ b/be/test/vec/exec/sort/full_sort_test.cpp
@@ -109,13 +109,13 @@ TEST_F(FullSorterTest, test_full_sorter3) {
     {
         Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 
5, 6, 7, 8, 9, 10});
         EXPECT_TRUE(sorter->append_block(&block).ok());
-        EXPECT_TRUE(sorter->_do_sort());
+        EXPECT_TRUE(sorter->do_sort());
     }
 
     {
         Block block = ColumnHelper::create_block<DataTypeInt64>({4, 5, 6, 7});
         EXPECT_TRUE(sorter->append_block(&block).ok());
-        EXPECT_TRUE(sorter->_do_sort());
+        EXPECT_TRUE(sorter->do_sort());
     }
     EXPECT_EQ(sorter->_state->get_sorted_block()[0]->rows(), 6);
     EXPECT_EQ(sorter->_state->get_sorted_block()[1]->rows(), 4);
diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run26.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run26.sql
new file mode 100644
index 00000000000..5b7513e1a53
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run26.sql
@@ -0,0 +1,73 @@
+create database if not exists demo.test_stats_order;
+USE demo.test_stats_order;
+
+DROP TABLE IF EXISTS iceberg_all_types_orc;
+DROP TABLE IF EXISTS iceberg_all_types_parquet;
+
+CREATE TABLE iceberg_all_types_orc (
+    `boolean_col` boolean,
+    `int_col` int,
+    `bigint_col` bigint,
+    `float_col` float,
+    `double_col` double,
+    `decimal_col1` decimal(9,0),
+    `decimal_col2` decimal(8,4),
+    `decimal_col3` decimal(18,6),
+    `decimal_col4` decimal(38,12),
+    `string_col` string,
+    `date_col` date,
+    `datetime_col1` TIMESTAMP_NTZ
+)
+USING iceberg
+TBLPROPERTIES(
+    'write.format.default' = 'orc',
+    'format-version' = '2'
+);
+
+CREATE TABLE iceberg_all_types_parquet (
+    `boolean_col` boolean,
+    `int_col` int,
+    `bigint_col` bigint,
+    `float_col` float,
+    `double_col` double,
+    `decimal_col1` decimal(9,0),
+    `decimal_col2` decimal(8,4),
+    `decimal_col3` decimal(18,6),
+    `decimal_col4` decimal(38,12),
+    `string_col` string,
+    `date_col` date,
+    `datetime_col1` TIMESTAMP_NTZ
+)
+USING iceberg
+TBLPROPERTIES(
+    'write.format.default' = 'parquet',
+    'format-version' = '2'
+);
+
+ALTER TABLE iceberg_all_types_parquet WRITE ORDERED BY int_col ASC;
+ALTER TABLE iceberg_all_types_orc WRITE ORDERED BY int_col ASC;
+
+CREATE TABLE iceberg_int_order (
+    int_col int
+)
+USING iceberg
+TBLPROPERTIES(
+    'write.format.default' = 'parquet',
+    'format-version' = '2'
+);
+
+
+CREATE TABLE iceberg_int_no_order (
+    int_col int
+)
+USING iceberg
+TBLPROPERTIES(
+    'write.format.default' = 'parquet',
+    'format-version' = '2'
+);
+
+ALTER TABLE iceberg_int_order WRITE ORDERED BY int_col ASC;
+
+
+
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index 502b1eaa7c8..c6473ca64dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -34,7 +34,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.OverwriteFiles;
 import org.apache.iceberg.PartitionField;
@@ -177,12 +176,8 @@ public class IcebergTransaction implements Transaction {
             return;
         }
 
-        // Get table specification information
-        PartitionSpec spec = transaction.table().spec();
-        FileFormat fileFormat = 
IcebergUtils.getFileFormat(transaction.table());
-
         // Convert commit data to DataFile objects using the same logic as insert
-        WriteResult writeResult = 
IcebergWriterHelper.convertToWriterResult(fileFormat, spec, commitDataList);
+        WriteResult writeResult = 
IcebergWriterHelper.convertToWriterResult(transaction.table(), commitDataList);
 
         // Add the generated DataFiles to filesToAdd list
         synchronized (filesToAdd) {
@@ -249,16 +244,13 @@ public class IcebergTransaction implements Transaction {
     }
 
     private void updateManifestAfterInsert(TUpdateMode updateMode) {
-        PartitionSpec spec = transaction.table().spec();
-        FileFormat fileFormat = 
IcebergUtils.getFileFormat(transaction.table());
-
         List<WriteResult> pendingResults;
         if (commitDataList.isEmpty()) {
             pendingResults = Collections.emptyList();
         } else {
             //convert commitDataList to writeResult
             WriteResult writeResult = IcebergWriterHelper
-                    .convertToWriterResult(fileFormat, spec, commitDataList);
+                    .convertToWriterResult(transaction.table(), 
commitDataList);
             pendingResults = Lists.newArrayList(writeResult);
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
index d429e95e8ce..9a47874762c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
@@ -19,32 +19,46 @@ package org.apache.doris.datasource.iceberg.helper;
 
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.datasource.statistics.CommonStatistics;
+import org.apache.doris.thrift.TIcebergColumnStats;
 import org.apache.doris.thrift.TIcebergCommitData;
 
 import com.google.common.base.VerifyException;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.types.Types;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class IcebergWriterHelper {
+    private static final Logger LOG = LogManager.getLogger(IcebergWriterHelper.class);
 
     private static final int DEFAULT_FILE_COUNT = 1;
 
     public static WriteResult convertToWriterResult(
-            FileFormat format,
-            PartitionSpec spec,
+            Table table,
             List<TIcebergCommitData> commitDataList) {
         List<DataFile> dataFiles = new ArrayList<>();
+
+        // Get table specification information
+        PartitionSpec spec = table.spec();
+        FileFormat fileFormat = IcebergUtils.getFileFormat(table);
+
         for (TIcebergCommitData commitData : commitDataList) {
             //get the files path
             String location = commitData.getFilePath();
@@ -53,7 +67,7 @@ public class IcebergWriterHelper {
             long fileSize = commitData.getFileSize();
             long recordCount = commitData.getRowCount();
             CommonStatistics stat = new CommonStatistics(recordCount, 
DEFAULT_FILE_COUNT, fileSize);
-
+            Metrics metrics = buildDataFileMetrics(table, fileFormat, commitData);
             Optional<PartitionData> partitionData = Optional.empty();
             //get and check partitionValues when table is partitionedTable
             if (spec.isPartitioned()) {
@@ -67,7 +81,8 @@ public class IcebergWriterHelper {
                 // Convert human-readable partition values to PartitionData
                 partitionData = 
Optional.of(convertToPartitionData(partitionValues, spec));
             }
-            DataFile dataFile = genDataFile(format, location, spec, 
partitionData, stat);
+            DataFile dataFile = genDataFile(fileFormat, location, spec, partitionData, stat, metrics,
+                    table.sortOrder());
             dataFiles.add(dataFile);
         }
         return WriteResult.builder()
@@ -81,12 +96,14 @@ public class IcebergWriterHelper {
             String location,
             PartitionSpec spec,
             Optional<PartitionData> partitionData,
-            CommonStatistics statistics) {
+            CommonStatistics statistics, Metrics metrics, SortOrder sortOrder) {
 
         DataFiles.Builder builder = DataFiles.builder(spec)
                 .withPath(location)
                 .withFileSizeInBytes(statistics.getTotalFileBytes())
                 .withRecordCount(statistics.getRowCount())
+                .withMetrics(metrics)
+                .withSortOrder(sortOrder)
                 .withFormat(format);
 
         partitionData.ifPresent(builder::withPartition);
@@ -132,4 +149,33 @@ public class IcebergWriterHelper {
 
         return partitionData;
     }
+
+    private static Metrics buildDataFileMetrics(Table table, FileFormat fileFormat, TIcebergCommitData commitData) {
+        Map<Integer, Long> columnSizes = new HashMap<>();
+        Map<Integer, Long> valueCounts = new HashMap<>();
+        Map<Integer, Long> nullValueCounts = new HashMap<>();
+        Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
+        Map<Integer, ByteBuffer> upperBounds = new HashMap<>();
+        if (commitData.isSetColumnStats()) {
+            TIcebergColumnStats stats = commitData.column_stats;
+            if (stats.isSetColumnSizes()) {
+                columnSizes = stats.column_sizes;
+            }
+            if (stats.isSetValueCounts()) {
+                valueCounts = stats.value_counts;
+            }
+            if (stats.isSetNullValueCounts()) {
+                nullValueCounts = stats.null_value_counts;
+            }
+            if (stats.isSetLowerBounds()) {
+                lowerBounds = stats.lower_bounds;
+            }
+            if (stats.isSetUpperBounds()) {
+                upperBounds = stats.upper_bounds;
+            }
+        }
+
+        return new Metrics(commitData.getRowCount(), columnSizes, valueCounts,
+                nullValueCounts, null, lowerBounds, upperBounds);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index f99f1b337e7..3f87d1a856b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -575,8 +575,12 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                                                       PlanTranslatorContext 
context) {
         PlanFragment rootFragment = icebergTableSink.child().accept(this, 
context);
         rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
+        List<Expr> outputExprs = Lists.newArrayList();
+        icebergTableSink.getOutput().stream().map(Slot::getExprId)
+                .forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
         IcebergTableSink sink = new IcebergTableSink((IcebergExternalTable) 
icebergTableSink.getTargetTable());
         rootFragment.setSink(sink);
+        sink.setOutputExprs(outputExprs);
         return rootFragment;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index ef1f7c28772..581eee08a5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.planner;
 
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SortInfo;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.credentials.VendedCredentialsFactory;
@@ -32,10 +34,8 @@ import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TIcebergTableSink;
-import org.apache.doris.thrift.TSortField;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.PartitionSpecParser;
@@ -44,16 +44,19 @@ import org.apache.iceberg.SortDirection;
 import org.apache.iceberg.SortField;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
 public class IcebergTableSink extends BaseExternalTableDataSink {
 
+    private List<Expr> outputExprs;
     private final IcebergExternalTable targetTable;
     private static final HashSet<TFileFormatType> supportedTypes = new 
HashSet<TFileFormatType>() {{
             add(TFileFormatType.FORMAT_ORC);
@@ -82,6 +85,10 @@ public class IcebergTableSink extends 
BaseExternalTableDataSink {
         return supportedTypes;
     }
 
+    public void setOutputExprs(List<Expr> outputExprs) {
+        this.outputExprs = outputExprs;
+    }
+
     @Override
     public String getExplainString(String prefix, TExplainLevel explainLevel) {
         StringBuilder strBuilder = new StringBuilder();
@@ -122,24 +129,25 @@ public class IcebergTableSink extends 
BaseExternalTableDataSink {
         // sort order
         if (icebergTable.sortOrder().isSorted()) {
             SortOrder sortOrder = icebergTable.sortOrder();
-            Set<Integer> baseColumnFieldIds = 
icebergTable.schema().columns().stream()
-                    .map(Types.NestedField::fieldId)
-                    .collect(ImmutableSet.toImmutableSet());
-            ImmutableList.Builder<TSortField> sortFields = 
ImmutableList.builder();
+            ArrayList<Expr> orderingExprs = Lists.newArrayList();
+            ArrayList<Boolean> isAscOrder = Lists.newArrayList();
+            ArrayList<Boolean> isNullsFirst = Lists.newArrayList();
             for (SortField sortField : sortOrder.fields()) {
                 if (!sortField.transform().isIdentity()) {
                     continue;
                 }
-                if (!baseColumnFieldIds.contains(sortField.sourceId())) {
-                    continue;
+                for (int i = 0; i < icebergTable.schema().columns().size(); 
++i) {
+                    NestedField column  = 
icebergTable.schema().columns().get(i);
+                    if (column.fieldId() == sortField.sourceId()) {
+                        orderingExprs.add(outputExprs.get(i));
+                        
isAscOrder.add(sortField.direction().equals(SortDirection.ASC));
+                        
isNullsFirst.add(sortField.nullOrder().equals(NullOrder.NULLS_FIRST));
+                        break;
+                    }
                 }
-                TSortField tSortField = new TSortField();
-                tSortField.setSourceColumnId(sortField.sourceId());
-                
tSortField.setAscending(sortField.direction().equals(SortDirection.ASC));
-                
tSortField.setNullFirst(sortField.nullOrder().equals(NullOrder.NULLS_FIRST));
-                sortFields.add(tSortField);
             }
-            tSink.setSortFields(sortFields.build());
+            SortInfo sortInfo = new SortInfo(orderingExprs, isAscOrder, 
isNullsFirst, null);
+            tSink.setSortInfo(sortInfo.toThrift());
         }
 
         // file info
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index cb817b7dbe8..67c2b216de8 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -397,6 +397,15 @@ enum TFileContent {
     EQUALITY_DELETES = 2
 }
 
+struct TIcebergColumnStats {
+    1: optional map<i32, i64> column_sizes
+    2: optional map<i32, i64> value_counts
+    3: optional map<i32, i64> null_value_counts
+    4: optional map<i32, i64> nan_value_counts
+    5: optional map<i32, binary> lower_bounds;
+    6: optional map<i32, binary> upper_bounds;
+}
+
 struct TIcebergCommitData {
     1: optional string file_path
     2: optional i64 row_count
@@ -404,6 +413,7 @@ struct TIcebergCommitData {
     4: optional TFileContent file_content
     5: optional list<string> partition_values 
     6: optional list<string> referenced_data_files
+    7: optional TIcebergColumnStats column_stats
 }
 
 struct TSortField {
@@ -431,6 +441,7 @@ struct TIcebergTableSink {
     // Key: partition column name, Value: partition value as string
     // When set, BE should use these values directly instead of computing from 
data
     15: optional map<string, string> static_partition_values;
+    16: optional PlanNodes.TSortInfo sort_info;
 }
 
 enum TDictLayoutType {
diff --git 
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_stats2.out
 
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_stats2.out
new file mode 100644
index 00000000000..48765f6c4dd
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_stats2.out
@@ -0,0 +1,33 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql_0 --
+false  22      222     2.2     2.2     2222    8765.4321       8765.432100     
987654321098765432.987654321099 bbb     2023-06-15      2023-06-15T23:45:01
+true   11      111     1.1     1.1     1111    1234.5678       1234.567890     
123456789012345678.123456789012 aaa     2023-01-01      2023-01-01T12:34:56
+
+-- !sql_1 --
+0      PARQUET 2       {1:2, 2:2, 3:2, 4:2, 5:2, 6:2, 7:2, 8:2, 9:2, 10:2, 
11:2, 12:2} {1:0, 2:0, 3:0, 4:0, 5:0, 6:0, 7:0, 8:0, 9:0, 10:0, 11:0, 12:0} 
{1:0x00, 2:0x0B000000, 3:0x6F00000000000000, 4:0xCDCC8C3F, 
5:0x9A9999999999F13F, 6:0x00000457, 7:0x00BC614E, 8:0x00000000499602D2, 
9:0x000000018EE90FF6C373E0393713FA14, 10:0x616161, 11:0x9E4B0000, 
12:0x005CE70F33F10500}     {1:0x01, 2:0x16000000, 3:0xDE00000000000000, 
4:0xCDCC0C40, 5:0x9A99999999990140, 6:0x000008AE, 7:0x05397FB1, 
8:0x000000020A75E124, 9 [...]
+
+-- !sql_2 --
+{"bigint_col":{"column_size":118, "value_count":2, "null_value_count":0, 
"nan_value_count":null, "lower_bound":111, "upper_bound":222}, 
"boolean_col":{"column_size":49, "value_count":2, "null_value_count":0, 
"nan_value_count":null, "lower_bound":0, "upper_bound":1}, 
"date_col":{"column_size":94, "value_count":2, "null_value_count":0, 
"nan_value_count":null, "lower_bound":"2023-01-01", 
"upper_bound":"2023-06-15"}, "datetime_col1":{"column_size":118, 
"value_count":2, "null_value_count":0,  [...]
+
+-- !sql_3 --
+false  22      222     2.2     2.2     2222    8765.4321       8765.432100     
987654321098765432.987654321099 bbb     2023-06-15      2023-06-15T23:45:01
+true   11      111     1.1     1.1     1111    1234.5678       1234.567890     
123456789012345678.123456789012 aaa     2023-01-01      2023-01-01T12:34:56
+
+-- !sql_4 --
+0      ORC     2       {1:2, 2:2, 3:2, 4:2, 5:2, 6:2, 7:2, 8:2, 9:2, 10:2, 
11:2, 12:2} {}      {1:0x00, 2:0x0B00000000000000, 3:0x6F00000000000000, 
4:0xCDCC8C3F, 5:0x9A9999999999F13F, 6:0x0457, 7:0x00BC614E, 8:0x075BCD15, 
9:0x018EE90FF6C373E0393713FA14, 10:0x616161, 11:0x9E4B0000, 
12:0x005CE70F33F10500}       {1:0x01, 2:0x1600000000000000, 
3:0xDE00000000000000, 4:0xCDCC0C40, 5:0x9A99999999990140, 6:0x08AE, 
7:0x05397FB1, 8:0x05397FB1, 9:0x0C7748819DFFB62505316873CB, 10:0x626262, 
11:0x434C0000, 12:0x40D91FA833FE0500}
+
+-- !sql_5 --
+{"bigint_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":111, "upper_bound":222}, 
"boolean_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":0, "upper_bound":1}, 
"date_col":{"column_size":null, "value_count":2, "null_value_count":null, 
"nan_value_count":null, "lower_bound":"2023-01-01", 
"upper_bound":"2023-06-15"}, "datetime_col1":{"column_size":null, 
"value_count":2, "null_v [...]
+
+-- !sql_6 --
+{1:0x00, 2:0x0B000000, 3:0x6F00000000000000, 4:0xCDCC8C3F, 
5:0x9A9999999999F13F, 6:0x00000457, 7:0x00BC614E, 8:0x00000000499602D2, 
9:0x000000018EE90FF6C373E0393713FA14, 10:0x616161, 11:0x9E4B0000, 
12:0x005CE70F33F10500}
+
+-- !sql_7 --
+{1:0x00, 2:0x0B00000000000000, 3:0x6F00000000000000, 4:0xCDCC8C3F, 
5:0x9A9999999999F13F, 6:0x0457, 7:0x00BC614E, 8:0x075BCD15, 
9:0x018EE90FF6C373E0393713FA14, 10:0x616161, 11:0x9E4B0000, 
12:0x005CE70F33F10500}
+
+-- !sql_8 --
+{1:0x01, 2:0x16000000, 3:0xDE00000000000000, 4:0xCDCC0C40, 
5:0x9A99999999990140, 6:0x000008AE, 7:0x05397FB1, 8:0x000000020A75E124, 
9:0x0000000C7748819DFFB62505316873CB, 10:0x626262, 11:0x434C0000, 
12:0x40D91FA833FE0500}
+
+-- !sql_9 --
+{1:0x01, 2:0x1600000000000000, 3:0xDE00000000000000, 4:0xCDCC0C40, 
5:0x9A99999999990140, 6:0x08AE, 7:0x05397FB1, 8:0x05397FB1, 
9:0x0C7748819DFFB62505316873CB, 10:0x626262, 11:0x434C0000, 
12:0x40D91FA833FE0500}
+
diff --git 
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_stats2.groovy
 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_stats2.groovy
new file mode 100644
index 00000000000..8ae25ab76fe
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_stats2.groovy
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_write_stats2", 
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    for (String hivePrefix : ["hive2"]) {
+        setHivePrefix(hivePrefix)
+        String catalog_name = "test_iceberg_write_stats"
+        String db_name = "test_stats"
+        String rest_port = 
context.config.otherConfigs.get("iceberg_rest_uri_port")
+        String minio_port = 
context.config.otherConfigs.get("iceberg_minio_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """
+        CREATE CATALOG ${catalog_name} PROPERTIES (
+            'type'='iceberg',
+            'iceberg.catalog.type'='rest',
+            'uri' = 'http://${externalEnvIp}:${rest_port}',
+            "s3.access_key" = "admin",
+            "s3.secret_key" = "password",
+            "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+            "s3.region" = "us-east-1",
+            "enable.mapping.varbinary"="true"
+        );"""
+
+        sql """switch ${catalog_name}"""
+        sql """create database if not exists ${db_name}"""
+        sql """use ${db_name}"""
+
+        sql """ DROP TABLE IF EXISTS `iceberg_all_types_parquet`; """
+
+        sql """
+        CREATE TABLE `iceberg_all_types_parquet`(
+            `boolean_col` boolean,
+            `int_col` int,
+            `bigint_col` bigint,
+            `float_col` float,
+            `double_col` double,
+            `decimal_col1` decimal(9,0),
+            `decimal_col2` decimal(8,4),
+            `decimal_col3` decimal(18,6),
+            `decimal_col4` decimal(38,12),
+            `string_col` string,
+            `date_col` date,
+            `datetime_col1` datetime) 
+            ENGINE=iceberg
+            properties (
+            "write-format"="parquet"
+            );
+        """
+
+        sql """ 
+        INSERT INTO iceberg_all_types_parquet VALUES
+        (true, 11, 111, 1.1, 1.1, 1111.0, 1234.5678, 1234.56789, 
123456789012345678.123456789012, 'aaa', '2023-01-01', '2023-01-01 12:34:56'),
+        (false, 22, 222, 2.2, 2.2, 2222.0, 8765.4321, 8765.43210, 
987654321098765432.98765432109876, 'bbb', '2023-06-15', '2023-06-15 23:45:01');
+        """
+        
+        qt_sql_0 """ select * from iceberg_all_types_parquet order by 1; """;
+        qt_sql_1 """ select 
content,file_format,record_count,value_counts,null_value_counts,lower_bounds,upper_bounds
 from iceberg_all_types_parquet\$files order by 1; """;
+        qt_sql_2 """ select readable_metrics from 
iceberg_all_types_parquet\$files order by 1; """;
+
+
+        sql """ DROP TABLE IF EXISTS `iceberg_all_types_orc`; """
+
+        sql """
+        CREATE TABLE `iceberg_all_types_orc`(
+            `boolean_col` boolean,
+            `int_col` int,
+            `bigint_col` bigint,
+            `float_col` float,
+            `double_col` double,
+            `decimal_col1` decimal(9,0),
+            `decimal_col2` decimal(8,4),
+            `decimal_col3` decimal(18,6),
+            `decimal_col4` decimal(38,12),
+            `string_col` string,
+            `date_col` date,
+            `datetime_col1` datetime) 
+            ENGINE=iceberg
+            properties (
+            "write-format"="orc"
+            );
+        """
+
+        sql """ 
+        INSERT INTO iceberg_all_types_orc VALUES
+        (true, 11, 111, 1.1, 1.1, 1111.0, 1234.5678, 1234.56789, 
123456789012345678.123456789012, 'aaa', '2023-01-01', '2023-01-01 12:34:56'),
+        (false, 22, 222, 2.2, 2.2, 2222.0, 8765.4321, 8765.43210, 
987654321098765432.98765432109876, 'bbb', '2023-06-15', '2023-06-15 23:45:01');
+        """
+        
+        qt_sql_3 """ select * from iceberg_all_types_orc order by 1; """;
+        qt_sql_4 """ select 
content,file_format,record_count,value_counts,null_value_counts,lower_bounds,upper_bounds
 from iceberg_all_types_orc\$files order by 1; """;
+        qt_sql_5 """ select readable_metrics from iceberg_all_types_orc\$files 
order by 1; """;
+        qt_sql_6 """ select lower_bounds from iceberg_all_types_parquet\$files 
order by 1; """;
+        qt_sql_7 """ select lower_bounds from iceberg_all_types_orc\$files 
order by 1; """;
+        qt_sql_8 """ select upper_bounds from iceberg_all_types_parquet\$files 
order by 1; """;
+        qt_sql_9 """ select upper_bounds from iceberg_all_types_orc\$files 
order by 1; """;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to