This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0393c9b  [Optimize] Support send batch parallelism for olap table sink 
(#6397)
0393c9b is described below

commit 0393c9b3b9a8580df2285cb3875fff1b8e6d59dc
Author: caiconghui <55968745+caicong...@users.noreply.github.com>
AuthorDate: Mon Aug 30 11:03:09 2021 +0800

    [Optimize] Support send batch parallelism for olap table sink (#6397)
    
    * Support send batch parallelism for olap table sink
    
    Co-authored-by: caiconghui <caicong...@xiaomi.com>
---
 be/src/common/config.h                             |  11 ++
 be/src/exec/tablet_sink.cpp                        | 123 ++++++++++++---------
 be/src/exec/tablet_sink.h                          |  16 ++-
 be/src/http/action/stream_load.cpp                 |  14 ++-
 be/src/http/http_common.h                          |   2 +
 be/src/olap/memtable_flush_executor.cpp            |  11 +-
 be/src/olap/memtable_flush_executor.h              |  11 +-
 be/src/runtime/exec_env.h                          |   2 +
 be/src/runtime/exec_env_init.cpp                   |  18 +++
 be/src/service/internal_service.cpp                |   7 +-
 be/src/util/doris_metrics.h                        |   2 +
 be/src/util/threadpool.h                           |   5 +
 be/test/exec/tablet_sink_test.cpp                  |   7 +-
 docs/en/administrator-guide/config/be_config.md    |  17 +++
 docs/en/administrator-guide/variables.md           |   4 +
 .../Data Manipulation/BROKER LOAD.md               |  12 +-
 .../Data Manipulation/ROUTINE LOAD.md              |   3 +
 .../Data Manipulation/STREAM LOAD.md               |   2 +
 docs/zh-CN/administrator-guide/config/be_config.md |  18 +++
 docs/zh-CN/administrator-guide/variables.md        |   5 +-
 .../Data Manipulation/BROKER LOAD.md               |  57 +++++-----
 .../Data Manipulation/ROUTINE LOAD.md              |   6 +-
 .../Data Manipulation/STREAM LOAD.md               |   2 +
 .../doris/analysis/CreateRoutineLoadStmt.java      |  13 +++
 .../java/org/apache/doris/analysis/InsertStmt.java |   3 +-
 .../java/org/apache/doris/analysis/LoadStmt.java   |  21 +++-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   2 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java |   6 +
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |   6 +-
 .../doris/load/loadv2/LoadingTaskPlanner.java      |   6 +-
 .../doris/load/routineload/RoutineLoadJob.java     |  13 +++
 .../apache/doris/load/update/UpdatePlanner.java    |   3 +-
 .../org/apache/doris/planner/OlapTableSink.java    |   3 +-
 .../apache/doris/planner/StreamLoadPlanner.java    |   2 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  14 ++-
 .../java/org/apache/doris/task/LoadTaskInfo.java   |   1 +
 .../java/org/apache/doris/task/StreamLoadTask.java |   9 ++
 .../doris/load/loadv2/BrokerLoadJobTest.java       |   4 +-
 .../apache/doris/planner/OlapTableSinkTest.java    |   8 +-
 gensrc/proto/internal_service.proto                |   1 +
 gensrc/thrift/DataSinks.thrift                     |   5 +-
 gensrc/thrift/FrontendService.thrift               |   1 +
 42 files changed, 349 insertions(+), 127 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 96a8128..eb42572 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -621,6 +621,17 @@ CONF_mDouble(tablet_version_graph_orphan_vertex_ratio, 
"0.1");
 // else we will call sync method
 CONF_mBool(runtime_filter_use_async_rpc, "true");
 
+// max send batch parallelism for OlapTableSink
+// The value set by the user for send_batch_parallelism is not allowed to 
exceed max_send_batch_parallelism_per_job,
+// if exceed, the value of send_batch_parallelism would be 
max_send_batch_parallelism_per_job
+CONF_mInt32(max_send_batch_parallelism_per_job, "5");
+CONF_Validator(max_send_batch_parallelism_per_job, [](const int config) -> 
bool { return config >= 1; });
+
+// number of send batch thread pool size
+CONF_Int32(send_batch_thread_pool_thread_num, "64");
+// number of send batch thread pool queue size
+CONF_Int32(send_batch_thread_pool_queue_size, "102400");
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index a9488ec..f780971 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -32,6 +32,7 @@
 #include "util/debug/sanitizer_scopes.h"
 #include "util/monotime.h"
 #include "util/time.h"
+#include "util/threadpool.h"
 #include "util/uid_util.h"
 
 namespace doris {
@@ -194,11 +195,10 @@ Status NodeChannel::open_wait() {
 
         if (result.has_execution_time_us()) {
             _add_batch_counter.add_batch_execution_time_us += 
result.execution_time_us();
-            _add_batch_counter.add_batch_wait_lock_time_us += 
result.wait_lock_time_us();
+            _add_batch_counter.add_batch_wait_execution_time_us += 
result.wait_execution_time_us();
             _add_batch_counter.add_batch_num++;
         }
     });
-
     return status;
 }
 
@@ -342,68 +342,75 @@ void NodeChannel::cancel() {
     request.release_id();
 }
 
-int NodeChannel::try_send_and_fetch_status() {
+int NodeChannel::try_send_and_fetch_status(std::unique_ptr<ThreadPoolToken>& 
thread_pool_token) {
     auto st = none_of({_cancelled, _send_finished});
     if (!st.ok()) {
         return 0;
     }
-
-    if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 
0) {
-        SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
-        AddBatchReq send_batch;
-        {
-            debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
-            std::lock_guard<std::mutex> l(_pending_batches_lock);
-            DCHECK(!_pending_batches.empty());
-            send_batch = std::move(_pending_batches.front());
-            _pending_batches.pop();
-            _pending_batches_num--;
+    bool is_finished = true;
+    if (!_add_batch_closure->is_packet_in_flight() && _pending_batches_num > 0 
&&
+            
_last_patch_processed_finished.compare_exchange_strong(is_finished, false)) {
+        auto s = 
thread_pool_token->submit_func(std::bind(&NodeChannel::try_send_batch, this));
+        if (!s.ok()) {
+            _cancel_with_msg("submit send_batch task to send_batch_thread_pool 
failed");
         }
+    }
+    return _send_finished ? 0 : 1;
+}
 
-        auto row_batch = std::move(send_batch.first);
-        auto request = std::move(send_batch.second); // doesn't need to be 
saved in heap
-
-        // tablet_ids has already set when add row
-        request.set_packet_seq(_next_packet_seq);
-        if (row_batch->num_rows() > 0) {
-            SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
-            row_batch->serialize(request.mutable_row_batch());
-        }
+void NodeChannel::try_send_batch() {
+    SCOPED_ATOMIC_TIMER(&_actual_consume_ns);
+    AddBatchReq send_batch;
+    {
+        debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+        std::lock_guard<std::mutex> l(_pending_batches_lock);
+        DCHECK(!_pending_batches.empty());
+        send_batch = std::move(_pending_batches.front());
+        _pending_batches.pop();
+        _pending_batches_num--;
+    }
 
-        _add_batch_closure->reset();
-        int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / 
NANOS_PER_MILLIS;
-        if (UNLIKELY(remain_ms < _min_rpc_timeout_ms)) {
-            if (remain_ms <= 0 && !request.eos()) {
-                cancel();
-                return 0;
-            } else {
-                remain_ms = _min_rpc_timeout_ms;
-            }
-        }
-        _add_batch_closure->cntl.set_timeout_ms(remain_ms);
-        if (config::tablet_writer_ignore_eovercrowded) {
-            _add_batch_closure->cntl.ignore_eovercrowded();
-        }
+    auto row_batch = std::move(send_batch.first);
+    auto request = std::move(send_batch.second); // doesn't need to be saved 
in heap
 
-        if (request.eos()) {
-            for (auto pid : _parent->_partition_ids) {
-                request.add_partition_ids(pid);
-            }
+    // tablet_ids has already set when add row
+    request.set_packet_seq(_next_packet_seq);
+    if (row_batch->num_rows() > 0) {
+        SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
+        row_batch->serialize(request.mutable_row_batch());
+    }
 
-            // eos request must be the last request
-            _add_batch_closure->end_mark();
-            _send_finished = true;
-            DCHECK(_pending_batches_num == 0);
+    _add_batch_closure->reset();
+    int remain_ms = _rpc_timeout_ms - _timeout_watch.elapsed_time() / 
NANOS_PER_MILLIS;
+    if (UNLIKELY(remain_ms < _min_rpc_timeout_ms)) {
+        if (remain_ms <= 0 && !request.eos()) {
+            cancel();
+        } else {
+            remain_ms = _min_rpc_timeout_ms;
         }
+    }
+    _add_batch_closure->cntl.set_timeout_ms(remain_ms);
+    if (config::tablet_writer_ignore_eovercrowded) {
+        _add_batch_closure->cntl.ignore_eovercrowded();
+    }
 
-        _add_batch_closure->set_in_flight();
-        _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
-                                       &_add_batch_closure->result, 
_add_batch_closure);
+    if (request.eos()) {
+        for (auto pid : _parent->_partition_ids) {
+            request.add_partition_ids(pid);
+        }
 
-        _next_packet_seq++;
+        // eos request must be the last request
+        _add_batch_closure->end_mark();
+        _send_finished = true;
+        DCHECK(_pending_batches_num == 0);
     }
 
-    return _send_finished ? 0 : 1;
+    _add_batch_closure->set_in_flight();
+    _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
+                                   &_add_batch_closure->result, 
_add_batch_closure);
+
+    _next_packet_seq++;
+    _last_patch_processed_finished = true;
 }
 
 Status NodeChannel::none_of(std::initializer_list<bool> vars) {
@@ -528,6 +535,9 @@ Status OlapTableSink::init(const TDataSink& t_sink) {
     } else {
         _load_channel_timeout_s = 
config::streaming_load_rpc_max_alive_time_sec;
     }
+    if (table_sink.__isset.send_batch_parallelism && 
table_sink.send_batch_parallelism > 1) {
+        _send_batch_parallelism = table_sink.send_batch_parallelism;
+    }
 
     return Status::OK();
 }
@@ -675,7 +685,9 @@ Status OlapTableSink::open(RuntimeState* state) {
             return Status::InternalError(ss.str());
         }
     }
-
+    int32_t send_batch_parallelism = MIN(_send_batch_parallelism, 
config::max_send_batch_parallelism_per_job);
+    _send_batch_thread_pool_token = 
state->exec_env()->send_batch_thread_pool()->new_token(
+            ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
     RETURN_IF_ERROR(Thread::create(
             "OlapTableSink", "send_batch_process", [this]() { 
this->_send_batch_process(); },
             &_sender_thread));
@@ -815,10 +827,11 @@ Status OlapTableSink::close(RuntimeState* state, Status 
close_status) {
         // print log of add batch time of all node, for tracing load 
performance easily
         std::stringstream ss;
         ss << "finished to close olap table sink. load_id=" << 
print_id(_load_id)
-           << ", txn_id=" << _txn_id << ", node add batch time(ms)/num: ";
+           << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait 
execution time(ms)/num: ";
         for (auto const& pair : node_add_batch_counter_map) {
             ss << "{" << pair.first << ":(" << 
(pair.second.add_batch_execution_time_us / 1000)
-               << ")(" << pair.second.add_batch_num << ")} ";
+               << ")(" << (pair.second.add_batch_wait_execution_time_us / 
1000) << ")("
+               << pair.second.add_batch_num << ")} ";
         }
         LOG(INFO) << ss.str();
     } else {
@@ -1006,8 +1019,8 @@ void OlapTableSink::_send_batch_process() {
     do {
         int running_channels_num = 0;
         for (auto index_channel : _channels) {
-            
index_channel->for_each_node_channel([&running_channels_num](NodeChannel* ch) {
-                running_channels_num += ch->try_send_and_fetch_status();
+            index_channel->for_each_node_channel([&running_channels_num, 
this](NodeChannel* ch) {
+                running_channels_num += 
ch->try_send_and_fetch_status(this->_send_batch_thread_pool_token);
             });
         }
 
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 7f0dcb5..277ecef 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -44,6 +44,8 @@ class Bitmap;
 class MemTracker;
 class RuntimeProfile;
 class RowDescriptor;
+class ThreadPool;
+class ThreadPoolToken;
 class Tuple;
 class TupleDescriptor;
 class ExprContext;
@@ -58,12 +60,12 @@ struct AddBatchCounter {
     // total execution time of a add_batch rpc
     int64_t add_batch_execution_time_us = 0;
     // lock waiting time in a add_batch rpc
-    int64_t add_batch_wait_lock_time_us = 0;
+    int64_t add_batch_wait_execution_time_us = 0;
     // number of add_batch call
     int64_t add_batch_num = 0;
     AddBatchCounter& operator+=(const AddBatchCounter& rhs) {
         add_batch_execution_time_us += rhs.add_batch_execution_time_us;
-        add_batch_wait_lock_time_us += rhs.add_batch_wait_lock_time_us;
+        add_batch_wait_execution_time_us += 
rhs.add_batch_wait_execution_time_us;
         add_batch_num += rhs.add_batch_num;
         return *this;
     }
@@ -169,7 +171,9 @@ public:
     // 1: running, haven't reach eos.
     // only allow 1 rpc in flight
     // plz make sure, this func should be called after open_wait().
-    int try_send_and_fetch_status();
+    int try_send_and_fetch_status(std::unique_ptr<ThreadPoolToken>& 
thread_pool_token);
+
+    void try_send_batch();
 
     void time_report(std::unordered_map<int64_t, AddBatchCounter>* 
add_batch_counter_map,
                      int64_t* serialize_batch_ns, int64_t* 
mem_exceeded_block_ns,
@@ -227,6 +231,8 @@ private:
     // add batches finished means the last rpc has be response, used to check 
whether this channel can be closed
     std::atomic<bool> _add_batches_finished{false};
 
+    std::atomic<bool> _last_patch_processed_finished{true};
+
     bool _eos_is_produced{false}; // only for restricting producer behaviors
 
     std::unique_ptr<RowDescriptor> _row_desc;
@@ -374,6 +380,7 @@ private:
 
     CountDownLatch _stop_background_threads_latch;
     scoped_refptr<Thread> _sender_thread;
+    std::unique_ptr<ThreadPoolToken> _send_batch_thread_pool_token;
 
     std::vector<DecimalV2Value> _max_decimalv2_val;
     std::vector<DecimalV2Value> _min_decimalv2_val;
@@ -410,7 +417,8 @@ private:
     // the timeout of load channels opened by this tablet sink. in second
     int64_t _load_channel_timeout_s = 0;
 
-    // True if this sink has been closed once
+    int32_t _send_batch_parallelism = 1;
+    // True if this sink has been closed once bool
     bool _is_closed = false;
     // Save the status of close() method
     Status _close_status;
diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index ac409b1..2177d12 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -237,8 +237,10 @@ int StreamLoadAction::on_header(HttpRequest* req) {
         HttpChannel::send_reply(req, str);
         streaming_load_current_processing->increment(-1);
 #ifndef BE_TEST
-        str = ctx->prepare_stream_load_record(str);
-        _sava_stream_load_record(ctx, str);
+        if (config::enable_stream_load_record) {
+            str = ctx->prepare_stream_load_record(str);
+            _sava_stream_load_record(ctx, str);
+        }
 #endif
         return -1;
     }
@@ -485,6 +487,14 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req, StreamLoadContext*
                 http_req->header(HTTP_FUNCTION_COLUMN + "." + 
HTTP_SEQUENCE_COL));
     }
 
+    if (!http_req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) {
+        try {
+            
request.__set_send_batch_parallelism(std::stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM)));
+        } catch (const std::invalid_argument& e) {
+            return Status::InvalidArgument("Invalid send_batch_parallelism 
format");
+        }
+    }
+
     if (ctx->timeout_second != -1) {
         request.__set_timeout(ctx->timeout_second);
     }
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 50cfbae..4ade62f 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -50,6 +50,8 @@ static const std::string HTTP_FUNCTION_COLUMN = 
"function_column";
 static const std::string HTTP_SEQUENCE_COL = "sequence_col";
 static const std::string HTTP_COMPRESS_TYPE = "compress_type";
 
+static const std::string HTTP_SEND_BATCH_PARALLELISM = 
"send_batch_parallelism";
+
 static const std::string HTTP_100_CONTINUE = "100-continue";
 
 } // namespace doris
diff --git a/be/src/olap/memtable_flush_executor.cpp 
b/be/src/olap/memtable_flush_executor.cpp
index fb78a55..d257ab6 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -21,11 +21,13 @@
 
 #include "olap/memtable.h"
 #include "util/scoped_cleanup.h"
+#include "util/time.h"
 
 namespace doris {
 
 std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
-    os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000
+    os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS
+       << ", flush wait time(ms)=" << stat.flush_wait_time_ns / 
NANOS_PER_MILLIS
        << ", flush count=" << stat.flush_count
        << ", flush bytes: " << stat.flush_size_bytes
        << ", flush disk bytes: " << stat.flush_disk_size_bytes << ")";
@@ -39,7 +41,8 @@ std::ostream& operator<<(std::ostream& os, const 
FlushStatistic& stat) {
 // its reference count is not 0.
 OLAPStatus FlushToken::submit(const std::shared_ptr<MemTable>& memtable) {
     RETURN_NOT_OK(_flush_status.load());
-    _flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, 
memtable));
+    int64_t submit_task_time = MonotonicNanos();
+    _flush_token->submit_func(std::bind(&FlushToken::_flush_memtable, this, 
memtable, submit_task_time));
     return OLAP_SUCCESS;
 }
 
@@ -52,9 +55,9 @@ OLAPStatus FlushToken::wait() {
     return _flush_status.load();
 }
 
-void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable) {
+void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable, int64_t 
submit_task_time) {
+    _stats.flush_wait_time_ns += (MonotonicNanos() - submit_task_time);
     SCOPED_CLEANUP({ memtable.reset(); });
-
     // If previous flush has failed, return directly
     if (_flush_status.load() != OLAP_SUCCESS) {
         return;
diff --git a/be/src/olap/memtable_flush_executor.h 
b/be/src/olap/memtable_flush_executor.h
index 4b6795b..8b81bde 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -35,10 +35,11 @@ class MemTable;
 // the statistic of a certain flush handler.
 // use atomic because it may be updated by multi threads
 struct FlushStatistic {
-    int64_t flush_time_ns = 0;
-    int64_t flush_count = 0;
-    int64_t flush_size_bytes = 0;
-    int64_t flush_disk_size_bytes = 0;
+    std::atomic_uint64_t flush_time_ns = 0;
+    std::atomic_uint64_t flush_count = 0;
+    std::atomic_uint64_t flush_size_bytes = 0;
+    std::atomic_uint64_t flush_disk_size_bytes = 0;
+    std::atomic_uint64_t flush_wait_time_ns = 0;
 };
 
 std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
@@ -68,7 +69,7 @@ public:
     const FlushStatistic& get_stats() const { return _stats; }
 
 private:
-    void _flush_memtable(std::shared_ptr<MemTable> mem_table);
+    void _flush_memtable(std::shared_ptr<MemTable> mem_table, int64_t 
submit_task_time);
 
     std::unique_ptr<ThreadPoolToken> _flush_token;
 
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index f92f3b6..766ca5c 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -117,6 +117,7 @@ public:
     PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; }
     ThreadPool* limited_scan_thread_pool() { return 
_limited_scan_thread_pool.get(); }
     PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; }
+    ThreadPool* send_batch_thread_pool() { return 
_send_batch_thread_pool.get(); }
     CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; }
     FragmentMgr* fragment_mgr() { return _fragment_mgr; }
     ResultCache* result_cache() { return _result_cache; }
@@ -189,6 +190,7 @@ private:
     PriorityThreadPool* _scan_thread_pool = nullptr;
     std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
 
+    std::unique_ptr<ThreadPool> _send_batch_thread_pool;
     PriorityThreadPool* _etl_thread_pool = nullptr;
     CgroupsMgr* _cgroups_mgr = nullptr;
     FragmentMgr* _fragment_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index d4a9d2d..69d70cd 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -67,6 +67,8 @@ namespace doris {
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(etl_thread_pool_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(query_mem_consumption, MetricUnit::BYTES, 
"",
                                    mem_consumption, Labels({{"type", 
"query"}}));
 
@@ -100,6 +102,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
             .set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
             .build(&_limited_scan_thread_pool);    
 
+    ThreadPoolBuilder("SendBatchThreadPool")
+            .set_min_threads(1)
+            .set_max_threads(config::send_batch_thread_pool_thread_num)
+            .set_max_queue_size(config::send_batch_thread_pool_queue_size)
+            .build(&_send_batch_thread_pool);
+    
     _etl_thread_pool = new PriorityThreadPool(config::etl_thread_pool_size,
                                               
config::etl_thread_pool_queue_size);
     _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
@@ -246,11 +254,21 @@ void ExecEnv::_register_metrics() {
     REGISTER_HOOK_METRIC(etl_thread_pool_queue_size, [this]() {
         return _etl_thread_pool->get_queue_size();
     });
+
+    REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num, [this]() {
+        return _send_batch_thread_pool->num_threads();
+    });
+
+    REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size, [this]() {
+        return _send_batch_thread_pool->get_queue_size();
+    });
 }
 
 void ExecEnv::_deregister_metrics() {
     DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
     DEREGISTER_HOOK_METRIC(etl_thread_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
+    DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
 }
 
 void ExecEnv::_destroy() {
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index f4bdb49..021324f 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -110,7 +110,9 @@ void 
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
     // add batch maybe cost a lot of time, and this callback thread will be 
held.
     // this will influence query execution, because the pthreads under bthread 
may be
     // exhausted, so we put this to a local thread pool to process
-    _tablet_worker_pool.offer([request, response, done, this]() {
+    int64_t submit_task_time_ns = MonotonicNanos();
+    _tablet_worker_pool.offer([request, response, done, submit_task_time_ns, 
this]() {
+        int64_t wait_execution_time_ns = MonotonicNanos() - 
submit_task_time_ns;
         brpc::ClosureGuard closure_guard(done);
         int64_t execution_time_ns = 0;
         {
@@ -124,7 +126,8 @@ void 
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
             }
             st.to_protobuf(response->mutable_status());
         }
-        response->set_execution_time_us(execution_time_ns / 1000);
+        response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
+        response->set_wait_execution_time_us(wait_execution_time_ns / 
NANOS_PER_MICRO);
     });
 }
 
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index f363c73..67d60a3 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -193,6 +193,8 @@ public:
     UIntGauge* scanner_thread_pool_queue_size;
     UIntGauge* etl_thread_pool_queue_size;
     UIntGauge* add_batch_task_queue_size;
+    UIntGauge* send_batch_thread_pool_thread_num;
+    UIntGauge* send_batch_thread_pool_queue_size;
 
     static DorisMetrics* instance() {
         static DorisMetrics instance;
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index c228e15..8d2f26e 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -199,6 +199,11 @@ public:
         return _num_threads + _num_threads_pending_start;
     }
 
+    int get_queue_size() const {
+        MutexLock l(&_lock);
+        return _total_queued_tasks;
+    }
+
 private:
     friend class ThreadPoolBuilder;
     friend class ThreadPoolToken;
diff --git a/be/test/exec/tablet_sink_test.cpp 
b/be/test/exec/tablet_sink_test.cpp
index b4ec5de..1aa853d 100644
--- a/be/test/exec/tablet_sink_test.cpp
+++ b/be/test/exec/tablet_sink_test.cpp
@@ -54,8 +54,13 @@ public:
         _env->_load_stream_mgr = new LoadStreamMgr();
         _env->_brpc_stub_cache = new BrpcStubCache();
         _env->_buffer_reservation = new ReservationTracker();
-
+        ThreadPoolBuilder("SendBatchThreadPool")
+                .set_min_threads(1)
+                .set_max_threads(5)
+                .set_max_queue_size(100)
+                .build(&_env->_send_batch_thread_pool);
         config::tablet_writer_open_rpc_timeout_sec = 60;
+        config::max_send_batch_parallelism_per_job = 1;
     }
 
     void TearDown() override {
diff --git a/docs/en/administrator-guide/config/be_config.md 
b/docs/en/administrator-guide/config/be_config.md
index 40da5db..dd74fb7 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -789,6 +789,12 @@ Default:100
 
 Max number of txns for every txn_partition_map in txn manager, this is a self 
protection to avoid too many txns saving in manager
 
+### `max_send_batch_parallelism_per_job`
+
+* Type: int
+* Description: Max send batch parallelism for OlapTableSink. The value set by 
the user for `send_batch_parallelism` is not allowed to exceed 
`max_send_batch_parallelism_per_job`, if exceed, the value of 
`send_batch_parallelism` would be `max_send_batch_parallelism_per_job`.
+* Default value: 1
+
 ### `max_tablet_num_per_shard`
 
 Default:1024
@@ -1056,6 +1062,17 @@ Default:5
 
 This configuration is used for the context gc thread scheduling cycle. Note: 
The unit is minutes, and the default is 5 minutes
 
+### `send_batch_thread_pool_thread_num`
+
+* Type: int32
+* Description: The number of threads in the SendBatch thread pool. In 
NodeChannels' sending data tasks, the SendBatch operation of each NodeChannel 
will be submitted as a thread task to the thread pool to be scheduled. This 
parameter determines the size of the SendBatch thread pool.
+* Default value: 256
+
+### `send_batch_thread_pool_queue_size`
+
+* Type: int32
+* Description: The queue length of the SendBatch thread pool. In NodeChannels' 
sending data tasks,  the SendBatch operation of each NodeChannel will be 
submitted as a thread task to the thread pool waiting to be scheduled, and 
after the number of submitted tasks exceeds the length of the thread pool 
queue, subsequent submitted tasks will be blocked until there is a empty slot 
in the queue.
+
 ### `sleep_one_second`
 
 + Type: int32
diff --git a/docs/en/administrator-guide/variables.md 
b/docs/en/administrator-guide/variables.md
index 4334e28..d5e1bf6 100644
--- a/docs/en/administrator-guide/variables.md
+++ b/docs/en/administrator-guide/variables.md
@@ -345,6 +345,10 @@ Translated with www.DeepL.com/Translator (free version)
 
     Not used.
     
+* `send_batch_parallelism`
+
+    Used to set the default parallelism for sending batch when execute 
InsertStmt operation, if the value for parallelism exceed 
`max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will 
use the value of `max_send_batch_parallelism_per_job`.
+
 * `sql_mode`
 
     Used to specify SQL mode to accommodate certain SQL dialects. For the SQL 
mode, see [here](./sql-mode.md).
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER 
LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index 092a88a..a689a1c 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md     
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md     
@@ -253,6 +253,8 @@ under the License.
 
         timezone: Specify time zones for functions affected by time zones, 
such as strftime/alignment_timestamp/from_unixtime, etc. See the documentation 
for details. If not specified, use the "Asia/Shanghai" time zone.
 
+        send_batch_parallelism: Used to set the default parallelism for 
sending batch, if the value for parallelism exceed 
`max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will 
use the value of `max_send_batch_parallelism_per_job`.
+
     5. Load data format sample
 
         Integer(TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
@@ -499,7 +501,7 @@ under the License.
         ) 
         WITH BROKER "hdfs" ("username"="user", "password"="pass");
 
-    13. Load a batch of data from HDFS, specify timeout and filtering ratio. 
Use the broker with the plaintext ugi my_hdfs_broker. Simple authentication. 
delete the data when v2 >100, other append
+    12. Load a batch of data from HDFS, specify timeout and filtering ratio. 
Use the broker with the plaintext ugi my_hdfs_broker. Simple authentication. 
delete the data when v2 >100, other append
 
         LOAD LABEL example_db.label1
         (
@@ -520,7 +522,7 @@ under the License.
         "max_filter_ratio" = "0.1"
         );
 
-    14. Filter the original data first, and perform column mapping, conversion 
and filtering operations
+    13. Filter the original data first, and perform column mapping, conversion 
and filtering operations
 
         LOAD LABEL example_db.label_filter
         (
@@ -534,7 +536,7 @@ under the License.
         ) 
         with BROKER "hdfs" ("username"="user", "password"="pass");
 
-    15. Import the data in the json file, and specify format as json, it is 
judged by the file suffix by default, set parameters for reading data
+    14. Import the data in the json file, and specify format as json, it is 
judged by the file suffix by default, set parameters for reading data
 
         LOAD LABEL example_db.label9
         (
@@ -546,7 +548,7 @@ under the License.
         )
         WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); 
  
 
-    16. LOAD WITH HDFS, normal HDFS cluster
+    15. LOAD WITH HDFS, normal HDFS cluster
         LOAD LABEL example_db.label_filter
         (
             DATA INFILE("hdfs://host:port/user/data/*/test.txt")
@@ -558,7 +560,7 @@ under the License.
             "fs.defaultFS"="hdfs://testFs",
             "hdfs_user"="user"
         );
-    17. LOAD WITH HDFS, hdfs ha
+    16. LOAD WITH HDFS, hdfs ha
         LOAD LABEL example_db.label_filter
         (
             DATA INFILE("hdfs://host:port/user/data/*/test.txt")
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE 
LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md
index ee0e775..edaa2f5 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md    
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md    
@@ -196,6 +196,9 @@ FROM data_source
     9. `json_root`
         json_root is a valid JSONPATH string that specifies the root node of 
the JSON Document. The default value is "".
 
+    10. `send_batch_parallelism`
+        Integer, Used to set the default parallelism for sending batch, if the 
value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, 
then the coordinator BE will use the value of 
`max_send_batch_parallelism_per_job`.
+
 6. data_source
 
     The type of data source. Current support:
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM 
LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md
index a91da83..9a8e063 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md     
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md     
@@ -142,6 +142,8 @@ The type of data merging supports three types: APPEND, 
DELETE, and MERGE. APPEND
 
 `read_json_by_line`: Boolean type, true means that one json object can be read 
per line, and the default value is false.
 
+`send_batch_parallelism`: Integer type, used to set the default parallelism 
for sending batch, if the value for parallelism exceed 
`max_send_batch_parallelism_per_job` in BE config, then the coordinator BE will 
use the value of `max_send_batch_parallelism_per_job`.
+
 RETURN VALUES
 
 After the load is completed, the related content of this load will be returned 
in Json format. Current field included
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md 
b/docs/zh-CN/administrator-guide/config/be_config.md
index 8e637fa..08adfca 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -792,6 +792,12 @@ cumulative compaction策略:最大增量文件的数量
 
 txn 管理器中每个 txn_partition_map 的最大 txns 数,这是一种自我保护,以避免在管理器中保存过多的 txns
 
+### `max_send_batch_parallelism_per_job`
+
+* 类型:int
+* 描述:OlapTableSink 发送批处理数据的最大并行度,用户为 `send_batch_parallelism` 设置的值不允许超过 
`max_send_batch_parallelism_per_job` ,如果超过, `send_batch_parallelism` 将被设置为 
`max_send_batch_parallelism_per_job` 的值。
+* 默认值:1
+
 ### `max_tablet_num_per_shard`
 
 默认:1024
@@ -1059,6 +1065,18 @@ routine load任务的线程池大小。 这应该大于 FE 配置 'max_concurren
 
 此配置用于上下文gc线程调度周期 , 注意:单位为分钟,默认为 5 分钟
 
+### `send_batch_thread_pool_thread_num`
+
+* 类型:int32
+* 
描述:SendBatch线程池线程数目。在NodeChannel的发送数据任务之中,每一个NodeChannel的SendBatch操作会作为一个线程task提交到线程池之中等待被调度,该参数决定了SendBatch线程池的大小。
+* 默认值:256
+
+### `send_batch_thread_pool_queue_size`
+
+* 类型:int32
+* 
描述:SendBatch线程池的队列长度。在NodeChannel的发送数据任务之中,每一个NodeChannel的SendBatch操作会作为一个线程task提交到线程池之中等待被调度,而提交的任务数目超过线程池队列的长度之后,后续提交的任务将阻塞直到队列之中有新的空缺。
+* 默认值:102400
+
 ### `serialize_batch`
 
 默认值:false
diff --git a/docs/zh-CN/administrator-guide/variables.md 
b/docs/zh-CN/administrator-guide/variables.md
index 423e86f..a5457ce 100644
--- a/docs/zh-CN/administrator-guide/variables.md
+++ b/docs/zh-CN/administrator-guide/variables.md
@@ -339,7 +339,10 @@ SELECT /*+ SET_VAR(query_timeout = 1, 
enable_partition_cache=true) */ sleep(3);
 * `resource_group`
 
     暂不使用。
-    
+* `send_batch_parallelism`                                                     
                                                                                
              
+
+   用于设置执行 InsertStmt 操作时发送批处理数据的默认并行度,如果并行度的值超过 BE 配置中的 
`max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 
`max_send_batch_parallelism_per_job` 的值。 
+                                                                               
                  
 * `sql_mode`
 
     用于指定 SQL 模式,以适应某些 SQL 方言。关于 SQL 模式,可参阅 [这里](./sql-mode.md)。
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER 
LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER 
LOAD.md
index b9edbfb..73e2abb 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md  
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md  
@@ -138,30 +138,30 @@ under the License.
             用于指定一些特殊参数。
             语法:
             [PROPERTIES ("key"="value", ...)]
-        
+
             可以指定如下参数:
-                
-              line_delimiter: 用于指定导入文件中的换行符,默认为\n。可以使用做多个字符的组合作为换行符。
 
-              fuzzy_parse: 布尔类型,为true表示json将以第一行为schema 进行解析,开启这个选项可以提高json 
导入效率,但是要求所有json 对象的key的顺序和第一行一致, 默认为false,仅用于json格式。
-            
-              jsonpaths: 导入json方式分为:简单模式和匹配模式。
-              简单模式:没有设置jsonpaths参数即为简单模式,这种模式下要求json数据是对象类型,例如:
-              {"k1":1, "k2":2, "k3":"hello"},其中k1,k2,k3是列名字。
-              匹配模式:用于json数据相对复杂,需要通过jsonpaths参数匹配对应的value。
-            
-              strip_outer_array: 
布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false。例如:
-                [
-                  {"k1" : 1, "v1" : 2},
-                  {"k1" : 3, "v1" : 4}
-                ]
-              当strip_outer_array为true,最后导入到doris中会生成两行数据。
-           
-              json_root: json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。
-           
-              num_as_string: 
布尔类型,为true表示在解析json数据时会将数字类型转为字符串,然后在确保不会出现精度丢失的情况下进行导入。
-
-    3. broker_name
+            line_delimiter: 用于指定导入文件中的换行符,默认为\n。可以使用做多个字符的组合作为换行符。
+
+            fuzzy_parse: 布尔类型,为true表示json将以第一行为schema 进行解析,开启这个选项可以提高json 
导入效率,但是要求所有json 对象的key的顺序和第一行一致, 默认为false,仅用于json格式。
+
+            jsonpaths: 导入json方式分为:简单模式和匹配模式。
+            简单模式:没有设置jsonpaths参数即为简单模式,这种模式下要求json数据是对象类型,例如:
+            {"k1":1, "k2":2, "k3":"hello"},其中k1,k2,k3是列名字。
+            匹配模式:用于json数据相对复杂,需要通过jsonpaths参数匹配对应的value。
+
+            strip_outer_array: 
布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false。例如:
+            [
+             {"k1" : 1, "v1" : 2},
+             {"k1" : 3, "v1" : 4}
+             ]
+             当strip_outer_array为true,最后导入到doris中会生成两行数据。
+
+             json_root: json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。
+
+             num_as_string: 
布尔类型,为true表示在解析json数据时会将数字类型转为字符串,然后在确保不会出现精度丢失的情况下进行导入。
+
+      3. broker_name
 
         所使用的 broker 名称,可以通过 show broker 命令查看。
 
@@ -246,6 +246,7 @@ under the License.
         exec_mem_limit:  导入内存限制。默认为 2GB。单位为字节。
         strict mode:     是否对数据进行严格限制。默认为 false。
         timezone:         指定某些受时区影响的函数的时区,如 
strftime/alignment_timestamp/from_unixtime 等等,具体请查阅 [时区] 文档。如果不指定,则使用 
"Asia/Shanghai" 时区。
+        send_batch_parallelism: 用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 
`max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 
`max_send_batch_parallelism_per_job` 的值。
 
     5. 导入数据格式样例
 
@@ -537,7 +538,7 @@ under the License.
         ) 
         with BROKER "hdfs" ("username"="user", "password"="pass");
 
-    14. 先过滤原始数据,在进行列的映射、转换和过滤操作
+    15. 先过滤原始数据,在进行列的映射、转换和过滤操作
 
         LOAD LABEL example_db.label_filter
         (
@@ -550,8 +551,8 @@ under the License.
          WHERE k1 > 3
         ) 
         with BROKER "hdfs" ("username"="user", "password"="pass");
-         
-     15. 导入json文件中数据  指定FORMAT为json, 默认是通过文件后缀判断,设置读取数据的参数
+
+     16. 导入json文件中数据  指定FORMAT为json, 默认是通过文件后缀判断,设置读取数据的参数
 
         LOAD LABEL example_db.label9
         (
@@ -563,7 +564,7 @@ under the License.
         )
         WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
 
-    16. LOAD WITH HDFS, 普通HDFS集群
+     17. LOAD WITH HDFS, 普通HDFS集群
         LOAD LABEL example_db.label_filter
         (
             DATA INFILE("hdfs://host:port/user/data/*/test.txt")
@@ -575,7 +576,7 @@ under the License.
             "fs.defaultFS"="hdfs://testFs",
             "hdfs_user"="user"
         );
-    17. LOAD WITH HDFS, 带ha的HDFS集群
+     18. LOAD WITH HDFS, 带ha的HDFS集群
         LOAD LABEL example_db.label_filter
         (
             DATA INFILE("hdfs://host:port/user/data/*/test.txt")
@@ -595,4 +596,4 @@ under the License.
 
 ## keyword
 
-    BROKER,LOAD
+    BROKER,LOAD
\ No newline at end of file
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE 
LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE 
LOAD.md
index 4200ad4..798ff69 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md 
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/ROUTINE LOAD.md 
@@ -179,7 +179,11 @@ under the License.
 
         9. json_root
 
-        json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。
+            json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为""。
+
+        10. send_batch_parallelism
+            
+            整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 
`max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 
`max_send_batch_parallelism_per_job` 的值。 
 
     6. data_source
 
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM 
LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM 
LOAD.md
index 0587e6f..ef0c4c4 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md  
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md  
@@ -106,6 +106,8 @@ under the License.
         num_as_string: 布尔类型,为true表示在解析json数据时会将数字类型转为字符串,然后在确保不会出现精度丢失的情况下进行导入。
 
         read_json_by_line: 布尔类型,为true表示支持每行读取一个json对象,默认值为false。
+        
+        send_batch_parallelism: 整型,用于设置发送批处理数据的并行度,如果并行度的值超过 BE 配置中的 
`max_send_batch_parallelism_per_job`,那么作为协调点的 BE 将使用 
`max_send_batch_parallelism_per_job` 的值。 
 
     RETURN VALUES
         导入完成后,会以Json格式返回这次导入的相关内容。当前包括以下字段
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index ffd782a..335ff17 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -116,6 +116,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     
     private static final String NAME_TYPE = "ROUTINE LOAD NAME";
     public static final String ENDPOINT_REGEX = 
"[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
+    public static final String SEND_BATCH_PARALLELISM = 
"send_batch_parallelism";
 
     private static final ImmutableSet<String> PROPERTIES_SET = new 
ImmutableSet.Builder<String>()
             .add(DESIRED_CONCURRENT_NUMBER_PROPERTY)
@@ -132,6 +133,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(LoadStmt.STRICT_MODE)
             .add(LoadStmt.TIMEZONE)
             .add(EXEC_MEM_LIMIT_PROPERTY)
+            .add(SEND_BATCH_PARALLELISM)
             .build();
 
     private final LabelName labelName;
@@ -154,6 +156,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private boolean strictMode = true;
     private long execMemLimit = 2 * 1024 * 1024 * 1024L;
     private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
+    private int sendBatchParallelism = 1;
     /**
      * RoutineLoad support json data.
      * Require Params:
@@ -175,6 +178,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     public static final Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> v >= 
200000;
     public static final Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> v >= 100 
* 1024 * 1024 && v <= 1024 * 1024 * 1024;
     public static final Predicate<Long> EXEC_MEM_LIMIT_PRED = (v) -> v >= 0L;
+    public static final Predicate<Long> SEND_BATCH_PARALLELISM_PRED = (v) -> v 
> 0L;
 
     public CreateRoutineLoadStmt(LabelName labelName, String tableName, 
List<ParseNode> loadPropertyList,
                                  Map<String, String> jobProperties, String 
typeName,
@@ -232,6 +236,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         return execMemLimit;
     }
 
+    public int getSendBatchParallelism() {
+        return sendBatchParallelism;
+    }
+
     public boolean isStrictMode() {
         return strictMode;
     }
@@ -434,6 +442,11 @@ public class CreateRoutineLoadStmt extends DdlStmt {
                                                       LoadStmt.STRICT_MODE + " 
should be a boolean");
         execMemLimit = 
Util.getLongPropertyOrDefault(jobProperties.get(EXEC_MEM_LIMIT_PROPERTY),
                 RoutineLoadJob.DEFAULT_EXEC_MEM_LIMIT, EXEC_MEM_LIMIT_PRED, 
EXEC_MEM_LIMIT_PROPERTY + "should > 0");
+
+        sendBatchParallelism = ((Long) 
Util.getLongPropertyOrDefault(jobProperties.get(SEND_BATCH_PARALLELISM),
+                
ConnectContext.get().getSessionVariable().getSendBatchParallelism(), 
SEND_BATCH_PARALLELISM_PRED,
+                SEND_BATCH_PARALLELISM + " should > 0")).intValue();
+
         if (ConnectContext.get() != null) {
             timezone = ConnectContext.get().getSessionVariable().getTimeZone();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 7e31600..64e3525 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -323,7 +323,8 @@ public class InsertStmt extends DdlStmt {
         if (!isExplain() && targetTable instanceof OlapTable) {
             OlapTableSink sink = (OlapTableSink) dataSink;
             TUniqueId loadId = analyzer.getContext().queryId();
-            sink.init(loadId, transactionId, db.getId(), timeoutSecond);
+            int sendBatchParallelism = 
analyzer.getContext().getSessionVariable().getSendBatchParallelism();
+            sink.init(loadId, transactionId, db.getId(), timeoutSecond, 
sendBatchParallelism);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index efcfe5b..368d6c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -81,6 +81,7 @@ public class LoadStmt extends DdlStmt {
     public static final String STRICT_MODE = "strict_mode";
     public static final String TIMEZONE = "timezone";
     public static final String LOAD_PARALLELISM = "load_parallelism";
+    public static final String SEND_BATCH_PARALLELISM = 
"send_batch_parallelism";
 
     // for load data from Baidu Object Store(BOS)
     public static final String BOS_ENDPOINT = "bos_endpoint";
@@ -114,7 +115,6 @@ public class LoadStmt extends DdlStmt {
     public static final String KEY_IN_PARAM_FUNCTION_COLUMN = 
"function_column";
     public static final String KEY_IN_PARAM_SEQUENCE_COL = "sequence_col";
     public static final String KEY_IN_PARAM_BACKEND_ID = "backend_id";
-
     private final LabelName label;
     private final List<DataDescription> dataDescriptions;
     private final BrokerDesc brokerDesc;
@@ -162,6 +162,12 @@ public class LoadStmt extends DdlStmt {
                     return Integer.valueOf(s);
                 }
             })
+            .put(SEND_BATCH_PARALLELISM, new Function<String, Integer>() {
+                @Override
+                public @Nullable Integer apply(@Nullable String s) {
+                    return Integer.valueOf(s);
+                }
+            })
             .put(CLUSTER_PROPERTY, new Function<String, String>() {
                 @Override
                 public @Nullable String apply(@Nullable String s) {
@@ -289,6 +295,19 @@ public class LoadStmt extends DdlStmt {
             properties.put(TIMEZONE, 
TimeUtils.checkTimeZoneValidAndStandardize(
                     properties.getOrDefault(LoadStmt.TIMEZONE, 
TimeUtils.DEFAULT_TIME_ZONE)));
         }
+
+        // send batch parallelism
+        final String sendBatchParallelism = 
properties.get(SEND_BATCH_PARALLELISM);
+        if (sendBatchParallelism != null) {
+            try {
+                final int sendBatchParallelismValue = 
Integer.valueOf(sendBatchParallelism);
+                if (sendBatchParallelismValue < 1) {
+                    throw new DdlException(SEND_BATCH_PARALLELISM + " must be 
greater than 0");
+                }
+            } catch (NumberFormatException e) {
+                throw new DdlException(SEND_BATCH_PARALLELISM + " is not a 
number.");
+            }
+        }
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index e001b87..93f60ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -199,7 +199,7 @@ public class BrokerLoadJob extends BulkLoadJob {
                 LoadLoadingTask task = new LoadLoadingTask(db, table, 
brokerDesc,
                         brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
                         isStrictMode(), transactionId, this, getTimeZone(), 
getTimeout(),
-                        getLoadParallelism(), isReportSuccess ? jobProfile : 
null);
+                        getLoadParallelism(), getSendBatchParallelism(), 
isReportSuccess ? jobProfile : null);
                 UUID uuid = UUID.randomUUID();
                 TUniqueId loadId = new 
TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
                 task.init(loadId, attachment.getFileStatusByTable(aggKey),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index e83094e..e33e98b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -337,6 +337,7 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         if (ConnectContext.get() != null) {
             jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, 
ConnectContext.get().getSessionVariable().getMaxExecMemByte());
             jobProperties.put(LoadStmt.TIMEZONE, 
ConnectContext.get().getSessionVariable().getTimeZone());
+            jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, 
ConnectContext.get().getSessionVariable().getSendBatchParallelism());
         }
 
         if (properties == null || properties.isEmpty()) {
@@ -383,6 +384,7 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         jobProperties.put(LoadStmt.STRICT_MODE, false);
         jobProperties.put(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE);
         jobProperties.put(LoadStmt.LOAD_PARALLELISM, 
Config.default_load_parallelism);
+        jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, 1);
     }
 
     public void isJobTypeRead(boolean jobTypeRead) {
@@ -1193,6 +1195,10 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         return (int) jobProperties.get(LoadStmt.LOAD_PARALLELISM);
     }
 
+    public int getSendBatchParallelism() {
+        return (int) jobProperties.get(LoadStmt.SEND_BATCH_PARALLELISM);
+    }
+
     // Return true if this job is finished for a long time
     public boolean isExpired(long currentTimeMs) {
         if (!isCompleted()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 7510e61..243e95a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -64,6 +64,7 @@ public class LoadLoadingTask extends LoadTask {
     // timeout of load job, in seconds
     private final long timeoutS;
     private final int loadParallelism;
+    private final int sendBatchParallelism;
 
     private LoadingTaskPlanner planner;
 
@@ -74,7 +75,7 @@ public class LoadLoadingTask extends LoadTask {
                            BrokerDesc brokerDesc, List<BrokerFileGroup> 
fileGroups,
                            long jobDeadlineMs, long execMemLimit, boolean 
strictMode,
                            long txnId, LoadTaskCallback callback, String 
timezone,
-                           long timeoutS, int loadParallelism, RuntimeProfile 
profile) {
+                           long timeoutS, int loadParallelism, int 
sendBatchParallelism, RuntimeProfile profile) {
         super(callback, TaskType.LOADING);
         this.db = db;
         this.table = table;
@@ -89,13 +90,14 @@ public class LoadLoadingTask extends LoadTask {
         this.timezone = timezone;
         this.timeoutS = timeoutS;
         this.loadParallelism = loadParallelism;
+        this.sendBatchParallelism = sendBatchParallelism;
         this.jobProfile = profile;
     }
 
     public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> 
fileStatusList, int fileNum, UserIdentity userInfo) throws UserException {
         this.loadId = loadId;
         planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, 
db.getId(), table,
-                brokerDesc, fileGroups, strictMode, timezone, this.timeoutS, 
this.loadParallelism, userInfo);
+                brokerDesc, fileGroups, strictMode, timezone, this.timeoutS, 
this.loadParallelism, this.sendBatchParallelism, userInfo);
         planner.plan(loadId, fileStatusList, fileNum);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 4d2dbe4..b490dca 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -67,6 +67,7 @@ public class LoadingTaskPlanner {
     private final boolean strictMode;
     private final long timeoutS;    // timeout of load job, in second
     private final int loadParallelism;
+    private final int sendBatchParallelism;
     private UserIdentity userInfo;
     // Something useful
     // ConnectContext here is just a dummy object to avoid some NPE problem, 
like ctx.getDatabase()
@@ -82,7 +83,7 @@ public class LoadingTaskPlanner {
     public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable 
table,
                               BrokerDesc brokerDesc, List<BrokerFileGroup> 
brokerFileGroups,
                               boolean strictMode, String timezone, long 
timeoutS, int loadParallelism,
-                              UserIdentity userInfo) {
+                              int sendBatchParallelism, UserIdentity userInfo) 
{
         this.loadJobId = loadJobId;
         this.txnId = txnId;
         this.dbId = dbId;
@@ -93,6 +94,7 @@ public class LoadingTaskPlanner {
         this.analyzer.setTimezone(timezone);
         this.timeoutS = timeoutS;
         this.loadParallelism = loadParallelism;
+        this.sendBatchParallelism = sendBatchParallelism;
         this.userInfo = userInfo;
         if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(userInfo,
                 Catalog.getCurrentCatalog().getDb(dbId).getFullName(), 
PrivPredicate.SELECT)) {
@@ -131,7 +133,7 @@ public class LoadingTaskPlanner {
         // 2. Olap table sink
         List<Long> partitionIds = getAllPartitionIds();
         OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, 
partitionIds);
-        olapTableSink.init(loadId, txnId, dbId, timeoutS);
+        olapTableSink.init(loadId, txnId, dbId, timeoutS, 
sendBatchParallelism);
         olapTableSink.complete();
 
         // 3. Plan fragment
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index a758d65..edca511 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -109,6 +109,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     public static final long DEFAULT_MAX_BATCH_SIZE = 100 * 1024 * 1024; // 
100MB
     public static final long DEFAULT_EXEC_MEM_LIMIT = 2 * 1024 * 1024 * 1024L;
     public static final boolean DEFAULT_STRICT_MODE = false; // default is 
false
+    public static final int DEFAULT_SEND_BATCH_PARALLELISM = 1;
 
     protected static final String STAR_STRING = "*";
      /*
@@ -168,6 +169,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     // if current error rate is more than max error rate, the job will be 
paused
     protected long maxErrorNum = DEFAULT_MAX_ERROR_NUM; // optional
     protected long execMemLimit = DEFAULT_EXEC_MEM_LIMIT;
+    protected int sendBatchParallelism = DEFAULT_SEND_BATCH_PARALLELISM;
     // include strict mode
     protected Map<String, String> jobProperties = Maps.newHashMap();
 
@@ -284,8 +286,14 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         if (stmt.getExecMemLimit() != -1) {
             this.execMemLimit = stmt.getExecMemLimit();
         }
+        if (stmt.getSendBatchParallelism() > 0) {
+            this.sendBatchParallelism = stmt.getSendBatchParallelism();
+        }
         jobProperties.put(LoadStmt.TIMEZONE, stmt.getTimezone());
         jobProperties.put(LoadStmt.STRICT_MODE, 
String.valueOf(stmt.isStrictMode()));
+        jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, 
String.valueOf(this.execMemLimit));
+        jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, 
String.valueOf(this.sendBatchParallelism));
+
         if (Strings.isNullOrEmpty(stmt.getFormat()) || 
stmt.getFormat().equals("csv")) {
             jobProperties.put(PROPS_FORMAT, "csv");
             jobProperties.put(PROPS_STRIP_OUTER_ARRAY, "false");
@@ -556,6 +564,11 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     }
 
     @Override
+    public int getSendBatchParallelism() {
+        return sendBatchParallelism;
+    }
+
+    @Override
     public boolean isReadJsonByLine() {
         return false;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
index 0ffe906..6fed2cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
@@ -90,7 +90,8 @@ public class UpdatePlanner extends Planner {
         // 2. gen olap table sink
         OlapTableSink olapTableSink = new OlapTableSink(targetTable, 
computeTargetTupleDesc(), null);
         olapTableSink.init(analyzer.getContext().queryId(), txnId, targetDBId,
-                analyzer.getContext().getSessionVariable().queryTimeoutS);
+                analyzer.getContext().getSessionVariable().queryTimeoutS,
+                
analyzer.getContext().getSessionVariable().sendBatchParallelism);
         olapTableSink.complete();
         // 3. gen plan fragment
         PlanFragment planFragment = new 
PlanFragment(fragmentIdGenerator_.getNextId(), olapScanNode,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 3268374..420c399 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -93,12 +93,13 @@ public class OlapTableSink extends DataSink {
         this.partitionIds = partitionIds;
     }
 
-    public void init(TUniqueId loadId, long txnId, long dbId, long 
loadChannelTimeoutS) throws AnalysisException {
+    public void init(TUniqueId loadId, long txnId, long dbId, long 
loadChannelTimeoutS, int sendBatchParallelism) throws AnalysisException {
         TOlapTableSink tSink = new TOlapTableSink();
         tSink.setLoadId(loadId);
         tSink.setTxnId(txnId);
         tSink.setDbId(dbId);
         tSink.setLoadChannelTimeoutS(loadChannelTimeoutS);
+        tSink.setSendBatchParallelism(sendBatchParallelism);
         tDataSink = new TDataSink(TDataSinkType.DATA_SPLIT_SINK);
         tDataSink.setType(TDataSinkType.OLAP_TABLE_SINK);
         tDataSink.setOlapTableSink(tSink);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index f232334..fdcfa8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -151,7 +151,7 @@ public class StreamLoadPlanner {
         // create dest sink
         List<Long> partitionIds = getAllPartitionIds();
         OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, 
partitionIds);
-        olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout);
+        olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), 
taskInfo.getTimeout(), taskInfo.getSendBatchParallelism());
         olapTableSink.complete();
 
         // for stream load, we only need one fragment, ScanNode -> DataSink.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 0e15a31..25d2778 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -142,6 +142,11 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String DELETE_WITHOUT_PARTITION = 
"delete_without_partition";
 
+    // set the default parallelism for send batch when execute InsertStmt 
operation,
+    // if the value for parallelism exceed 
`max_send_batch_parallelism_per_job` in BE config,
+    // then the coordinator be will use the value of 
`max_send_batch_parallelism_per_job`
+    public static final String SEND_BATCH_PARALLELISM = 
"send_batch_parallelism";
+
     public static final long DEFAULT_INSERT_VISIBLE_TIMEOUT_MS = 10_000;
 
     public static final String EXTRACT_WIDE_RANGE_EXPR = 
"extract_wide_range_expr";
@@ -341,6 +346,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = DELETE_WITHOUT_PARTITION, needForward = true)
     public boolean deleteWithoutPartition = false;
 
+    @VariableMgr.VarAttr(name = SEND_BATCH_PARALLELISM, needForward = true)
+    public int sendBatchParallelism = 1;
+
     @VariableMgr.VarAttr(name = EXTRACT_WIDE_RANGE_EXPR, needForward = true)
     public boolean extractWideRangeExpr = true;
     @VariableMgr.VarAttr(name = RUNTIME_FILTER_MODE)
@@ -758,7 +766,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public boolean isDeleteWithoutPartition() {
         return deleteWithoutPartition;
     }
-
+    
     public boolean isExtractWideRangeExpr() {
         return extractWideRangeExpr;
     }
@@ -767,6 +775,10 @@ public class SessionVariable implements Serializable, 
Writable {
         return cpuResourceLimit;
     }
 
+    public int getSendBatchParallelism() {
+        return sendBatchParallelism;
+    }
+
     // Serialize to thrift object
     // used for rest api
     public TQueryOptions toThrift() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 3bf88c1..6deae19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -57,6 +57,7 @@ public interface LoadTaskInfo {
     public Expr getWhereExpr();
     public Separator getColumnSeparator();
     public Separator getLineDelimiter();
+    public int getSendBatchParallelism();
 
     public static class ImportColumnDescs {
         public List<ImportColumnDesc> descs = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index f6723b3..b47b91a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -73,6 +73,7 @@ public class StreamLoadTask implements LoadTaskInfo {
     private LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND; // 
default is all data is load no delete
     private Expr deleteCondition;
     private String sequenceCol;
+    private int sendBatchParallelism = 1;
 
     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, 
TFileFormatType formatType) {
         this.id = id;
@@ -123,6 +124,11 @@ public class StreamLoadTask implements LoadTaskInfo {
         return lineDelimiter;
     }
 
+    @Override
+    public int getSendBatchParallelism() {
+        return sendBatchParallelism;
+    }
+
     public PartitionNames getPartitions() {
         return partitions;
     }
@@ -288,6 +294,9 @@ public class StreamLoadTask implements LoadTaskInfo {
         if (request.isSetSequenceCol()) {
             sequenceCol = request.getSequenceCol();
         }
+        if (request.isSetSendBatchParallelism()) {
+            sendBatchParallelism = request.getSendBatchParallelism();
+        }
     }
 
     // used for stream load
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index 894ed2f..5ef4dcb 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -361,8 +361,8 @@ public class BrokerLoadJobTest {
         TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
         RuntimeProfile jobProfile = new RuntimeProfile("test");
         LoadLoadingTask task = new LoadLoadingTask(database, 
olapTable,brokerDesc, fileGroups,
-                100, 100, false, 100, callback, "", 100, 1,
-                jobProfile);
+                100, 100, false, 100, callback, "",
+                100, 1, 1, jobProfile);
         try {
             UserIdentity userInfo = new UserIdentity("root", "localhost");
             userInfo.setIsAnalyzed();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
index 0e4e764..d0e4daf 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/OlapTableSinkTest.java
@@ -101,7 +101,7 @@ public class OlapTableSinkTest {
         }};
 
         OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(2L));
-        sink.init(new TUniqueId(1, 2), 3, 4, 1000);
+        sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1);
         sink.complete();
         LOG.info("sink is {}", sink.toThrift());
         LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
@@ -131,7 +131,7 @@ public class OlapTableSinkTest {
         }};
 
         OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(p1.getId()));
-        sink.init(new TUniqueId(1, 2), 3, 4, 1000);
+        sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1);
         try {
             sink.complete();
         } catch (UserException e) {
@@ -153,7 +153,7 @@ public class OlapTableSinkTest {
         }};
 
         OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(unknownPartId));
-        sink.init(new TUniqueId(1, 2), 3, 4, 1000);
+        sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1);
         sink.complete();
         LOG.info("sink is {}", sink.toThrift());
         LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
@@ -183,7 +183,7 @@ public class OlapTableSinkTest {
         }};
 
         OlapTableSink sink = new OlapTableSink(dstTable, tuple, 
Lists.newArrayList(p1.getId()));
-        sink.init(new TUniqueId(1, 2), 3, 4, 1000);
+        sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1);
         try {
             sink.complete();
         } catch (UserException e) {
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index b3b93fb..0b81b49 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -101,6 +101,7 @@ message PTabletWriterAddBatchResult {
     repeated PTabletInfo tablet_vec = 2;
     optional int64 execution_time_us = 3;
     optional int64 wait_lock_time_us = 4;
+    optional int64 wait_execution_time_us = 5;
 };
 
 // tablet writer cancel
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 3aa1d10..b8fe8b2 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -76,7 +76,7 @@ struct TDataStreamSink {
 
 struct TResultSink {
     1: optional TResultSinkType type;
-    2: optional TResultFileSinkOptions file_options;
+    2: optional TResultFileSinkOptions file_options
 }
 
 struct TMysqlTableSink {
@@ -116,7 +116,7 @@ struct TExportSink {
     4: required string line_delimiter
     // properties need to access broker.
     5: optional list<Types.TNetworkAddress> broker_addresses
-    6: optional map<string, string> properties;
+    6: optional map<string, string> properties
 }
 
 struct TOlapTableSink {
@@ -134,6 +134,7 @@ struct TOlapTableSink {
     12: required Descriptors.TOlapTableLocationParam location
     13: required Descriptors.TPaloNodesInfo nodes_info
     14: optional i64 load_channel_timeout_s // the timeout of load channels in 
second
+    15: optional i32 send_batch_parallelism
 }
 
 struct TDataSink {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 78e4ea3..374e282 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -591,6 +591,7 @@ struct TStreamLoadPutRequest {
     32: optional string line_delimiter
     33: optional bool read_json_by_line
     34: optional string auth_code_uuid
+    35: optional i32 send_batch_parallelism
 }
 
 struct TStreamLoadPutResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to