This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e38d844d40395f102da8dfef76c583ea5e29046e
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Fri Apr 19 13:02:45 2024 +0800

    [fix](multi-table-load) fix single stream multi table load cannot finish (#33816)
---
 be/src/io/fs/multi_table_pipe.cpp | 22 +++++++++++++++++-----
 be/src/io/fs/stream_load_pipe.cpp |  9 +++++++++
 be/src/io/fs/stream_load_pipe.h   |  4 ++++
 3 files changed, 30 insertions(+), 5 deletions(-)

diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 816561cfbc8..6a4da0188dd 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -114,17 +114,25 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
         } else {
             pipe = iter->second;
         }
-        RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
-                                       "append failed in unplanned kafka pipe");
 
+        // It is necessary to determine whether the sum of pipe_current_capacity and size is greater than pipe_max_capacity,
+        // otherwise the following situation may occur:
+        // the pipe is full but still cannot trigger the request and exec plan condition,
+        // causing one stream multi table load can not finish
         ++_unplanned_row_cnt;
+        auto pipe_current_capacity = pipe->current_capacity();
+        auto pipe_max_capacity = pipe->max_capacity();
         if (_unplanned_row_cnt >= _row_threshold ||
-            _unplanned_pipes.size() >= _wait_tables_threshold) {
+            _unplanned_pipes.size() >= _wait_tables_threshold ||
+            pipe_current_capacity + size > pipe_max_capacity) {
             LOG(INFO) << fmt::format(
                                  "unplanned row cnt={} reach row_threshold={} 
or "
-                                 "wait_plan_table_threshold={}, "
+                                 "wait_plan_table_threshold={}, or the sum of "
+                                 "pipe_current_capacity {} "
+                                 "and size {} is greater than 
pipe_max_capacity {}, "
                                  "plan them",
-                                 _unplanned_row_cnt, _row_threshold, 
_wait_tables_threshold)
+                                 _unplanned_row_cnt, _row_threshold, 
_wait_tables_threshold,
+                                 pipe_current_capacity, size, 
pipe_max_capacity)
                       << ", ctx: " << _ctx->brief();
             Status st = request_and_exec_plans();
             _unplanned_row_cnt = 0;
@@ -132,7 +140,11 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
                 return st;
             }
         }
+
+        RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
+                                       "append failed in unplanned kafka pipe");
     }
+
     return Status::OK();
 }
 
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index cd5ee5a8a09..ecce306bdf1 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -255,5 +255,14 @@ TUniqueId StreamLoadPipe::calculate_pipe_id(const UniqueId& query_id, int32_t fr
     return pipe_id;
 }
 
+size_t StreamLoadPipe::current_capacity() {
+    std::unique_lock<std::mutex> l(_lock);
+    if (_use_proto) {
+        return _proto_buffered_bytes;
+    } else {
+        return _buffered_bytes;
+    }
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index 06c9bca4027..978badf9add 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -83,6 +83,10 @@ public:
     // used for pipeline load, which use TUniqueId(lo: query_id.lo + fragment_id, hi: query_id.hi) as pipe_id
     static TUniqueId calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id);
 
+    size_t max_capacity() const { return _max_buffered_bytes; }
+
+    size_t current_capacity();
+
 protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;


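For readers skimming the patch, the following is a minimal standalone sketch of the trigger condition it introduces in MultiTablePipe::dispatch. The function name should_plan and the free-standing parameters are illustrative only (the real code works on the member fields and the StreamLoadPipe accessors shown in the diff above):

    #include <cstddef>

    // Illustrative sketch of the condition added by this patch: besides the
    // existing row and table thresholds, planning is also triggered when the
    // pending append would not fit into the pipe buffer. Appending only after
    // this check means a full pipe can no longer block forever, which is what
    // previously kept a single stream multi table load from finishing.
    bool should_plan(size_t unplanned_row_cnt, size_t row_threshold,
                     size_t unplanned_pipe_cnt, size_t wait_tables_threshold,
                     size_t pipe_current_capacity, size_t append_size,
                     size_t pipe_max_capacity) {
        return unplanned_row_cnt >= row_threshold ||
               unplanned_pipe_cnt >= wait_tables_threshold ||
               pipe_current_capacity + append_size > pipe_max_capacity;
    }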