This is an automated email from the ASF dual-hosted git repository.

hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a9d83b83dff [refactor](sink) refactor vtablet writer v2 sequential close to parallel close (#52639)
a9d83b83dff is described below

commit a9d83b83dff9f7cc5b7fedc9fb524d7774d2cf45
Author: hui lai <lai...@selectdb.com>
AuthorDate: Sat Jul 5 23:13:55 2025 +0800

    [refactor](sink) refactor vtablet writer v2 sequential close to parallel close (#52639)
    
    ### What problem does this PR solve?
    
    1. re-pick: https://github.com/apache/doris/pull/51989
    2. Fix the failure to shut down gracefully.
---
 be/src/vec/sink/load_stream_map_pool.h             |   9 ++
 be/src/vec/sink/load_stream_stub.cpp               |  72 +++++++-------
 be/src/vec/sink/load_stream_stub.h                 |  10 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       | 107 ++++++++++++++++-----
 be/src/vec/sink/writer/vtablet_writer_v2.h         |  12 ++-
 .../test_writer_v2_fault_injection.groovy          |  17 +++-
 6 files changed, 158 insertions(+), 69 deletions(-)

diff --git a/be/src/vec/sink/load_stream_map_pool.h 
b/be/src/vec/sink/load_stream_map_pool.h
index bdf98ca8f61..ab8a40d0c91 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -98,6 +98,15 @@ public:
     // only call this method after release() returns true.
     void close_load(bool incremental);
 
+    std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> 
get_streams_for_node() {
+        decltype(_streams_for_node) snapshot;
+        {
+            std::lock_guard<std::mutex> lock(_mutex);
+            snapshot = _streams_for_node;
+        }
+        return snapshot;
+    }
+
 private:
     const UniqueId _load_id;
     const int64_t _src_id;
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 2479fc6b25c..53cc11c97f5 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -114,9 +114,7 @@ void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
         LOG(WARNING) << "stub is not exist when on_closed, " << *this;
         return;
     }
-    std::lock_guard<bthread::Mutex> lock(stub->_close_mutex);
     stub->_is_closed.store(true);
-    stub->_close_cv.notify_all();
 }
 
 inline std::ostream& operator<<(std::ostream& ostr, const 
LoadStreamReplyHandler& handler) {
@@ -330,37 +328,30 @@ Status LoadStreamStub::wait_for_schema(int64_t 
partition_id, int64_t index_id, i
     return Status::OK();
 }
 
-Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
+Status LoadStreamStub::close_finish_check(RuntimeState* state, bool* 
is_closed) {
     DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
+    DBUG_EXECUTE_IF("LoadStreamStub::close_finish_check.close_failed",
+                    { return Status::InternalError("close failed"); });
+    *is_closed = true;
     if (!_is_open.load()) {
         // we don't need to close wait on non-open streams
         return Status::OK();
     }
+    if (state->get_query_ctx()->is_cancelled()) {
+        return state->get_query_ctx()->exec_status();
+    }
     if (!_is_closing.load()) {
+        *is_closed = false;
         return _status;
     }
     if (_is_closed.load()) {
-        return _check_cancel();
-    }
-    DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
-    std::unique_lock<bthread::Mutex> lock(_close_mutex);
-    auto timeout_sec = timeout_ms / 1000;
-    while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) {
-        //the query maybe cancel, so need check after wait 1s
-        timeout_sec = timeout_sec - 1;
-        LOG(INFO) << "close waiting, " << *this << ", timeout_sec=" << 
timeout_sec
-                  << ", is_closed=" << _is_closed.load()
-                  << ", is_cancelled=" << 
state->get_query_ctx()->is_cancelled();
-        int ret = _close_cv.wait_for(lock, 1000000);
-        if (ret != 0 && timeout_sec <= 0) {
-            return Status::InternalError("stream close_wait timeout, error={}, 
timeout_ms={}, {}",
-                                         ret, timeout_ms, to_string());
+        RETURN_IF_ERROR(_check_cancel());
+        if (!_is_eos.load()) {
+            return Status::InternalError("Stream closed without EOS, {}", 
to_string());
         }
+        return Status::OK();
     }
-    RETURN_IF_ERROR(_check_cancel());
-    if (!_is_eos.load()) {
-        return Status::InternalError("stream closed without eos, {}", 
to_string());
-    }
+    *is_closed = false;
     return Status::OK();
 }
 
@@ -374,11 +365,7 @@ void LoadStreamStub::cancel(Status reason) {
         _cancel_st = reason;
         _is_cancelled.store(true);
     }
-    {
-        std::lock_guard<bthread::Mutex> lock(_close_mutex);
-        _is_closed.store(true);
-        _close_cv.notify_all();
-    }
+    _is_closed.store(true);
 }
 
 Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const 
Slice> data) {
@@ -437,12 +424,34 @@ void LoadStreamStub::_handle_failure(butil::IOBuf& buf, 
Status st) {
         switch (hdr.opcode()) {
         case PStreamHeader::ADD_SEGMENT:
         case PStreamHeader::APPEND_DATA: {
+            
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.append_data_failed", {
+                add_failed_tablet(hdr.tablet_id(), st);
+                return;
+            });
+            
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.add_segment_failed", {
+                add_failed_tablet(hdr.tablet_id(), st);
+                return;
+            });
             add_failed_tablet(hdr.tablet_id(), st);
         } break;
         case PStreamHeader::CLOSE_LOAD: {
+            
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.close_load_failed", {
+                brpc::StreamClose(_stream_id);
+                return;
+            });
             brpc::StreamClose(_stream_id);
         } break;
         case PStreamHeader::GET_SCHEMA: {
+            
DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.get_schema_failed", {
+                // Just log and let wait_for_schema timeout
+                std::ostringstream oss;
+                for (const auto& tablet : hdr.tablets()) {
+                    oss << " " << tablet.tablet_id();
+                }
+                LOG(WARNING) << "failed to send GET_SCHEMA request, 
tablet_id:" << oss.str() << ", "
+                             << *this;
+                return;
+            });
             // Just log and let wait_for_schema timeout
             std::ostringstream oss;
             for (const auto& tablet : hdr.tablets()) {
@@ -556,13 +565,4 @@ Status LoadStreamStubs::close_load(const 
std::vector<PTabletID>& tablets_to_comm
     return status;
 }
 
-Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) {
-    MonotonicStopWatch watch;
-    watch.start();
-    for (auto& stream : _streams) {
-        RETURN_IF_ERROR(stream->close_wait(state, timeout_ms - 
watch.elapsed_time() / 1000 / 1000));
-    }
-    return Status::OK();
-}
-
 } // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 5c3ca02272d..a030de75720 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -155,7 +155,7 @@ public:
 
     // wait remote to close stream,
     // remote will close stream when it receives CLOSE_LOAD
-    Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
+    Status close_finish_check(RuntimeState* state, bool* is_closed);
 
     // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
     void cancel(Status reason);
@@ -223,6 +223,8 @@ private:
     void _handle_failure(butil::IOBuf& buf, Status st);
 
     Status _check_cancel() {
+        DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
+                        { return Status::InternalError("stream cancelled"); });
         if (!_is_cancelled.load()) {
             return Status::OK();
         }
@@ -247,9 +249,7 @@ protected:
     Status _cancel_st;
 
     bthread::Mutex _open_mutex;
-    bthread::Mutex _close_mutex;
     bthread::Mutex _cancel_mutex;
-    bthread::ConditionVariable _close_cv;
 
     std::mutex _buffer_mutex;
     std::mutex _send_mutex;
@@ -310,8 +310,6 @@ public:
 
     Status close_load(const std::vector<PTabletID>& tablets_to_commit);
 
-    Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
-
     std::unordered_set<int64_t> success_tablets() {
         std::unordered_set<int64_t> s;
         for (auto& stream : _streams) {
@@ -330,6 +328,8 @@ public:
         return m;
     }
 
+    std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; }
+
 private:
     std::vector<std::shared_ptr<LoadStreamStub>> _streams;
     std::atomic<bool> _open_success = false;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 28059e10b3c..5104a9e65d2 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -666,7 +666,7 @@ Status VTabletWriterV2::close(Status exec_status) {
         // close_wait on all non-incremental streams, even if this is not the 
last sink.
         // because some per-instance data structures are now shared among all 
sinks
         // due to sharing delta writers and load stream stubs.
-        RETURN_IF_ERROR(_close_wait(false));
+        RETURN_IF_ERROR(_close_wait(_non_incremental_streams()));
 
         // send CLOSE_LOAD on all incremental streams if this is the last sink.
         // this must happen after all non-incremental streams are closed,
@@ -676,7 +676,7 @@ Status VTabletWriterV2::close(Status exec_status) {
         }
 
         // close_wait on all incremental streams, even if this is not the last 
sink.
-        RETURN_IF_ERROR(_close_wait(true));
+        RETURN_IF_ERROR(_close_wait(_incremental_streams()));
 
         // calculate and submit commit info
         if (is_last_sink) {
@@ -721,32 +721,87 @@ Status VTabletWriterV2::close(Status exec_status) {
     return status;
 }
 
-Status VTabletWriterV2::_close_wait(bool incremental) {
+std::unordered_set<std::shared_ptr<LoadStreamStub>> 
VTabletWriterV2::_incremental_streams() {
+    std::unordered_set<std::shared_ptr<LoadStreamStub>> incremental_streams;
+    auto streams_for_node = _load_stream_map->get_streams_for_node();
+    for (const auto& [dst_id, streams] : streams_for_node) {
+        for (const auto& stream : streams->streams()) {
+            if (stream->is_incremental()) {
+                incremental_streams.insert(stream);
+            }
+        }
+    }
+    return incremental_streams;
+}
+
+std::unordered_set<std::shared_ptr<LoadStreamStub>> 
VTabletWriterV2::_non_incremental_streams() {
+    std::unordered_set<std::shared_ptr<LoadStreamStub>> 
non_incremental_streams;
+    auto streams_for_node = _load_stream_map->get_streams_for_node();
+    for (const auto& [dst_id, streams] : streams_for_node) {
+        for (const auto& stream : streams->streams()) {
+            if (!stream->is_incremental()) {
+                non_incremental_streams.insert(stream);
+            }
+        }
+    }
+    return non_incremental_streams;
+}
+
+Status VTabletWriterV2::_close_wait(
+        std::unordered_set<std::shared_ptr<LoadStreamStub>> 
unfinished_streams) {
     SCOPED_TIMER(_close_load_timer);
-    auto st = _load_stream_map->for_each_st(
-            [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> 
Status {
-                if (streams.is_incremental() != incremental) {
-                    return Status::OK();
-                }
-                int64_t remain_ms = 
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
-                                    _timeout_watch.elapsed_time() / 1000 / 
1000;
-                DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { 
remain_ms = 0; });
-                if (remain_ms <= 0) {
-                    LOG(WARNING) << "load timed out before close waiting, 
load_id="
-                                 << print_id(_load_id);
-                    return Status::TimedOut("load timed out before close 
waiting");
-                }
-                auto st = streams.close_wait(_state, remain_ms);
-                if (!st.ok()) {
-                    LOG(WARNING) << "close_wait timeout on streams to dst_id=" 
<< dst_id
-                                 << ", load_id=" << print_id(_load_id) << ": " 
<< st;
-                }
-                return st;
-            });
-    if (!st.ok()) {
-        LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << 
print_id(_load_id);
+    Status status;
+    auto streams_for_node = _load_stream_map->get_streams_for_node();
+    while (true) {
+        RETURN_IF_ERROR(_check_timeout());
+        RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, 
streams_for_node));
+        if (!status.ok() || unfinished_streams.empty()) {
+            LOG(INFO) << "is all unfinished: " << unfinished_streams.empty()
+                      << ", status: " << status << ", txn_id: " << _txn_id
+                      << ", load_id: " << print_id(_load_id);
+            break;
+        }
+        bthread_usleep(1000 * 10);
     }
-    return st;
+    if (!status.ok()) {
+        LOG(WARNING) << "close_wait failed: " << status << ", load_id=" << 
print_id(_load_id);
+    }
+    return status;
+}
+
+Status VTabletWriterV2::_check_timeout() {
+    int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 
1000 -
+                        _timeout_watch.elapsed_time() / 1000 / 1000;
+    DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 
0; });
+    if (remain_ms <= 0) {
+        LOG(WARNING) << "load timed out before close waiting, load_id=" << 
print_id(_load_id);
+        return Status::TimedOut("load timed out before close waiting");
+    }
+    return Status::OK();
+}
+
+Status VTabletWriterV2::_check_streams_finish(
+        std::unordered_set<std::shared_ptr<LoadStreamStub>>& 
unfinished_streams, Status& status,
+        const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& 
streams_for_node) {
+    for (const auto& [dst_id, streams] : streams_for_node) {
+        for (const auto& stream : streams->streams()) {
+            if (!unfinished_streams.contains(stream)) {
+                continue;
+            }
+            bool is_closed = false;
+            auto stream_st = stream->close_finish_check(_state, &is_closed);
+            if (!stream_st.ok()) {
+                status = stream_st;
+                unfinished_streams.erase(stream);
+                LOG(WARNING) << "close_wait failed: " << stream_st
+                             << ", load_id=" << print_id(_load_id);
+            }
+            if (is_closed) {
+                unfinished_streams.erase(stream);
+            }
+        }
+    }
+    return status;
 }
 
 void VTabletWriterV2::_calc_tablets_to_commit() {
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 51f647b07e5..10e3c6378cb 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -148,7 +148,17 @@ private:
 
     void _calc_tablets_to_commit();
 
-    Status _close_wait(bool incremental);
+    std::unordered_set<std::shared_ptr<LoadStreamStub>> _incremental_streams();
+
+    std::unordered_set<std::shared_ptr<LoadStreamStub>> 
_non_incremental_streams();
+
+    Status _close_wait(std::unordered_set<std::shared_ptr<LoadStreamStub>> 
unfinished_streams);
+
+    Status _check_timeout();
+
+    Status _check_streams_finish(
+            std::unordered_set<std::shared_ptr<LoadStreamStub>>& 
unfinished_streams, Status& status,
+            const std::unordered_map<int64_t, 
std::shared_ptr<LoadStreamStubs>>& streams_for_node);
 
     void _cancel(Status status);
 
diff --git 
a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
index 30854cfb50b..1a473d90e52 100644
--- 
a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
@@ -68,7 +68,7 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
             file "baseall.txt"
         }
 
-        def load_with_injection = { injection, error_msg, success=false->
+        def load_with_injection = { injection, error_msg="", success=false->
             try {
                 GetDebugPoint().enableDebugPointForAllBEs(injection)
                 sql "insert into test select * from baseall where k1 <= 3"
@@ -104,6 +104,21 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
         // DeltaWriterV2 stream_size is 0
         load_with_injection("DeltaWriterV2.init.stream_size", "failed to find 
tablet schema")
 
+        // injection cases for VTabletWriterV2 close logic
+        // Test LoadStreamStub close_finish_check close failed
+        load_with_injection("LoadStreamStub::close_finish_check.close_failed")
+        // Test LoadStreamStub _check_cancel when cancelled
+        load_with_injection("LoadStreamStub._check_cancel.cancelled")
+        // Test LoadStreamStub _send_with_retry stream write failed
+        
load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed")
+        // Test LoadStreamStub _handle_failure for different opcodes
+        
load_with_injection("LoadStreamStub._handle_failure.append_data_failed")
+        
load_with_injection("LoadStreamStub._handle_failure.add_segment_failed")
+        load_with_injection("LoadStreamStub._handle_failure.close_load_failed")
+        load_with_injection("LoadStreamStub._handle_failure.get_schema_failed")
+        // Test LoadStreamStub skip send segment
+        load_with_injection("LoadStreamStub.skip_send_segment")
+
         sql """ set enable_memtable_on_sink_node=false """
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to