This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 42e69041c86 [branch-2.0](routine-load) fix single stream multi table 
load cannot finish #33816 (#33880)
42e69041c86 is described below

commit 42e69041c866c87e11e87c9fdf5160e354aaff4f
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Fri Apr 19 16:44:07 2024 +0800

    [branch-2.0](routine-load) fix single stream multi table load cannot finish 
#33816 (#33880)
---
 be/src/common/config.cpp                         |  7 ++++++
 be/src/common/config.h                           |  7 ++++++
 be/src/io/fs/multi_table_pipe.cpp                | 28 +++++++++++++++++++-----
 be/src/io/fs/multi_table_pipe.h                  |  3 +++
 be/src/io/fs/stream_load_pipe.cpp                |  9 ++++++++
 be/src/io/fs/stream_load_pipe.h                  |  4 ++++
 docs/en/docs/admin-manual/config/be-config.md    |  6 +++++
 docs/zh-CN/docs/admin-manual/config/be-config.md |  7 ++++++
 8 files changed, 65 insertions(+), 6 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 2e715961348..8440433b707 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -790,6 +790,13 @@ DEFINE_Int32(routine_load_consumer_pool_size, "10");
 // if the size of batch is more than this threshold, we will request plans for 
all related tables.
 DEFINE_Int32(multi_table_batch_plan_threshold, "200");
 
+// Used in single-stream-multi-table load. When receiving a batch of messages 
from Kafka,
+// if the size of the table wait for plan is more than this threshold, we will 
request plans for all related tables.
+// The param is aimed to avoid requesting and executing too many plans at once.
+// Performing small batch processing on multiple tables during the loaded 
process can reduce the pressure of a single RPC
+// and improve the real-time processing of data.
+DEFINE_Int32(multi_table_max_wait_tables, "5");
+
 // When the timeout of a load task is less than this threshold,
 // Doris treats it as a high priority task.
 // high priority tasks use a separate thread pool for flush and do not block 
rpc by memory cleanup logic.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1947f791cb5..fcb501cb802 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -844,6 +844,13 @@ DECLARE_Int32(routine_load_consumer_pool_size);
 // if the size of batch is more than this threshold, we will request plans for 
all related tables.
 DECLARE_Int32(multi_table_batch_plan_threshold);
 
+// Used in single-stream-multi-table load. When receiving a batch of messages 
from Kafka,
+// if the size of the table wait for plan is more than this threshold, we will 
request plans for all related tables.
+// The param is aimed to avoid requesting and executing too many plans at once.
+// Performing small batch processing on multiple tables during the loaded 
process can reduce the pressure of a single RPC
+// and improve the real-time processing of data.
+DECLARE_Int32(multi_table_max_wait_tables);
+
 // When the timeout of a load task is less than this threshold,
 // Doris treats it as a high priority task.
 // high priority tasks use a separate thread pool for flush and do not block 
rpc by memory cleanup logic.
diff --git a/be/src/io/fs/multi_table_pipe.cpp 
b/be/src/io/fs/multi_table_pipe.cpp
index 399676657f1..97f88161bc9 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -114,21 +114,37 @@ Status MultiTablePipe::dispatch(const std::string& table, 
const char* data, size
         } else {
             pipe = iter->second;
         }
-        RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
-                                       "append failed in unplanned kafka 
pipe");
 
+        // It is necessary to determine whether the sum of 
pipe_current_capacity and size is greater than pipe_max_capacity,
+        // otherwise the following situation may occur:
+        // the pipe is full but still cannot trigger the request and exec plan 
condition,
+        // causing one stream multi table load can not finish
         ++_unplanned_row_cnt;
-        size_t threshold = config::multi_table_batch_plan_threshold;
-        if (_unplanned_row_cnt >= threshold) {
-            LOG(INFO) << fmt::format("unplanned row cnt={} reach threshold={}, 
plan them",
-                                     _unplanned_row_cnt, threshold);
+        auto pipe_current_capacity = pipe->current_capacity();
+        auto pipe_max_capacity = pipe->max_capacity();
+        if (_unplanned_row_cnt >= _row_threshold ||
+            _unplanned_pipes.size() >= _wait_tables_threshold ||
+            pipe_current_capacity + size > pipe_max_capacity) {
+            LOG(INFO) << fmt::format(
+                                 "unplanned row cnt={} reach row_threshold={} 
or "
+                                 "wait_plan_table_threshold={}, or the sum of "
+                                 "pipe_current_capacity {} "
+                                 "and size {} is greater than 
pipe_max_capacity {}, "
+                                 "plan them",
+                                 _unplanned_row_cnt, _row_threshold, 
_wait_tables_threshold,
+                                 pipe_current_capacity, size, 
pipe_max_capacity)
+                      << ", ctx: " << _ctx->brief();
             Status st = request_and_exec_plans();
             _unplanned_row_cnt = 0;
             if (!st.ok()) {
                 return st;
             }
         }
+
+        RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
+                                       "append failed in unplanned kafka 
pipe");
     }
+
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h
index f5217176e96..db71a5a2e75 100644
--- a/be/src/io/fs/multi_table_pipe.h
+++ b/be/src/io/fs/multi_table_pipe.h
@@ -103,6 +103,9 @@ private:
 
     std::mutex _pipe_map_lock;
     std::unordered_map<TUniqueId /*instance id*/, 
std::shared_ptr<io::StreamLoadPipe>> _pipe_map;
+
+    uint32_t _row_threshold = config::multi_table_batch_plan_threshold;
+    uint32_t _wait_tables_threshold = config::multi_table_max_wait_tables;
 };
 } // namespace io
 } // end namespace doris
diff --git a/be/src/io/fs/stream_load_pipe.cpp 
b/be/src/io/fs/stream_load_pipe.cpp
index a1d5fbe90b2..4d6278605ee 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -257,5 +257,14 @@ TUniqueId StreamLoadPipe::calculate_pipe_id(const 
UniqueId& query_id, int32_t fr
     return pipe_id;
 }
 
+size_t StreamLoadPipe::current_capacity() {
+    std::unique_lock<std::mutex> l(_lock);
+    if (_use_proto) {
+        return _proto_buffered_bytes;
+    } else {
+        return _buffered_bytes;
+    }
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index 508620d5afd..f7efa62b8b4 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -83,6 +83,10 @@ public:
     // used for pipeline load, which use TUniqueId(lo: query_id.lo + 
fragment_id, hi: query_id.hi) as pipe_id
     static TUniqueId calculate_pipe_id(const UniqueId& query_id, int32_t 
fragment_id);
 
+    size_t max_capacity() const { return _max_buffered_bytes; }
+
+    size_t current_capacity();
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
diff --git a/docs/en/docs/admin-manual/config/be-config.md 
b/docs/en/docs/admin-manual/config/be-config.md
index a46411ddf5d..f2a81250060 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -774,6 +774,12 @@ BaseCompaction:546859:
 * Description: For single-stream-multi-table load. When receive a batch of 
messages from kafka, if the size of batch is more than this threshold, we will 
request plans for all related tables.
 * Default value: 200
 
+#### `multi_table_max_wait_tables`
+
+* Type: int32
+* Description: Used in single-stream-multi-table load. When receiving a batch 
of messages from Kafka, if the size of the table wait for plan is more than 
this threshold, we will request plans for all related tables.The param is aimed 
to avoid requesting and executing too many plans at once. Performing small 
batch processing on multiple tables during the loaded process can reduce the 
pressure of a single RPC and improve the real-time processing of data.
+* Default value: 5
+
 #### `single_replica_load_download_num_workers`
 
 * Type: int32
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md 
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 7c8d9657dbc..f3a0660b232 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -800,6 +800,13 @@ BaseCompaction:546859:
 * 描述:一流多表使用该配置,表示攒多少条数据再进行规划。过小的值会导致规划频繁,多大的值会增加内存压力和导入延迟。
 * 默认值:200
 
+#### `multi_table_max_wait_tables`
+
+* 类型:int32
+* 描述:一流多表使用该配置,如果等待执行的表的数量大于此阈值,将请求并执行所有相关表的计划。该参数旨在避免一次同时请求和执行过多的计划。
+将导入过程的多表进行小批处理,可以减少单次rpc的压力,同时可以提高导入数据处理的实时性。
+* 默认值:5
+
 #### `single_replica_load_download_num_workers`
 * 类型: int32
 * 描述: 
单副本数据导入功能中,Slave副本通过HTTP从Master副本下载数据文件的工作线程数。导入并发增大时,可以适当调大该参数来保证Slave副本及时同步Master副本数据。必要时也应相应地调大`webserver_num_workers`来提高IO效率。


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to