This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ea33518b0b [Opt](load) use batching to optimize auto partition 
(#26915)
2ea33518b0b is described below

commit 2ea33518b0b8657508f113ed7f17b5b2596a64ac
Author: zclllyybb <[email protected]>
AuthorDate: Thu Nov 23 19:12:28 2023 +0800

    [Opt](load) use batching to optimize auto partition (#26915)
    
    use batching to optimize auto partition
---
 be/src/common/config.cpp                           |   3 +-
 be/src/common/config.h                             |   4 +-
 be/src/pipeline/exec/exchange_sink_operator.cpp    |   2 +-
 be/src/pipeline/exec/result_file_sink_operator.cpp |   2 +-
 be/src/runtime/plan_fragment_executor.cpp          |   5 -
 be/src/vec/columns/column.h                        |   2 +-
 be/src/vec/columns/column_dummy.h                  |   2 +
 be/src/vec/core/block.cpp                          |  25 ++++
 be/src/vec/core/block.h                            |  12 +-
 be/src/vec/exec/scan/pip_scanner_context.h         |   2 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |   8 +-
 be/src/vec/sink/vrow_distribution.cpp              | 108 +++++++++-----
 be/src/vec/sink/vrow_distribution.h                | 101 +++++++------
 be/src/vec/sink/vtablet_finder.h                   |   2 +-
 be/src/vec/sink/writer/async_result_writer.cpp     |  10 +-
 be/src/vec/sink/writer/async_result_writer.h       |   1 -
 be/src/vec/sink/writer/vtablet_writer.cpp          | 161 ++++++++++++---------
 be/src/vec/sink/writer/vtablet_writer.h            |  32 ++--
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  80 ++++++----
 be/src/vec/sink/writer/vtablet_writer_v2.h         |   2 +
 docs/en/docs/admin-manual/config/be-config.md      |  10 ++
 docs/en/docs/advanced/partition/auto-partition.md  |  17 ++-
 docs/zh-CN/docs/admin-manual/config/be-config.md   |  14 +-
 .../docs/advanced/partition/auto-partition.md      |  17 ++-
 .../org/apache/doris/planner/OlapTableSink.java    |  13 +-
 .../auto_partition/test_auto_partition_load.out    |   8 +
 .../auto_partition/test_auto_partition_load.groovy |  12 +-
 27 files changed, 402 insertions(+), 253 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 2f52bd44173..adbc61fe569 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -505,7 +505,8 @@ DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB
 
 // OlapTableSink sender's send interval, should be less than the real response 
time of a tablet writer rpc.
 // You may need to lower the speed when the sink receiver bes are too busy.
-DEFINE_mInt32(olap_table_sink_send_interval_ms, "1");
+DEFINE_mInt32(olap_table_sink_send_interval_microseconds, "1000");
+DEFINE_mDouble(olap_table_sink_send_interval_auto_partition_factor, "0.001");
 
 // Fragment thread pool
 DEFINE_Int32(fragment_pool_thread_num_min, "64");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e13d6dcfd4a..f616cfb6081 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -559,7 +559,9 @@ DECLARE_Int64(stream_tvf_buffer_size);
 
 // OlapTableSink sender's send interval, should be less than the real response 
time of a tablet writer rpc.
 // You may need to lower the speed when the sink receiver bes are too busy.
-DECLARE_mInt32(olap_table_sink_send_interval_ms);
+DECLARE_mInt32(olap_table_sink_send_interval_microseconds);
+// For auto partition, the send interval will multiply the factor
+DECLARE_mDouble(olap_table_sink_send_interval_auto_partition_factor);
 
 // Fragment thread pool
 DECLARE_Int32(fragment_pool_thread_num_min);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index f0e03596cc1..71517f377f0 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -375,7 +375,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                         }
                     }
                     cur_block.clear_column_data();
-                    local_state._serializer.get_block()->set_muatable_columns(
+                    local_state._serializer.get_block()->set_mutable_columns(
                             cur_block.mutate_columns());
                 }
             }
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 3193c1b07c4..b19c93cd28c 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -247,7 +247,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, 
Status exec_status)
                             }
                         }
                         cur_block.clear_column_data();
-                        
_serializer.get_block()->set_muatable_columns(cur_block.mutate_columns());
+                        
_serializer.get_block()->set_mutable_columns(cur_block.mutate_columns());
                     }
                 }
             }
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 6359a2dbe60..dc7fb350b6c 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -350,11 +350,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
 
             if (!eos || block->rows() > 0) {
                 st = _sink->send(runtime_state(), block.get());
-                //TODO: Asynchronisation need refactor this
-                if (st.is<NEED_SEND_AGAIN>()) { // created partition, do it 
again.
-                    st = _sink->send(runtime_state(), block.get());
-                    DCHECK(!st.is<NEED_SEND_AGAIN>());
-                }
                 handle_group_commit();
                 if (st.is<END_OF_FILE>()) {
                     break;
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 5202c51a3da..58fe0cb87e3 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -609,7 +609,7 @@ public:
     virtual bool is_exclusive() const { return use_count() == 1; }
 
     /// Clear data of column, just like vector clear
-    virtual void clear() {}
+    virtual void clear() = 0;
 
     /** Memory layout properties.
       *
diff --git a/be/src/vec/columns/column_dummy.h 
b/be/src/vec/columns/column_dummy.h
index a152dc9751a..790c135889e 100644
--- a/be/src/vec/columns/column_dummy.h
+++ b/be/src/vec/columns/column_dummy.h
@@ -62,6 +62,8 @@ public:
 
     void insert_data(const char*, size_t) override { ++s; }
 
+    void clear() override {};
+
     StringRef serialize_value_into_arena(size_t /*n*/, Arena& arena,
                                          char const*& begin) const override {
         return {arena.alloc_continue(0, begin), 0};
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 779f214c97b..0aef622870c 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -21,6 +21,7 @@
 #include "vec/core/block.h"
 
 #include <assert.h>
+#include <bits/ranges_base.h>
 #include <fmt/format.h>
 #include <gen_cpp/data.pb.h>
 #include <snappy.h>
@@ -968,6 +969,22 @@ void MutableBlock::add_rows(const Block* block, size_t 
row_begin, size_t length)
     }
 }
 
+void MutableBlock::add_rows(const Block* block, std::vector<int64_t> rows) {
+    DCHECK_LE(columns(), block->columns());
+    const auto& block_data = block->get_columns_with_type_and_name();
+    const size_t length = std::ranges::distance(rows);
+    for (size_t i = 0; i < _columns.size(); ++i) {
+        DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
+        auto& dst = _columns[i];
+        const auto& src = *block_data[i].column.get();
+        dst->reserve(dst->size() + length);
+        for (size_t row : rows) {
+            // we can introduce a new function like `insert_assume_reserved` 
for IColumn.
+            dst->insert_from(src, row);
+        }
+    }
+}
+
 void MutableBlock::erase(const String& name) {
     auto index_it = index_by_name.find(name);
     if (index_it == index_by_name.end()) {
@@ -1100,6 +1117,14 @@ void MutableBlock::clear_column_data() noexcept {
     }
 }
 
+void MutableBlock::reset_column_data() noexcept {
+    _columns.clear();
+    for (int i = 0; i < _names.size(); i++) {
+        _columns.emplace_back(_data_types[i]->create_column());
+        index_by_name[_names[i]] = i;
+    }
+}
+
 void MutableBlock::initialize_index_by_name() {
     for (size_t i = 0, size = _names.size(); i < size; ++i) {
         index_by_name[_names[i]] = i;
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 927ed5c655c..b03d9fa4e21 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -22,9 +22,9 @@
 
 #include <glog/logging.h>
 #include <parallel_hashmap/phmap.h>
-#include <stddef.h>
-#include <stdint.h>
 
+#include <cstddef>
+#include <cstdint>
 #include <initializer_list>
 #include <list>
 #include <memory>
@@ -462,7 +462,7 @@ public:
 
     MutableColumns& mutable_columns() { return _columns; }
 
-    void set_muatable_columns(MutableColumns&& columns) { _columns = 
std::move(columns); }
+    void set_mutable_columns(MutableColumns&& columns) { _columns = 
std::move(columns); }
 
     DataTypes& data_types() { return _data_types; }
 
@@ -583,8 +583,8 @@ public:
         return Status::OK();
     }
 
+    // move the columns' data into a Block; this invalidates this MutableBlock
     Block to_block(int start_column = 0);
-
     Block to_block(int start_column, int end_column);
 
     void swap(MutableBlock& other) noexcept;
@@ -594,6 +594,7 @@ public:
     void add_row(const Block* block, int row);
     void add_rows(const Block* block, const int* row_begin, const int* 
row_end);
     void add_rows(const Block* block, size_t row_begin, size_t length);
+    void add_rows(const Block* block, std::vector<int64_t> rows);
 
     /// remove the column with the specified name
     void erase(const String& name);
@@ -606,7 +607,10 @@ public:
         _names.clear();
     }
 
+    // column objects persist; only the columns' inner data is removed.
     void clear_column_data() noexcept;
+    // reset columns by types and names.
+    void reset_column_data() noexcept;
 
     size_t allocated_bytes() const;
 
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 111e6ea2ab9..6c1f8e6325c 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -259,7 +259,7 @@ private:
                     _dependency->set_ready();
                 }
                 _colocate_blocks[loc] = get_free_block();
-                _colocate_mutable_blocks[loc]->set_muatable_columns(
+                _colocate_mutable_blocks[loc]->set_mutable_columns(
                         _colocate_blocks[loc]->mutate_columns());
             }
         }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 128ad0b37f8..5bfec60d1f9 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -126,7 +126,7 @@ template <typename Parent>
 Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
     SCOPED_TIMER(_parent->local_send_timer());
     Block block = _serializer.get_block()->to_block();
-    _serializer.get_block()->set_muatable_columns(block.clone_empty_columns());
+    _serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
     if (_recvr_is_valid()) {
         if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, 
Parent>) {
             COUNTER_UPDATE(_parent->local_bytes_send_counter(), block.bytes());
@@ -568,7 +568,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block, bool eos) {
                         }
                     }
                     cur_block.clear_column_data();
-                    
_serializer.get_block()->set_muatable_columns(cur_block.mutate_columns());
+                    
_serializer.get_block()->set_mutable_columns(cur_block.mutate_columns());
                 }
             }
         } else {
@@ -595,7 +595,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block, bool eos) {
                     }
                 }
                 cur_block.clear_column_data();
-                
_serializer.get_block()->set_muatable_columns(cur_block.mutate_columns());
+                
_serializer.get_block()->set_mutable_columns(cur_block.mutate_columns());
                 _roll_pb_block();
             }
         }
@@ -750,7 +750,7 @@ Status BlockSerializer<Parent>::serialize_block(PBlock* 
dest, int num_receivers)
         auto block = _mutable_block->to_block();
         RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
         block.clear_column_data();
-        _mutable_block->set_muatable_columns(block.mutate_columns());
+        _mutable_block->set_mutable_columns(block.mutate_columns());
     }
 
     return Status::OK();
diff --git a/be/src/vec/sink/vrow_distribution.cpp 
b/be/src/vec/sink/vrow_distribution.cpp
index 0071629175f..74561594cfe 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -20,9 +20,11 @@
 #include <gen_cpp/FrontendService.h>
 #include <gen_cpp/FrontendService_types.h>
 
+#include "common/status.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
+#include "util/doris_metrics.h"
 #include "util/thrift_rpc_helper.h"
 #include "vec/columns/column_const.h"
 #include "vec/columns/column_nullable.h"
@@ -36,22 +38,37 @@ VRowDistribution::_get_partition_function() {
     return {_vpartition->get_part_func_ctx(), 
_vpartition->get_partition_function()};
 }
 
-void VRowDistribution::_save_missing_values(vectorized::ColumnPtr col,
-                                            vectorized::DataTypePtr 
value_type) {
-    _partitions_need_create.clear();
-    std::set<std::string> deduper;
-    // de-duplication
-    for (auto row : _missing_map) {
-        deduper.emplace(value_type->to_string(*col, row));
+Status VRowDistribution::_save_missing_values(vectorized::ColumnPtr col,
+                                              vectorized::DataTypePtr 
value_type, Block* block,
+                                              std::vector<int64_t> filter) {
+    // de-duplication for new partitions but save all rows.
+    _batching_block->add_rows(block, filter);
+    for (auto row : filter) {
+        auto val_str = value_type->to_string(*col, row);
+        if (!_deduper.contains(val_str)) {
+            _deduper.emplace(val_str);
+            TStringLiteral node;
+            node.value = std::move(val_str);
+            _partitions_need_create.emplace_back(std::vector {node}); // only 
1 partition column now
+        }
     }
-    for (auto& value : deduper) {
-        TStringLiteral node;
-        node.value = value;
-        _partitions_need_create.emplace_back(std::vector {node}); // only 1 
partition column now
+
+    // to avoid too large mem use
+    if (_batching_rows > _batch_size) {
+        _deal_batched = true;
     }
+
+    return Status::OK();
+}
+
+void VRowDistribution::clear_batching_stats() {
+    _partitions_need_create.clear();
+    _deduper.clear();
+    _batching_rows = 0;
+    _batching_bytes = 0;
 }
 
-Status VRowDistribution::_automatic_create_partition() {
+Status VRowDistribution::automatic_create_partition() {
     SCOPED_TIMER(_add_partition_request_timer);
     TCreatePartitionRequest request;
     TCreatePartitionResult result;
@@ -75,7 +92,7 @@ Status VRowDistribution::_automatic_create_partition() {
     if (result.status.status_code == TStatusCode::OK) {
         // add new created partitions
         RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
-        RETURN_IF_ERROR(_on_partitions_created(_caller, &result));
+        RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
     }
 
     return status;
@@ -126,7 +143,7 @@ Status 
VRowDistribution::_filter_block_by_skip_and_where_clause(
     auto& row_ids = row_part_tablet_id.row_ids;
     auto& partition_ids = row_part_tablet_id.partition_ids;
     auto& tablet_ids = row_part_tablet_id.tablet_ids;
-    if (auto* nullable_column =
+    if (const auto* nullable_column =
                 
vectorized::check_and_get_column<vectorized::ColumnNullable>(*filter_column)) {
         for (size_t i = 0; i < block->rows(); i++) {
             if (nullable_column->get_bool_inline(i) && !_skip[i]) {
@@ -135,7 +152,7 @@ Status 
VRowDistribution::_filter_block_by_skip_and_where_clause(
                 tablet_ids.emplace_back(_tablet_ids[i]);
             }
         }
-    } else if (auto* const_column =
+    } else if (const auto* const_column =
                        
vectorized::check_and_get_column<vectorized::ColumnConst>(*filter_column)) {
         bool ret = const_column->get_bool(0);
         if (!ret) {
@@ -144,7 +161,7 @@ Status 
VRowDistribution::_filter_block_by_skip_and_where_clause(
         // should we optimize?
         _filter_block_by_skip(block, row_part_tablet_id);
     } else {
-        auto& filter = assert_cast<const 
vectorized::ColumnUInt8&>(*filter_column).get_data();
+        const auto& filter = assert_cast<const 
vectorized::ColumnUInt8&>(*filter_column).get_data();
         for (size_t i = 0; i < block->rows(); i++) {
             if (filter[i] != 0 && !_skip[i]) {
                 row_ids.emplace_back(i);
@@ -194,7 +211,7 @@ Status 
VRowDistribution::_generate_rows_distribution_for_non_auto_parititon(
 
 Status VRowDistribution::_generate_rows_distribution_for_auto_parititon(
         vectorized::Block* block, int partition_col_idx, bool 
has_filtered_rows,
-        std::vector<RowPartTabletIds>& row_part_tablet_ids) {
+        std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& 
rows_stat_val) {
     auto num_rows = block->rows();
     std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
 
@@ -204,22 +221,23 @@ Status 
VRowDistribution::_generate_rows_distribution_for_auto_parititon(
     _missing_map.clear();
     _missing_map.reserve(partition_col.column->size());
     bool stop_processing = false;
-    //TODO: we could use the buffer to save tablets we found so that no need 
to find them again when we created partitions and try to append block next time.
+
     RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, 
_partitions,
                                                  _tablet_indexes, 
stop_processing, _skip,
                                                  &_missing_map));
-    if (_missing_map.empty()) {
-        // we don't calculate it distribution when have missing values
-        if (has_filtered_rows) {
-            for (int i = 0; i < num_rows; i++) {
-                _skip[i] = _skip[i] || _block_convertor->filter_map()[i];
-            }
+
+    // the missing vals for auto partition are also skipped.
+    if (has_filtered_rows) {
+        for (int i = 0; i < num_rows; i++) {
+            _skip[i] = _skip[i] || _block_convertor->filter_map()[i];
         }
-        RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
-    } else { // for missing partition keys, calc the missing partition and 
save in _partitions_need_create
+    }
+    RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
+
+    if (!_missing_map.empty()) {
+        // for missing partition keys, calc the missing partition and save in 
_partitions_need_create
         auto [part_ctx, part_func] = _get_partition_function();
         auto return_type = part_func->data_type();
-
         // expose the data column
         vectorized::ColumnPtr range_left_col = 
block->get_by_position(partition_col_idx).column;
         if (const auto* nullable =
@@ -228,15 +246,19 @@ Status 
VRowDistribution::_generate_rows_distribution_for_auto_parititon(
             return_type = assert_cast<const 
vectorized::DataTypeNullable*>(return_type.get())
                                   ->get_nested_type();
         }
-        // calc the end value and save them.
-        _save_missing_values(range_left_col, return_type);
-        // then call FE to create it. then FragmentExecutor will redo the load.
-        RETURN_IF_ERROR(_automatic_create_partition());
-        // In the next round, we will _generate_rows_distribution_payload 
again to get right payload of new tablet
-        LOG(INFO) << "Auto created partition. Send block again.";
-        return Status::NeedSendAgain("");
-    } // creating done
-
+        // calc the end value and save them. in the end of sending, we will 
create partitions for them and deal them.
+        RETURN_IF_ERROR(_save_missing_values(range_left_col, return_type, 
block, _missing_map));
+
+        size_t new_bt_rows = _batching_block->rows();
+        size_t new_bt_bytes = _batching_block->bytes();
+        rows_stat_val -= new_bt_rows - _batching_rows;
+        _state->update_num_rows_load_total(_batching_rows - new_bt_rows);
+        _state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes);
+        DorisMetrics::instance()->load_rows->increment(_batching_rows - 
new_bt_rows);
+        DorisMetrics::instance()->load_bytes->increment(_batching_bytes - 
new_bt_bytes);
+        _batching_rows = new_bt_rows;
+        _batching_bytes = new_bt_bytes;
+    }
     return Status::OK();
 }
 
@@ -251,6 +273,7 @@ void VRowDistribution::_reset_row_part_tablet_ids(
         row_ids.clear();
         partition_ids.clear();
         tablet_ids.clear();
+        // This is important for performance.
         row_ids.reserve(rows);
         partition_ids.reserve(rows);
         tablet_ids.reserve(rows);
@@ -260,7 +283,7 @@ void VRowDistribution::_reset_row_part_tablet_ids(
 Status VRowDistribution::generate_rows_distribution(
         vectorized::Block& input_block, std::shared_ptr<vectorized::Block>& 
block,
         int64_t& filtered_rows, bool& has_filtered_rows,
-        std::vector<RowPartTabletIds>& row_part_tablet_ids) {
+        std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& 
rows_stat_val) {
     auto input_rows = input_block.rows();
     _reset_row_part_tablet_ids(row_part_tablet_ids, input_rows);
 
@@ -269,6 +292,11 @@ Status VRowDistribution::generate_rows_distribution(
     RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
             _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, 
has_filtered_rows));
 
+    // batching block rows which need new partitions. deal together at finish.
+    if (!_batching_block) [[unlikely]] {
+        _batching_block = 
MutableBlock::create_unique(block->create_same_struct_block(0).release());
+    }
+
     _row_distribution_watch.start();
     auto num_rows = block->rows();
     _tablet_finder->filter_bitmap().Reset(num_rows);
@@ -283,15 +311,17 @@ Status VRowDistribution::generate_rows_distribution(
     int partition_col_idx = -1;
     if (_vpartition->is_projection_partition()) {
         // calc the start value of missing partition ranges.
+        // in VNodeChannel's add_block. the spare columns will be erased.
         RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), 
&partition_col_idx));
         VLOG_DEBUG << "Partition-calculated block:" << block->dump_data();
         // change the column to compare to transformed.
         _vpartition->set_transformed_slots({(uint16_t)partition_col_idx});
     }
 
-    if (_vpartition->is_auto_partition()) {
+    if (_vpartition->is_auto_partition() && !_deal_batched) {
         RETURN_IF_ERROR(_generate_rows_distribution_for_auto_parititon(
-                block.get(), partition_col_idx, has_filtered_rows, 
row_part_tablet_ids));
+                block.get(), partition_col_idx, has_filtered_rows, 
row_part_tablet_ids,
+                rows_stat_val));
     } else { // not auto partition
         RETURN_IF_ERROR(_generate_rows_distribution_for_non_auto_parititon(
                 block.get(), has_filtered_rows, row_part_tablet_ids));
diff --git a/be/src/vec/sink/vrow_distribution.h 
b/be/src/vec/sink/vrow_distribution.h
index 3376eb5ab6f..77104ef26f4 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -22,12 +22,14 @@
 #include <gen_cpp/FrontendService_types.h>
 #include <gen_cpp/PaloInternalService_types.h>
 
+#include <cstdint>
 #include <string>
 #include <unordered_map>
 #include <vector>
 
 #include "common/status.h"
 #include "exec/tablet_info.h"
+#include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
@@ -50,42 +52,46 @@ public:
     std::vector<int64_t> tablet_ids;
 };
 
-typedef Status (*OnPartitionsCreated)(void*, TCreatePartitionResult*);
-
-class VRowDistributionContext {
-public:
-    RuntimeState* state = nullptr;
-    OlapTableBlockConvertor* block_convertor = nullptr;
-    OlapTabletFinder* tablet_finder = nullptr;
-    VOlapTablePartitionParam* vpartition = nullptr;
-    RuntimeProfile::Counter* add_partition_request_timer = nullptr;
-    int64_t txn_id = -1;
-    ObjectPool* pool;
-    OlapTableLocationParam* location;
-    const VExprContextSPtrs* vec_output_expr_ctxs;
-    OnPartitionsCreated on_partitions_created;
-    void* caller;
-    std::shared_ptr<OlapTableSchemaParam> schema;
-};
+// void* for caller
+using CreatePartitionCallback = Status (*)(void*, TCreatePartitionResult*);
 
 class VRowDistribution {
 public:
-    VRowDistribution() {}
-    virtual ~VRowDistribution() {}
-
-    void init(VRowDistributionContext* ctx) {
-        _state = ctx->state;
-        _block_convertor = ctx->block_convertor;
-        _tablet_finder = ctx->tablet_finder;
-        _vpartition = ctx->vpartition;
-        _add_partition_request_timer = ctx->add_partition_request_timer;
-        _txn_id = ctx->txn_id;
-        _pool = ctx->pool;
-        _location = ctx->location;
-        _vec_output_expr_ctxs = ctx->vec_output_expr_ctxs;
-        _on_partitions_created = ctx->on_partitions_created;
-        _caller = ctx->caller;
-        _schema = ctx->schema;
+    // only used to pass parameters for VRowDistribution
+    struct VRowDistributionContext {
+        RuntimeState* state;
+        OlapTableBlockConvertor* block_convertor;
+        OlapTabletFinder* tablet_finder;
+        VOlapTablePartitionParam* vpartition;
+        RuntimeProfile::Counter* add_partition_request_timer;
+        int64_t txn_id = -1;
+        ObjectPool* pool;
+        OlapTableLocationParam* location;
+        const VExprContextSPtrs* vec_output_expr_ctxs;
+        std::shared_ptr<OlapTableSchemaParam> schema;
+        void* caller;
+        CreatePartitionCallback create_partition_callback;
+    };
+    friend class VTabletWriter;
+    friend class VTabletWriterV2;
+
+    VRowDistribution() = default;
+    virtual ~VRowDistribution() = default;
+
+    void init(VRowDistributionContext ctx) {
+        _state = ctx.state;
+        _batch_size = std::max(_state->batch_size(), 8192);
+        _block_convertor = ctx.block_convertor;
+        _tablet_finder = ctx.tablet_finder;
+        _vpartition = ctx.vpartition;
+        _add_partition_request_timer = ctx.add_partition_request_timer;
+        _txn_id = ctx.txn_id;
+        _pool = ctx.pool;
+        _location = ctx.location;
+        _vec_output_expr_ctxs = ctx.vec_output_expr_ctxs;
+        _schema = ctx.schema;
+        _caller = ctx.caller;
+        _create_partition_callback = ctx.create_partition_callback;
     }
 
     Status open(RowDescriptor* output_row_desc) {
@@ -111,15 +117,18 @@ public:
     Status generate_rows_distribution(vectorized::Block& input_block,
                                       std::shared_ptr<vectorized::Block>& 
block,
                                       int64_t& filtered_rows, bool& 
has_filtered_rows,
-                                      std::vector<RowPartTabletIds>& 
row_part_tablet_ids);
+                                      std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
+                                      int64_t& rows_stat_val);
+    bool need_deal_batching() const { return _deal_batched && _batching_rows > 
0; }
+    // create partitions when need for auto-partition table using 
#_partitions_need_create.
+    Status automatic_create_partition();
+    void clear_batching_stats();
 
 private:
     std::pair<vectorized::VExprContextSPtr, vectorized::VExprSPtr> 
_get_partition_function();
 
-    void _save_missing_values(vectorized::ColumnPtr col, 
vectorized::DataTypePtr value_type);
-
-    // create partitions when need for auto-partition table using 
#_partitions_need_create.
-    Status _automatic_create_partition();
+    Status _save_missing_values(vectorized::ColumnPtr col, 
vectorized::DataTypePtr value_type,
+                                Block* block, std::vector<int64_t> filter);
 
     void _get_tablet_ids(vectorized::Block* block, int32_t index_idx,
                          std::vector<int64_t>& tablet_ids);
@@ -135,7 +144,7 @@ private:
 
     Status _generate_rows_distribution_for_auto_parititon(
             vectorized::Block* block, int partition_col_idx, bool 
has_filtered_rows,
-            std::vector<RowPartTabletIds>& row_part_tablet_ids);
+            std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t& 
rows_stat_val);
 
     Status _generate_rows_distribution_for_non_auto_parititon(
             vectorized::Block* block, bool has_filtered_rows,
@@ -144,11 +153,16 @@ private:
     void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
                                     int64_t rows);
 
-private:
     RuntimeState* _state = nullptr;
+    int _batch_size = 0;
 
-    // support only one partition column now
-    std::vector<std::vector<TStringLiteral>> _partitions_need_create;
+    // for auto partitions
+    std::vector<std::vector<TStringLiteral>>
+            _partitions_need_create; // support only one partition column now
+    std::unique_ptr<MutableBlock> _batching_block;
+    bool _deal_batched = false; // If true, send batched block before any 
block's append.
+    size_t _batching_rows = 0, _batching_bytes = 0;
+    std::set<std::string> _deduper;
 
     MonotonicStopWatch _row_distribution_watch;
     OlapTableBlockConvertor* _block_convertor = nullptr;
@@ -158,10 +172,9 @@ private:
     int64_t _txn_id = -1;
     ObjectPool* _pool;
     OlapTableLocationParam* _location = nullptr;
-    // std::function _on_partition_created;
     // int64_t _number_output_rows = 0;
     const VExprContextSPtrs* _vec_output_expr_ctxs;
-    OnPartitionsCreated _on_partitions_created = nullptr;
+    CreatePartitionCallback _create_partition_callback = nullptr;
     void* _caller;
     std::shared_ptr<OlapTableSchemaParam> _schema;
 
diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h
index bccdc39a066..37592847330 100644
--- a/be/src/vec/sink/vtablet_finder.h
+++ b/be/src/vec/sink/vtablet_finder.h
@@ -45,7 +45,7 @@ public:
     Status find_tablets(RuntimeState* state, vectorized::Block* block, int 
rows,
                         std::vector<VOlapTablePartition*>& partitions,
                         std::vector<uint32_t>& tablet_index, bool& filtered,
-                        std::vector<bool>& is_continue, std::vector<int64_t>* 
miss_rows = nullptr);
+                        std::vector<bool>& skip, std::vector<int64_t>* 
miss_rows = nullptr);
 
     bool is_find_tablet_every_sink() {
         return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp 
b/be/src/vec/sink/writer/async_result_writer.cpp
index d1fc46dc8d8..8edde60adb4 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -86,11 +86,6 @@ std::unique_ptr<Block> 
AsyncResultWriter::_get_block_from_queue() {
     return block;
 }
 
-void AsyncResultWriter::_return_block_to_queue(std::unique_ptr<Block> 
add_block) {
-    std::lock_guard l(_m);
-    _data_queue.emplace_back(std::move(add_block));
-}
-
 void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* 
profile) {
     
static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
             [this, state, profile]() { this->process_block(state, profile); 
}));
@@ -117,10 +112,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, 
RuntimeProfile* profi
 
             auto block = _get_block_from_queue();
             auto status = write(block);
-            if (status.is<ErrorCode::NEED_SEND_AGAIN>()) {
-                _return_block_to_queue(std::move(block));
-                continue;
-            } else if (UNLIKELY(!status.ok())) {
+            if (!status.ok()) [[unlikely]] {
                 std::unique_lock l(_m);
                 _writer_status = status;
                 if (_dependency && _is_finished()) {
diff --git a/be/src/vec/sink/writer/async_result_writer.h 
b/be/src/vec/sink/writer/async_result_writer.h
index a50b9296c7f..0a217b34e6b 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -105,7 +105,6 @@ private:
     [[nodiscard]] bool _is_finished() const { return !_writer_status.ok() || 
_eos; }
 
     std::unique_ptr<Block> _get_block_from_queue();
-    void _return_block_to_queue(std::unique_ptr<Block>);
 
     static constexpr auto QUEUE_SIZE = 3;
     std::mutex _m;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 7138993732b..1639703d986 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -46,11 +46,13 @@
 #include <utility>
 #include <vector>
 
+#include "common/config.h"
 #include "olap/wal_manager.h"
 #include "util/runtime_profile.h"
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr_fwd.h"
 #include "vec/runtime/vdatetime_value.h"
+#include "vec/sink/vrow_distribution.h"
 #include "vec/sink/vtablet_sink.h"
 
 #ifdef DEBUG
@@ -110,9 +112,9 @@ bvar::PerSecond<bvar::Adder<int64_t>> 
g_sink_write_rows_per_second("sink_through
 
 Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPartition>& tablets) {
     SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
-    for (auto& tablet : tablets) {
+    for (const auto& tablet : tablets) {
         // First find the location BEs of this tablet
-        auto tablet_locations = 
_parent->_location->find_tablet(tablet.tablet_id);
+        auto* tablet_locations = 
_parent->_location->find_tablet(tablet.tablet_id);
         if (tablet_locations == nullptr) {
             return Status::InternalError("unknown tablet, tablet_id={}", 
tablet.tablet_id);
         }
@@ -133,7 +135,7 @@ Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPart
             }
             channel->add_tablet(tablet);
             if (_parent->_write_single_replica) {
-                auto slave_location = 
_parent->_slave_location->find_tablet(tablet.tablet_id);
+                auto* slave_location = 
_parent->_slave_location->find_tablet(tablet.tablet_id);
                 if (slave_location != nullptr) {
                     channel->add_slave_tablet_nodes(tablet.tablet_id, 
slave_location->node_ids);
                 }
@@ -267,6 +269,22 @@ Status 
IndexChannel::check_tablet_filtered_rows_consistency() {
     return Status::OK();
 }
 
+static Status none_of(std::initializer_list<bool> vars) {
+    bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return 
var; });
+    Status st = Status::OK();
+    if (!none) {
+        std::string vars_str;
+        std::for_each(vars.begin(), vars.end(),
+                      [&vars_str](bool var) -> void { vars_str += (var ? "1/" 
: "0/"); });
+        if (!vars_str.empty()) {
+            vars_str.pop_back(); // 0/1/0/ -> 0/1/0
+        }
+        st = Status::Uninitialized(vars_str);
+    }
+
+    return st;
+}
+
 VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, 
int64_t node_id,
                            bool is_incremental)
         : _parent(parent),
@@ -296,7 +314,7 @@ Status VNodeChannel::init(RuntimeState* state) {
     _tuple_desc = _parent->_output_tuple_desc;
     _state = state;
     // get corresponding BE node.
-    auto node = _parent->_nodes_info->find_node(_node_id);
+    const auto* node = _parent->_nodes_info->find_node(_node_id);
     if (node == nullptr) {
         _cancelled = true;
         return Status::InternalError("unknown node id, id={}", _node_id);
@@ -306,7 +324,7 @@ Status VNodeChannel::init(RuntimeState* state) {
     _load_info = "load_id=" + print_id(_parent->_load_id) +
                  ", txn_id=" + std::to_string(_parent->_txn_id);
 
-    _row_desc.reset(new RowDescriptor(_tuple_desc, false));
+    _row_desc = std::make_unique<RowDescriptor>(_tuple_desc, false);
     _batch_size = state->batch_size();
 
     _stub = 
state->exec_env()->brpc_internal_client_cache()->get_client(_node_info.host,
@@ -352,7 +370,7 @@ void VNodeChannel::_open_internal(bool is_incremental) {
         if (deduper.contains(tablet.tablet_id)) {
             continue;
         }
-        auto ptablet = request->add_tablets();
+        auto* ptablet = request->add_tablets();
         ptablet->set_partition_id(tablet.partition_id);
         ptablet->set_tablet_id(tablet.tablet_id);
         deduper.insert(tablet.tablet_id);
@@ -556,22 +574,6 @@ void VNodeChannel::_cancel_with_msg(const std::string& 
msg) {
     _cancelled = true;
 }
 
-Status VNodeChannel::none_of(std::initializer_list<bool> vars) {
-    bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return 
var; });
-    Status st = Status::OK();
-    if (!none) {
-        std::string vars_str;
-        std::for_each(vars.begin(), vars.end(),
-                      [&vars_str](bool var) -> void { vars_str += (var ? "1/" 
: "0/"); });
-        if (!vars_str.empty()) {
-            vars_str.pop_back(); // 0/1/0/ -> 0/1/0
-        }
-        st = Status::Uninitialized(vars_str);
-    }
-
-    return st;
-}
-
 void VNodeChannel::try_send_pending_block(RuntimeState* state) {
     SCOPED_ATTACH_TASK(state);
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker);
@@ -608,7 +610,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
             _send_block_callback->clear_in_flight();
             return;
         }
-        if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
+        if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95F) {
             LOG(WARNING) << "send block too large, this rpc may failed. send 
size: "
                          << compressed_bytes << ", threshold: " << 
config::brpc_max_body_size
                          << ", " << channel_info();
@@ -640,12 +642,10 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
         request->set_write_single_replica(false);
         if (_parent->_write_single_replica) {
             request->set_write_single_replica(true);
-            for (std::unordered_map<int64_t, std::vector<int64_t>>::iterator 
iter =
-                         _slave_tablet_nodes.begin();
-                 iter != _slave_tablet_nodes.end(); iter++) {
+            for (auto& _slave_tablet_node : _slave_tablet_nodes) {
                 PSlaveTabletNodes slave_tablet_nodes;
-                for (auto node_id : iter->second) {
-                    auto node = _parent->_nodes_info->find_node(node_id);
+                for (auto node_id : _slave_tablet_node.second) {
+                    const auto* node = 
_parent->_nodes_info->find_node(node_id);
                     if (node == nullptr) {
                         return;
                     }
@@ -655,7 +655,8 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
                     pnode->set_host(node->host);
                     pnode->set_async_internal_port(node->brpc_port);
                 }
-                request->mutable_slave_tablet_nodes()->insert({iter->first, 
slave_tablet_nodes});
+                request->mutable_slave_tablet_nodes()->insert(
+                        {_slave_tablet_node.first, slave_tablet_nodes});
             }
         }
 
@@ -722,7 +723,7 @@ void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult
     Status status(Status::create(result.status()));
     if (status.ok()) {
         // if has error tablet, handle them first
-        for (auto& error : result.tablet_errors()) {
+        for (const auto& error : result.tablet_errors()) {
             _index_channel->mark_as_failed(this, "tablet error: " + 
error.msg(), error.tablet_id());
         }
 
@@ -730,7 +731,7 @@ void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult
         if (!st.ok()) {
             _cancel_with_msg(st.to_string());
         } else if (is_last_rpc) {
-            for (auto& tablet : result.tablet_vec()) {
+            for (const auto& tablet : result.tablet_vec()) {
                 TTabletCommitInfo commit_info;
                 commit_info.tabletId = tablet.tablet_id();
                 commit_info.backendId = _node_id;
@@ -748,7 +749,7 @@ void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult
                               << ", host: " << this->host() << ", txn_id=" << 
_parent->_txn_id;
             }
             if (_parent->_write_single_replica) {
-                for (auto& tablet_slave_node_ids : 
result.success_slave_tablet_node_ids()) {
+                for (const auto& tablet_slave_node_ids : 
result.success_slave_tablet_node_ids()) {
                     for (auto slave_node_id : 
tablet_slave_node_ids.second.slave_node_ids()) {
                         TTabletCommitInfo commit_info;
                         commit_info.tabletId = tablet_slave_node_ids.first;
@@ -776,7 +777,7 @@ void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult
     }
     if (result.has_load_channel_profile()) {
         TRuntimeProfileTree tprofile;
-        const uint8_t* buf = (const 
uint8_t*)result.load_channel_profile().data();
+        const auto* buf = (const uint8_t*)result.load_channel_profile().data();
         uint32_t len = result.load_channel_profile().size();
         auto st = deserialize_thrift_msg(buf, &len, false, &tprofile);
         if (st.ok()) {
@@ -944,6 +945,11 @@ void VTabletWriter::_send_batch_process() {
     SCOPED_ATTACH_TASK(_state);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
 
+    int sleep_time = config::olap_table_sink_send_interval_microseconds *
+                     (_vpartition->is_auto_partition()
+                              ? 
config::olap_table_sink_send_interval_auto_partition_factor
+                              : 1);
+
     while (true) {
         // incremental open will temporarily make channels into abnormal 
state. stop checking when this.
         std::unique_lock<std::mutex> l(_stop_check_channel);
@@ -986,7 +992,7 @@ void VTabletWriter::_send_batch_process() {
                 return;
             }
         }
-        bthread_usleep(config::olap_table_sink_send_interval_ms * 1000);
+        bthread_usleep(sleep_time);
     }
 }
 
@@ -1059,25 +1065,20 @@ static Status on_partitions_created(void* writer, 
TCreatePartitionResult* result
 }
 
 Status VTabletWriter::_init_row_distribution() {
-    VRowDistributionContext ctx;
-
-    ctx.state = _state;
-    ctx.block_convertor = _block_convertor.get();
-    ctx.tablet_finder = _tablet_finder.get();
-    ctx.vpartition = _vpartition;
-    ctx.add_partition_request_timer = _add_partition_request_timer;
-    ctx.txn_id = _txn_id;
-    ctx.pool = _pool;
-    ctx.location = _location;
-    ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs;
-    ctx.on_partitions_created = &vectorized::on_partitions_created;
-    ctx.caller = (void*)this;
-    ctx.schema = _schema;
-
-    _row_distribution.init(&ctx);
-
-    RETURN_IF_ERROR(_row_distribution.open(_output_row_desc));
-    return Status::OK();
+    _row_distribution.init({.state = _state,
+                            .block_convertor = _block_convertor.get(),
+                            .tablet_finder = _tablet_finder.get(),
+                            .vpartition = _vpartition,
+                            .add_partition_request_timer = 
_add_partition_request_timer,
+                            .txn_id = _txn_id,
+                            .pool = _pool,
+                            .location = _location,
+                            .vec_output_expr_ctxs = &_vec_output_expr_ctxs,
+                            .schema = _schema,
+                            .caller = this,
+                            .create_partition_callback = 
&vectorized::on_partitions_created});
+
+    return _row_distribution.open(_output_row_desc);
 }
 
 Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
@@ -1198,7 +1199,7 @@ Status VTabletWriter::_init(RuntimeState* state, 
RuntimeProfile* profile) {
     for (int i = 0; i < _schema->indexes().size(); ++i) {
         // collect all tablets belong to this rollup
         std::vector<TTabletWithPartition> tablets;
-        auto index = _schema->indexes()[i];
+        auto* index = _schema->indexes()[i];
         for (const auto& part : partitions) {
             for (const auto& tablet : part->indexes[i].tablets) {
                 TTabletWithPartition tablet_with_partition;
@@ -1235,7 +1236,7 @@ Status VTabletWriter::_incremental_open_node_channel(
     for (int i = 0; i < _schema->indexes().size(); ++i) {
         const OlapTableIndexSchema* index = _schema->indexes()[i];
         std::vector<TTabletWithPartition> tablets;
-        for (auto& t_part : partitions) {
+        for (const auto& t_part : partitions) {
             VOlapTablePartition* part = nullptr;
             RETURN_IF_ERROR(_vpartition->generate_partition_from(t_part, 
part));
             for (const auto& tablet : part->indexes[i].tablets) {
@@ -1279,7 +1280,7 @@ Status VTabletWriter::_incremental_open_node_channel(
     return Status::OK();
 }
 
-Status VTabletWriter::_cancel_channel_and_check_intolerable_failure(
+static Status cancel_channel_and_check_intolerable_failure(
         Status status, const std::string& err_msg, const 
std::shared_ptr<IndexChannel> ich,
         const std::shared_ptr<VNodeChannel> nch) {
     LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << 
err_msg;
@@ -1311,10 +1312,38 @@ void VTabletWriter::_cancel_all_channel(Status status) {
             print_id(_load_id), _txn_id, status);
 }
 
+Status VTabletWriter::_send_new_partition_batch() {
+    if (_row_distribution.need_deal_batching()) { // try_close may be called 
more than once
+        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+
+        Block tmp_block = _row_distribution._batching_block->to_block(); // 
Borrow the block out so we have an lvalue reference
+
+        // This order is mandatory:
+        //  1. clear the batching stats (and set the flag) so that we do not 
start a new batching round while handling the batched block.
+        //  2. handle the batched block.
+        //  3. reuse the columns of the borrowed block, since append_block 
does not really modify them; it generates a new block from them.
+        _row_distribution.clear_batching_stats();
+        RETURN_IF_ERROR(this->append_block(tmp_block));
+        _row_distribution._batching_block->set_mutable_columns(
+                tmp_block.mutate_columns()); // Recovery back
+        _row_distribution._batching_block->clear_column_data();
+        _row_distribution._deal_batched = false;
+    }
+    return Status::OK();
+}
+
 Status VTabletWriter::try_close(RuntimeState* state, Status exec_status) {
     SCOPED_TIMER(_close_timer);
     Status status = exec_status;
-    _try_close = true;
+
+    // must before set _try_close
+    if (status.ok()) {
+        SCOPED_TIMER(_profile->total_time_counter());
+        _row_distribution._deal_batched = true;
+        status = _send_new_partition_batch();
+    }
+
+    _try_close = true; // will stop periodic thread
     if (status.ok()) {
         // only if status is ok can we call this 
_profile->total_time_counter().
         // if status is not ok, this sink may not be prepared, so that 
_profile is null
@@ -1325,14 +1354,14 @@ Status VTabletWriter::try_close(RuntimeState* state, 
Status exec_status) {
                     break;
                 }
                 index_channel->for_each_node_channel(
-                        [this, &index_channel, &status](const 
std::shared_ptr<VNodeChannel>& ch) {
+                        [&index_channel, &status](const 
std::shared_ptr<VNodeChannel>& ch) {
                             if (!status.ok() || ch->is_closed()) {
                                 return;
                             }
                             // only first try close, all node channels will 
mark_close()
                             ch->mark_close();
                             if (ch->is_cancelled()) {
-                                status = 
this->_cancel_channel_and_check_intolerable_failure(
+                                status = 
cancel_channel_and_check_intolerable_failure(
                                         status, ch->get_cancel_msg(), 
index_channel, ch);
                             }
                         });
@@ -1413,7 +1442,7 @@ Status VTabletWriter::close(Status exec_status) {
                         // no pipeline, close may block waiting.
                         auto s = ch->close_wait(_state);
                         if (!s.ok()) {
-                            status = 
this->_cancel_channel_and_check_intolerable_failure(
+                            status = 
cancel_channel_and_check_intolerable_failure(
                                     status, s.to_string(), index_channel, ch);
                         }
                         ch->time_report(&node_add_batch_counter_map, 
&serialize_batch_ns,
@@ -1572,6 +1601,9 @@ Status 
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
         return status;
     }
 
+    // check out of limit
+    RETURN_IF_ERROR(_send_new_partition_batch());
+
     auto rows = input_block.rows();
     auto bytes = input_block.bytes();
     if (UNLIKELY(rows == 0)) {
@@ -1584,7 +1616,8 @@ Status 
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
     int64_t filtered_rows = 0;
 
     RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
-            input_block, block, filtered_rows, has_filtered_rows, 
_row_part_tablet_ids));
+            input_block, block, filtered_rows, has_filtered_rows, 
_row_part_tablet_ids,
+            _number_input_rows));
 
     ChannelDistributionPayloadVec channel_to_payload;
 
@@ -1624,11 +1657,7 @@ Status 
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
                                                     block.get(), 
_block_convertor.get(),
                                                     _tablet_finder.get()));
     }
-    // TODO: Before load, we need to projection unuseful column
-    // auto slots = _schema->tuple_desc()->slots();
-    // for (auto desc : slots) {
-    //     desc->col_pos();
-    // }
+
     // Add block to node channel
     for (size_t i = 0; i < _channels.size(); i++) {
         for (const auto& entry : channel_to_payload[i]) {
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index e9f6ae13f43..c6190c2675b 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -29,15 +29,13 @@
 #include <gen_cpp/types.pb.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
-#include <stddef.h>
-#include <stdint.h>
-
-#include <atomic>
 
 #include "olap/wal_writer.h"
 #include "vwal_writer.h"
 // IWYU pragma: no_include <bits/chrono.h>
+#include <atomic>
 #include <chrono> // IWYU pragma: keep
+#include <cstddef>
 #include <cstdint>
 #include <functional>
 #include <initializer_list>
@@ -60,6 +58,7 @@
 #include "exec/data_sink.h"
 #include "exec/tablet_info.h"
 #include "gutil/ref_counted.h"
+#include "olap/wal_writer.h"
 #include "runtime/decimalv2_value.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
@@ -226,7 +225,7 @@ public:
     void add_tablet(const TTabletWithPartition& tablet) { 
_all_tablets.emplace_back(tablet); }
     std::string debug_tablets() const {
         std::stringstream ss;
-        for (auto& tab : _all_tablets) {
+        for (const auto& tab : _all_tablets) {
             tab.printTo(ss);
             ss << '\n';
         }
@@ -272,7 +271,7 @@ public:
         ss << "close wait failed coz rpc error";
         {
             std::lock_guard<doris::SpinLock> l(_cancel_msg_lock);
-            if (_cancel_msg != "") {
+            if (!_cancel_msg.empty()) {
                 ss << ". " << _cancel_msg;
             }
         }
@@ -309,8 +308,6 @@ public:
     std::string host() const { return _node_info.host; }
     std::string name() const { return _name; }
 
-    Status none_of(std::initializer_list<bool> vars);
-
     std::string channel_info() const {
         return fmt::format("{}, {}, node={}:{}", _name, _load_info, 
_node_info.host,
                            _node_info.brpc_port);
@@ -418,9 +415,8 @@ protected:
 // an IndexChannel is related to specific table and its rollup and mv
 class IndexChannel {
 public:
-    IndexChannel(VTabletWriter* parent, int64_t index_id,
-                 const vectorized::VExprContextSPtr& where_clause)
-            : _parent(parent), _index_id(index_id), 
_where_clause(where_clause) {
+    IndexChannel(VTabletWriter* parent, int64_t index_id, 
vectorized::VExprContextSPtr where_clause)
+            : _parent(parent), _index_id(index_id), 
_where_clause(std::move(where_clause)) {
         _index_channel_tracker =
                 std::make_unique<MemTracker>("IndexChannel:indexID=" + 
std::to_string(_index_id));
     }
@@ -447,7 +443,7 @@ public:
 
     size_t get_pending_bytes() const {
         size_t mem_consumption = 0;
-        for (auto& kv : _node_channels) {
+        for (const auto& kv : _node_channels) {
             mem_consumption += kv.second->get_pending_bytes();
         }
         return mem_consumption;
@@ -542,6 +538,8 @@ public:
 
     Status on_partitions_created(TCreatePartitionResult* result);
 
+    Status _send_new_partition_batch();
+
 private:
     friend class VNodeChannel;
     friend class IndexChannel;
@@ -560,18 +558,8 @@ private:
     void _generate_index_channels_payloads(std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
                                            ChannelDistributionPayloadVec& 
payload);
 
-    Status _cancel_channel_and_check_intolerable_failure(Status status, const 
std::string& err_msg,
-                                                         const 
std::shared_ptr<IndexChannel> ich,
-                                                         const 
std::shared_ptr<VNodeChannel> nch);
-
     void _cancel_all_channel(Status status);
 
-    void _save_missing_values(vectorized::ColumnPtr col, 
vectorized::DataTypePtr value_type,
-                              std::vector<int64_t> filter);
-
-    // create partitions when need for auto-partition table using 
#_partitions_need_create.
-    Status _automatic_create_partition();
-
     Status _incremental_open_node_channel(const 
std::vector<TOlapTablePartition>& partitions);
 
     TDataSink _t_sink;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 8bc65a4cba4..fe8be28ec3b 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -53,9 +53,7 @@
 #include "vec/sink/vtablet_block_convertor.h"
 #include "vec/sink/vtablet_finder.h"
 
-namespace doris {
-
-namespace vectorized {
+namespace doris::vectorized {
 
 VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs)
         : AsyncResultWriter(output_exprs), _t_sink(t_sink) {
@@ -121,26 +119,20 @@ Status VTabletWriterV2::_incremental_open_streams(
 }
 
 Status VTabletWriterV2::_init_row_distribution() {
-    VRowDistributionContext ctx;
-
-    ctx.state = _state;
-    ctx.block_convertor = _block_convertor.get();
-    ctx.tablet_finder = _tablet_finder.get();
-    ctx.vpartition = _vpartition;
-    ctx.add_partition_request_timer = _add_partition_request_timer;
-    ctx.txn_id = _txn_id;
-    ctx.pool = _pool;
-    ctx.location = _location;
-    ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs;
-    ctx.on_partitions_created = &vectorized::on_partitions_created;
-    ctx.caller = (void*)this;
-    ctx.schema = _schema;
-
-    _row_distribution.init(&ctx);
-
-    RETURN_IF_ERROR(_row_distribution.open(_output_row_desc));
-
-    return Status::OK();
+    _row_distribution.init({.state = _state,
+                            .block_convertor = _block_convertor.get(),
+                            .tablet_finder = _tablet_finder.get(),
+                            .vpartition = _vpartition,
+                            .add_partition_request_timer = 
_add_partition_request_timer,
+                            .txn_id = _txn_id,
+                            .pool = _pool,
+                            .location = _location,
+                            .vec_output_expr_ctxs = &_vec_output_expr_ctxs,
+                            .schema = _schema,
+                            .caller = (void*)this,
+                            .create_partition_callback = 
&vectorized::on_partitions_created});
+
+    return _row_distribution.open(_output_row_desc);
 }
 
 Status VTabletWriterV2::init_properties(ObjectPool* pool, bool group_commit) {
@@ -331,11 +323,11 @@ void 
VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& r
 
 Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t 
partition_id, int64_t index_id,
                                         Streams& streams) {
-    auto location = _location->find_tablet(tablet_id);
+    const auto* location = _location->find_tablet(tablet_id);
     if (location == nullptr) {
         return Status::InternalError("unknown tablet location, tablet id = 
{}", tablet_id);
     }
-    for (auto& node_id : location->node_ids) {
+    for (const auto& node_id : location->node_ids) {
         PTabletID tablet;
         tablet.set_partition_id(partition_id);
         tablet.set_index_id(index_id);
@@ -356,6 +348,9 @@ Status VTabletWriterV2::append_block(Block& input_block) {
         return status;
     }
 
+    // check out of limit
+    RETURN_IF_ERROR(_send_new_partition_batch());
+
     auto input_rows = input_block.rows();
     auto input_bytes = input_block.bytes();
     if (UNLIKELY(input_rows == 0)) {
@@ -379,7 +374,8 @@ Status VTabletWriterV2::append_block(Block& input_block) {
 
     std::shared_ptr<vectorized::Block> block;
     RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
-            input_block, block, filtered_rows, has_filtered_rows, 
_row_part_tablet_ids));
+            input_block, block, filtered_rows, has_filtered_rows, 
_row_part_tablet_ids,
+            _number_input_rows));
     RowsForTablet rows_for_tablet;
     _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
 
@@ -409,7 +405,7 @@ Status 
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
                 .is_high_priority = _is_high_priority,
                 .write_file_cache = _write_file_cache,
         };
-        for (auto& index : _schema->indexes()) {
+        for (const auto& index : _schema->indexes()) {
             if (index->index_id == rows.index_id) {
                 req.slots = &index->slots;
                 req.schema_hash = index->schema_hash;
@@ -440,6 +436,26 @@ Status VTabletWriterV2::_cancel(Status status) {
     return Status::OK();
 }
 
+Status VTabletWriterV2::_send_new_partition_batch() {
+    if (_row_distribution.need_deal_batching()) { // try_close may be called 
more than once
+        RETURN_IF_ERROR(_row_distribution.automatic_create_partition());
+
+        Block tmp_block = _row_distribution._batching_block->to_block(); // 
Borrow the block out so we have an lvalue reference
+
+        // This order is mandatory:
+        //  1. clear the batching stats (and set the flag) so that we do not 
start a new batching round while handling the batched block.
+        //  2. handle the batched block.
+        //  3. reuse the columns of the borrowed block, since append_block 
does not really modify them; it generates a new block from them.
+        _row_distribution.clear_batching_stats();
+        RETURN_IF_ERROR(this->append_block(tmp_block));
+        _row_distribution._batching_block->set_mutable_columns(
+                tmp_block.mutate_columns()); // Recovery back
+        _row_distribution._batching_block->clear_column_data();
+        _row_distribution._deal_batched = false;
+    }
+    return Status::OK();
+}
+
 Status VTabletWriterV2::close(Status exec_status) {
     std::lock_guard<std::mutex> close_lock(_close_mutex);
     if (_is_closed) {
@@ -447,6 +463,13 @@ Status VTabletWriterV2::close(Status exec_status) {
     }
     SCOPED_TIMER(_close_timer);
     Status status = exec_status;
+
+    if (status.ok()) {
+        SCOPED_TIMER(_profile->total_time_counter());
+        _row_distribution._deal_batched = true;
+        status = _send_new_partition_batch();
+    }
+
     if (status.ok()) {
         // only if status is ok can we call this 
_profile->total_time_counter().
         // if status is not ok, this sink may not be prepared, so that 
_profile is null
@@ -538,5 +561,4 @@ Status VTabletWriterV2::_close_load(const Streams& streams) 
{
     return Status::OK();
 }
 
-} // namespace vectorized
-} // namespace doris
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index d4ccf7b6523..3ce291eb9ac 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -130,6 +130,8 @@ private:
 
     Status _incremental_open_streams(const std::vector<TOlapTablePartition>& 
partitions);
 
+    Status _send_new_partition_batch();
+
     void _build_tablet_node_mapping();
 
     void _generate_rows_for_tablet(std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
diff --git a/docs/en/docs/admin-manual/config/be-config.md 
b/docs/en/docs/admin-manual/config/be-config.md
index d3192d92452..2250c06115e 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -1500,3 +1500,13 @@ Indicates how many tablets failed to load in the data 
directory. At the same tim
 
 * Description: BE Whether to enable the use of java-jni. When enabled, mutual 
calls between c++ and java are allowed. Currently supports hudi, java-udf, 
jdbc, max-compute, paimon, preload, avro
 * Default value: true
+
+#### `olap_table_sink_send_interval_microseconds`
+
+* Description: While loading data, a polling thread keeps sending data 
to the corresponding BEs from the Coordinator's sink node. This thread checks 
whether there is data to send every `olap_table_sink_send_interval_microseconds` 
microseconds.
+* Default value: 1000
+
+#### `olap_table_sink_send_interval_auto_partition_factor`
+
+* Description: When loading data into a table with auto partition enabled, the 
interval of `olap_table_sink_send_interval_microseconds` may be too slow. In that 
case, the real interval will be multiplied by this factor.
+* Default value: 0.001
diff --git a/docs/en/docs/advanced/partition/auto-partition.md 
b/docs/en/docs/advanced/partition/auto-partition.md
index 3e6fc6dc8e8..f601445f874 100644
--- a/docs/en/docs/advanced/partition/auto-partition.md
+++ b/docs/en/docs/advanced/partition/auto-partition.md
@@ -38,19 +38,19 @@ When building a table, use the following syntax to populate 
[CREATE-TABLE](../..
 
 1. AUTO RANGE PARTITION:
 
-  ```SQL
+  ```sql
   AUTO PARTITION BY RANGE FUNC_CALL_EXPR
   (
   )
   ```
   where
-  ```SQL
+  ```sql
   FUNC_CALL_EXPR ::= date_trunc ( <partition_column>, '<interval>' )
   ```
 
 2. AUTO LIST PARTITION:
 
-  ```SQL
+  ```sql
   AUTO PARTITION BY LIST(`partition_col`)
   (
   )
@@ -60,7 +60,7 @@ When building a table, use the following syntax to populate 
[CREATE-TABLE](../..
 
 1. AUTO RANGE PARTITION
 
-  ```SQL
+  ```sql
   CREATE TABLE `${tblDate}` (
       `TIME_STAMP` datev2 NOT NULL COMMENT 'Date of collection'
   ) ENGINE=OLAP
@@ -76,7 +76,7 @@ When building a table, use the following syntax to populate 
[CREATE-TABLE](../..
 
 2. AUTO LIST PARTITION
 
-  ```SQL
+  ```sql
   CREATE TABLE `${tblName1}` (
       `str` varchar not null
   ) ENGINE=OLAP
@@ -144,7 +144,7 @@ PROPERTIES (
 
 The table stores a large amount of business history data, partitioned based on 
the date the transaction occurred. As you can see when building the table, we 
need to manually create the partitions in advance. If the data range of the 
partitioned columns changes, for example, 2022 is added to the above table, we 
need to create a partition by 
[ALTER-TABLE-PARTITION](../../sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-PARTITION.md)
 to make changes to the table partiti [...]
 
-```SQL
+```sql
 CREATE TABLE `DAILY_TRADE_VALUE`
 (
     `TRADE_DATE`              datev2 NULL,
@@ -162,13 +162,13 @@ PROPERTIES (
 ```
 
 At this point the new table does not have a default partition:
-```SQL
+```sql
 mysql> show partitions from `DAILY_TRADE_VALUE`;
 Empty set (0.12 sec)
 ```
 
 After inserting the data and then viewing it again, we could found that the 
table has been created with corresponding partitions:
-```SQL
+```sql
 mysql> insert into `DAILY_TRADE_VALUE` values ('2012-12-13', 1), 
('2008-02-03', 2), ('2014-11-11', 3);
 Query OK, 3 rows affected (0.88 sec)
 {'label':'insert_754e2a3926a345ea_854793fb2638f0ec', 'status':'VISIBLE', 
'txnId':'20014'}
@@ -188,3 +188,4 @@ mysql> show partitions from `DAILY_TRADE_VALUE`;
 
 - If a partition is created during the insertion or importation of data and 
the process eventually fails, the created partition is not automatically 
deleted.
 - Tables that use AUTO PARTITION only have their partitions created 
automatically instead of manually. The original use of the table and the 
partitions it creates is the same as for non-AUTO PARTITION tables or 
partitions.
+- When importing data to a table with AUTO PARTITION enabled, the polling 
interval for data sent by the Coordinator is different from that of a normal 
table. For details, see `olap_table_sink_send_interval_auto_partition_factor` 
in [BE Configuration](../../admin-manual/config/be-config.md).
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md 
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 3a94883804d..586958e3a6f 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -1527,5 +1527,15 @@ load tablets from header failed, failed tablets size: 
xxx, path=xxx
 
 #### `enable_java_support`
 
-* Description: BE 是否开启使用java-jni,开启后允许 c++  与 java 
之间的相互调用。目前已经支持hudi、java-udf、jdbc、max-compute、paimon、preload、avro
-* Default value: true
+* 描述: BE 是否开启使用java-jni,开启后允许 c++ 与 java 
之间的相互调用。目前已经支持hudi、java-udf、jdbc、max-compute、paimon、preload、avro
+* 默认值: true
+
+#### `olap_table_sink_send_interval_microseconds`
+
+* 描述: 数据导入时,Coordinator 的 sink 节点有一个轮询线程持续向对应BE发送数据。该线程将每隔 
`olap_table_sink_send_interval_microseconds` 微秒检查是否有数据要发送。
+* 默认值:1000
+
+#### `olap_table_sink_send_interval_auto_partition_factor`
+
+* 描述: 如果我们向一个启用了自动分区的表导入数据,那么 `olap_table_sink_send_interval_microseconds` 
的时间间隔就会太慢。在这种情况下,实际间隔将乘以该系数。
+* 默认值:0.001
diff --git a/docs/zh-CN/docs/advanced/partition/auto-partition.md 
b/docs/zh-CN/docs/advanced/partition/auto-partition.md
index 42c07581c2e..ef0da5032de 100644
--- a/docs/zh-CN/docs/advanced/partition/auto-partition.md
+++ b/docs/zh-CN/docs/advanced/partition/auto-partition.md
@@ -38,19 +38,19 @@ under the License.
 
 1. AUTO RANGE PARTITION:
 
-  ```SQL
+  ```sql
   AUTO PARTITION BY RANGE FUNC_CALL_EXPR
   (
   )
   ```
   其中
-  ```SQL
+  ```sql
   FUNC_CALL_EXPR ::= date_trunc ( <partition_column>, '<interval>' )
   ```
 
 2. AUTO LIST PARTITION:
 
-  ```SQL
+  ```sql
   AUTO PARTITION BY LIST(`partition_col`)
   (
   )
@@ -60,7 +60,7 @@ under the License.
 
 1. AUTO RANGE PARTITION
 
-  ```SQL
+  ```sql
   CREATE TABLE `${tblDate}` (
       `TIME_STAMP` datev2 NOT NULL COMMENT '采集日期'
   ) ENGINE=OLAP
@@ -76,7 +76,7 @@ under the License.
 
 2. AUTO LIST PARTITION
 
-  ```SQL
+  ```sql
   CREATE TABLE `${tblName1}` (
       `str` varchar not null
   ) ENGINE=OLAP
@@ -144,7 +144,7 @@ PROPERTIES (
 
 
该表内存储了大量业务历史数据,依据交易发生的日期进行分区。可以看到在建表时,我们需要预先手动创建分区。如果分区列的数据范围发生变化,例如上表中增加了2022年的数据,则我们需要通过[ALTER-TABLE-PARTITION](../../sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-PARTITION.md)对表的分区进行更改。在使用AUTO
 PARTITION后,该表DDL可以改为:
 
-```SQL
+```sql
 CREATE TABLE `DAILY_TRADE_VALUE`
 (
     `TRADE_DATE`              datev2 NULL COMMENT '交易日期',
@@ -162,13 +162,13 @@ PROPERTIES (
 ```
 
 此时新表没有默认分区:
-```SQL
+```sql
 mysql> show partitions from `DAILY_TRADE_VALUE`;
 Empty set (0.12 sec)
 ```
 
 经过插入数据后再查看,发现该表已经创建了对应的分区:
-```SQL
+```sql
 mysql> insert into `DAILY_TRADE_VALUE` values ('2012-12-13', 1), 
('2008-02-03', 2), ('2014-11-11', 3);
 Query OK, 3 rows affected (0.88 sec)
 {'label':'insert_754e2a3926a345ea_854793fb2638f0ec', 'status':'VISIBLE', 
'txnId':'20014'}
@@ -188,3 +188,4 @@ mysql> show partitions from `DAILY_TRADE_VALUE`;
 
 - 在数据的插入或导入过程中如果创建了分区,而最终整个过程失败,被创建的分区不会被自动删除。
 - 使用AUTO PARTITION的表,只是分区创建方式上由手动转为了自动。表及其所创建分区的原本使用方法都与非AUTO PARTITION的表或分区相同。
+- 向开启了AUTO 
PARTITION的表导入数据时,Coordinator发送数据的轮询间隔与普通表有所不同。具体请见[BE配置项](../../admin-manual/config/be-config.md)中的`olap_table_sink_send_interval_auto_partition_factor`。
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index e2cc67d992a..351f7aa92f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -368,23 +368,30 @@ public class OlapTableSink extends DataSink {
                         }
                     }
                 }
-                boolean  enableAutomaticPartition = 
partitionInfo.enableAutomaticPartition();
+                boolean enableAutomaticPartition = 
partitionInfo.enableAutomaticPartition();
                 // for auto create partition by function expr, there is no any 
partition firstly,
                 // But this is required in thrift struct.
                 if (enableAutomaticPartition && partitionIds.isEmpty()) {
                     
partitionParam.setDistributedColumns(getDistColumns(table.getDefaultDistributionInfo()));
                     partitionParam.setPartitions(new 
ArrayList<TOlapTablePartition>());
                 }
-                ArrayList<Expr> exprs = partitionInfo.getPartitionExprs();
-                if (enableAutomaticPartition && exprs != null && analyzer != 
null) {
+
+                ArrayList<Expr> exprSource = partitionInfo.getPartitionExprs();
+                if (enableAutomaticPartition && exprSource != null && analyzer 
!= null) {
                     Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), 
analyzer.getContext());
                     tupleDescriptor.setTable(table);
                     funcAnalyzer.registerTupleDescriptor(tupleDescriptor);
+                    // we must clone the exprs. otherwise analyze will 
influence the origin exprs.
+                    ArrayList<Expr> exprs = new ArrayList<Expr>();
+                    for (Expr e : exprSource) {
+                        exprs.add(e.clone());
+                    }
                     for (Expr e : exprs) {
                         e.analyze(funcAnalyzer);
                     }
                     
partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs));
                 }
+
                 
partitionParam.setEnableAutomaticPartition(enableAutomaticPartition);
                 break;
             }
diff --git 
a/regression-test/data/partition_p0/auto_partition/test_auto_partition_load.out 
b/regression-test/data/partition_p0/auto_partition/test_auto_partition_load.out
index 7e1dd673f69..2d85d850212 100644
--- 
a/regression-test/data/partition_p0/auto_partition/test_auto_partition_load.out
+++ 
b/regression-test/data/partition_p0/auto_partition/test_auto_partition_load.out
@@ -10,6 +10,10 @@
 8      2006-12-12T12:12:12     2001-11-12T12:12:12.123456
 9      2006-12-12T12:12:12     2001-11-13T12:12:12.123456
 10     2007-12-12T12:12:12     2001-11-14T12:12:12.123456
+11     2007-12-12T12:12:12     2001-11-14T12:12:12.123456
+12     2008-12-12T12:12:12     2001-11-14T12:12:12.123456
+13     2003-12-12T12:12:12     2001-11-14T12:12:12.123456
+14     2002-12-12T12:12:12     2001-11-14T12:12:12.123456
 
 -- !select2 --
 1      Beijing 2001-12-12T12:12:12.123456
@@ -22,4 +26,8 @@
 8      chengDU 2001-11-12T12:12:12.123456
 9      xian    2001-11-13T12:12:12.123456
 10     beiJing 2001-11-14T12:12:12.123456
+11     11      2123-11-14T12:12:12.123456
+12     Chengdu 2123-11-14T12:12:12.123456
+13     11      2123-11-14T12:12:12.123456
+14     12      2123-11-14T12:12:12.123456
 
diff --git 
a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_load.groovy
 
b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_load.groovy
index 0cf2eaf9c12..351d7bb3200 100644
--- 
a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_load.groovy
+++ 
b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_load.groovy
@@ -40,11 +40,15 @@ suite("test_auto_partition_load") {
         file "auto_partition_stream_load1.csv"
         time 20000
     }
+    sql """ insert into ${tblName1} values (11, '2007-12-12 12:12:12.123', 
'2001-11-14 12:12:12.123456') """
+    sql """ insert into ${tblName1} values (12, '2008-12-12 12:12:12.123', 
'2001-11-14 12:12:12.123456') """
+    sql """ insert into ${tblName1} values (13, '2003-12-12 12:12:12.123', 
'2001-11-14 12:12:12.123456') """
+    sql """ insert into ${tblName1} values (14, '2002-12-12 12:12:12.123', 
'2001-11-14 12:12:12.123456') """
 
     qt_select1 "select * from ${tblName1} order by k1"
     result1 = sql "show partitions from ${tblName1}"
     logger.info("${result1}")
-    assertEquals(result1.size(), 7)
+    assertEquals(result1.size(), 8)
 
 
     def tblName2 = "load_table2"
@@ -71,9 +75,13 @@ suite("test_auto_partition_load") {
         file "auto_partition_stream_load2.csv"
         time 20000
     }
+    sql """ insert into ${tblName2} values (11, '11', '2123-11-14 
12:12:12.123456') """
+    sql """ insert into ${tblName2} values (12, 'Chengdu', '2123-11-14 
12:12:12.123456') """
+    sql """ insert into ${tblName2} values (13, '11', '2123-11-14 
12:12:12.123456') """
+    sql """ insert into ${tblName2} values (14, '12', '2123-11-14 
12:12:12.123456') """
 
     qt_select2 "select * from ${tblName2} order by k1"
     result2 = sql "show partitions from ${tblName2}"
     logger.info("${result2}")
-    assertEquals(result2.size(), 9)
+    assertEquals(result2.size(), 11)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to