This is an automated email from the ASF dual-hosted git repository.

hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new a9d83b83dff [refactor](sink) refactor vtablet writer v2 sequential close to parallel close (#52639)

a9d83b83dff is described below

commit a9d83b83dff9f7cc5b7fedc9fb524d7774d2cf45
Author: hui lai <lai...@selectdb.com>
AuthorDate: Sat Jul 5 23:13:55 2025 +0800

    [refactor](sink) refactor vtablet writer v2 sequential close to parallel close (#52639)

    ### What problem does this PR solve?

    1. re-pick: https://github.com/apache/doris/pull/51989
    2. fix: graceful shutdown could not complete.
---
 be/src/vec/sink/load_stream_map_pool.h             |   9 ++
 be/src/vec/sink/load_stream_stub.cpp               |  72 +++++++-------
 be/src/vec/sink/load_stream_stub.h                 |  10 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       | 107 ++++++++++++++++-----
 be/src/vec/sink/writer/vtablet_writer_v2.h         |  12 ++-
 .../test_writer_v2_fault_injection.groovy          |  17 +++-
 6 files changed, 158 insertions(+), 69 deletions(-)

diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h
index bdf98ca8f61..ab8a40d0c91 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -98,6 +98,15 @@ public:
     // only call this method after release() returns true.
     void close_load(bool incremental);
 
+    std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> get_streams_for_node() {
+        decltype(_streams_for_node) snapshot;
+        {
+            std::lock_guard<std::mutex> lock(_mutex);
+            snapshot = _streams_for_node;
+        }
+        return snapshot;
+    }
+
 private:
     const UniqueId _load_id;
     const int64_t _src_id;
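The `get_streams_for_node()` helper added above copies the node-to-streams map while holding `_mutex` and returns the copy, so the close path can iterate streams without keeping the pool locked. A minimal sketch of that copy-under-lock accessor pattern, using a placeholder `Streams` type and `snapshot()` name instead of the real `LoadStreamStubs` API:

```cpp
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>

struct Streams {};  // placeholder for LoadStreamStubs, illustration only

class StreamMap {
public:
    // Copy the map under the lock and hand the snapshot to the caller,
    // so iteration happens without holding _mutex.
    std::unordered_map<int64_t, std::shared_ptr<Streams>> snapshot() const {
        std::lock_guard<std::mutex> lock(_mutex);
        return _streams_for_node;
    }

private:
    mutable std::mutex _mutex;
    std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
};
```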
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 2479fc6b25c..53cc11c97f5 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -114,9 +114,7 @@ void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
         LOG(WARNING) << "stub is not exist when on_closed, " << *this;
         return;
     }
-    std::lock_guard<bthread::Mutex> lock(stub->_close_mutex);
     stub->_is_closed.store(true);
-    stub->_close_cv.notify_all();
 }
 
 inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler) {
@@ -330,37 +328,30 @@ Status LoadStreamStub::wait_for_schema(int64_t partition_id, int64_t index_id, i
     return Status::OK();
 }
 
-Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
+Status LoadStreamStub::close_finish_check(RuntimeState* state, bool* is_closed) {
     DBUG_EXECUTE_IF("LoadStreamStub::close_wait.long_wait", DBUG_BLOCK);
+    DBUG_EXECUTE_IF("LoadStreamStub::close_finish_check.close_failed",
+                    { return Status::InternalError("close failed"); });
+    *is_closed = true;
     if (!_is_open.load()) {
         // we don't need to close wait on non-open streams
         return Status::OK();
     }
+    if (state->get_query_ctx()->is_cancelled()) {
+        return state->get_query_ctx()->exec_status();
+    }
     if (!_is_closing.load()) {
+        *is_closed = false;
         return _status;
     }
     if (_is_closed.load()) {
-        return _check_cancel();
-    }
-    DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
-    std::unique_lock<bthread::Mutex> lock(_close_mutex);
-    auto timeout_sec = timeout_ms / 1000;
-    while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) {
-        //the query maybe cancel, so need check after wait 1s
-        timeout_sec = timeout_sec - 1;
-        LOG(INFO) << "close waiting, " << *this << ", timeout_sec=" << timeout_sec
-                  << ", is_closed=" << _is_closed.load()
-                  << ", is_cancelled=" << state->get_query_ctx()->is_cancelled();
-        int ret = _close_cv.wait_for(lock, 1000000);
-        if (ret != 0 && timeout_sec <= 0) {
-            return Status::InternalError("stream close_wait timeout, error={}, timeout_ms={}, {}",
-                                         ret, timeout_ms, to_string());
+        RETURN_IF_ERROR(_check_cancel());
+        if (!_is_eos.load()) {
+            return Status::InternalError("Stream closed without EOS, {}", to_string());
         }
+        return Status::OK();
     }
-    RETURN_IF_ERROR(_check_cancel());
-    if (!_is_eos.load()) {
-        return Status::InternalError("stream closed without eos, {}", to_string());
-    }
+    *is_closed = false;
     return Status::OK();
 }
 
@@ -374,11 +365,7 @@ void LoadStreamStub::cancel(Status reason) {
         _cancel_st = reason;
         _is_cancelled.store(true);
     }
-    {
-        std::lock_guard<bthread::Mutex> lock(_close_mutex);
-        _is_closed.store(true);
-        _close_cv.notify_all();
-    }
+    _is_closed.store(true);
 }
 
 Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
@@ -437,12 +424,34 @@ void LoadStreamStub::_handle_failure(butil::IOBuf& buf, Status st) {
         switch (hdr.opcode()) {
         case PStreamHeader::ADD_SEGMENT:
         case PStreamHeader::APPEND_DATA: {
+            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.append_data_failed", {
+                add_failed_tablet(hdr.tablet_id(), st);
+                return;
+            });
+            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.add_segment_failed", {
+                add_failed_tablet(hdr.tablet_id(), st);
+                return;
+            });
             add_failed_tablet(hdr.tablet_id(), st);
         } break;
         case PStreamHeader::CLOSE_LOAD: {
+            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.close_load_failed", {
+                brpc::StreamClose(_stream_id);
+                return;
+            });
             brpc::StreamClose(_stream_id);
         } break;
         case PStreamHeader::GET_SCHEMA: {
+            DBUG_EXECUTE_IF("LoadStreamStub._handle_failure.get_schema_failed", {
+                // Just log and let wait_for_schema timeout
+                std::ostringstream oss;
+                for (const auto& tablet : hdr.tablets()) {
+                    oss << " " << tablet.tablet_id();
+                }
+                LOG(WARNING) << "failed to send GET_SCHEMA request, tablet_id:" << oss.str() << ", "
+                             << *this;
+                return;
+            });
             // Just log and let wait_for_schema timeout
             std::ostringstream oss;
             for (const auto& tablet : hdr.tablets()) {
@@ -556,13 +565,4 @@ Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_comm
     return status;
 }
 
-Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) {
-    MonotonicStopWatch watch;
-    watch.start();
-    for (auto& stream : _streams) {
-        RETURN_IF_ERROR(stream->close_wait(state, timeout_ms - watch.elapsed_time() / 1000 / 1000));
-    }
-    return Status::OK();
-}
-
 } // namespace doris
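The central change in load_stream_stub.cpp is that the blocking `close_wait` (condition variable plus per-call timeout) becomes a non-blocking `close_finish_check` that reports progress through an `is_closed` out-parameter, which is what lets the writer poll many streams in one loop. A rough sketch of that shape with simplified stand-ins (`Status`, `StreamStub`) rather than the real Doris types:

```cpp
#include <atomic>
#include <string>
#include <utility>

// Simplified stand-in for doris::Status, illustration only.
struct Status {
    bool ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status InternalError(std::string m) { return {false, std::move(m)}; }
};

class StreamStub {
public:
    // Non-blocking: never sleeps or waits on a condition variable.
    // On error the caller drops the stream; otherwise *is_closed tells it
    // whether to keep polling.
    Status close_finish_check(bool* is_closed) {
        *is_closed = true;
        if (_is_cancelled.load()) {
            return Status::InternalError("stream cancelled");
        }
        if (!_is_closed.load()) {
            *is_closed = false;  // remote has not closed yet; poll again later
        }
        return Status::OK();
    }

    void on_remote_closed() { _is_closed.store(true); }

private:
    std::atomic<bool> _is_closed {false};
    std::atomic<bool> _is_cancelled {false};
};
```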
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 5c3ca02272d..a030de75720 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -155,7 +155,7 @@ public:
 
     // wait remote to close stream,
     // remote will close stream when it receives CLOSE_LOAD
-    Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
+    Status close_finish_check(RuntimeState* state, bool* is_closed);
 
     // cancel the stream, abort close_wait, mark _is_closed and _is_cancelled
     void cancel(Status reason);
@@ -223,6 +223,8 @@ private:
     void _handle_failure(butil::IOBuf& buf, Status st);
 
     Status _check_cancel() {
+        DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
+                        { return Status::InternalError("stream cancelled"); });
         if (!_is_cancelled.load()) {
             return Status::OK();
         }
@@ -247,9 +249,7 @@ protected:
     Status _cancel_st;
 
     bthread::Mutex _open_mutex;
-    bthread::Mutex _close_mutex;
     bthread::Mutex _cancel_mutex;
-    bthread::ConditionVariable _close_cv;
 
     std::mutex _buffer_mutex;
     std::mutex _send_mutex;
@@ -310,8 +310,6 @@ public:
 
     Status close_load(const std::vector<PTabletID>& tablets_to_commit);
 
-    Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);
-
     std::unordered_set<int64_t> success_tablets() {
         std::unordered_set<int64_t> s;
         for (auto& stream : _streams) {
@@ -330,6 +328,8 @@ public:
         return m;
     }
 
+    std::vector<std::shared_ptr<LoadStreamStub>> streams() { return _streams; }
+
 private:
     std::vector<std::shared_ptr<LoadStreamStub>> _streams;
     std::atomic<bool> _open_success = false;
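Besides the API change, the header gains a `DBUG_EXECUTE_IF` debug point inside `_check_cancel()` so the cancellation path can be forced from the regression test. As a rough illustration of how such a guard behaves (a simplified stand-in macro and registry; the real Doris debug points are configured at runtime and support parameters):

```cpp
#include <string>
#include <unordered_set>
#include <utility>

// Hypothetical registry of enabled debug points, illustration only.
static std::unordered_set<std::string> g_enabled_debug_points;

// Simplified stand-in for DBUG_EXECUTE_IF: run `block` only when the
// named debug point has been enabled.
#define SKETCH_EXECUTE_IF(name, block)                      \
    do {                                                    \
        if (g_enabled_debug_points.count(name) > 0) block   \
    } while (0)

struct Status {
    bool ok = true;
    std::string msg;
    static Status OK() { return {}; }
    static Status InternalError(std::string m) { return {false, std::move(m)}; }
};

Status check_cancel(bool cancelled) {
    // With the debug point enabled, tests can exercise the error path
    // without producing a real cancellation.
    SKETCH_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
                      { return Status::InternalError("stream cancelled"); });
    return cancelled ? Status::InternalError("cancelled") : Status::OK();
}
```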
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 28059e10b3c..5104a9e65d2 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -666,7 +666,7 @@ Status VTabletWriterV2::close(Status exec_status) {
         // close_wait on all non-incremental streams, even if this is not the last sink.
         // because some per-instance data structures are now shared among all sinks
         // due to sharing delta writers and load stream stubs.
-        RETURN_IF_ERROR(_close_wait(false));
+        RETURN_IF_ERROR(_close_wait(_non_incremental_streams()));
 
         // send CLOSE_LOAD on all incremental streams if this is the last sink.
         // this must happen after all non-incremental streams are closed,
@@ -676,7 +676,7 @@ Status VTabletWriterV2::close(Status exec_status) {
         }
 
         // close_wait on all incremental streams, even if this is not the last sink.
-        RETURN_IF_ERROR(_close_wait(true));
+        RETURN_IF_ERROR(_close_wait(_incremental_streams()));
 
         // calculate and submit commit info
         if (is_last_sink) {
@@ -721,32 +721,87 @@ Status VTabletWriterV2::close(Status exec_status) {
     return status;
 }
 
-Status VTabletWriterV2::_close_wait(bool incremental) {
+std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_incremental_streams() {
+    std::unordered_set<std::shared_ptr<LoadStreamStub>> incremental_streams;
+    auto streams_for_node = _load_stream_map->get_streams_for_node();
+    for (const auto& [dst_id, streams] : streams_for_node) {
+        for (const auto& stream : streams->streams()) {
+            if (stream->is_incremental()) {
+                incremental_streams.insert(stream);
+            }
+        }
+    }
+    return incremental_streams;
+}
+
+std::unordered_set<std::shared_ptr<LoadStreamStub>> VTabletWriterV2::_non_incremental_streams() {
+    std::unordered_set<std::shared_ptr<LoadStreamStub>> non_incremental_streams;
+    auto streams_for_node = _load_stream_map->get_streams_for_node();
+    for (const auto& [dst_id, streams] : streams_for_node) {
+        for (const auto& stream : streams->streams()) {
+            if (!stream->is_incremental()) {
+                non_incremental_streams.insert(stream);
+            }
+        }
+    }
+    return non_incremental_streams;
+}
+
+Status VTabletWriterV2::_close_wait(
+        std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams) {
     SCOPED_TIMER(_close_load_timer);
-    auto st = _load_stream_map->for_each_st(
-            [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status {
-                if (streams.is_incremental() != incremental) {
-                    return Status::OK();
-                }
-                int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
-                                    _timeout_watch.elapsed_time() / 1000 / 1000;
-                DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
-                if (remain_ms <= 0) {
-                    LOG(WARNING) << "load timed out before close waiting, load_id="
-                                 << print_id(_load_id);
-                    return Status::TimedOut("load timed out before close waiting");
-                }
-                auto st = streams.close_wait(_state, remain_ms);
-                if (!st.ok()) {
-                    LOG(WARNING) << "close_wait timeout on streams to dst_id=" << dst_id
-                                 << ", load_id=" << print_id(_load_id) << ": " << st;
-                }
-                return st;
-            });
-    if (!st.ok()) {
-        LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id);
+    Status status;
+    auto streams_for_node = _load_stream_map->get_streams_for_node();
+    while (true) {
+        RETURN_IF_ERROR(_check_timeout());
+        RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
+        if (!status.ok() || unfinished_streams.empty()) {
+            LOG(INFO) << "is all unfinished: " << unfinished_streams.empty()
+                      << ", status: " << status << ", txn_id: " << _txn_id
+                      << ", load_id: " << print_id(_load_id);
+            break;
+        }
+        bthread_usleep(1000 * 10);
     }
-    return st;
+    if (!status.ok()) {
+        LOG(WARNING) << "close_wait failed: " << status << ", load_id=" << print_id(_load_id);
+    }
+    return status;
+}
+
+Status VTabletWriterV2::_check_timeout() {
+    int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+                        _timeout_watch.elapsed_time() / 1000 / 1000;
+    DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; });
+    if (remain_ms <= 0) {
+        LOG(WARNING) << "load timed out before close waiting, load_id=" << print_id(_load_id);
+        return Status::TimedOut("load timed out before close waiting");
+    }
+    return Status::OK();
+}
+
+Status VTabletWriterV2::_check_streams_finish(
+        std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams, Status& status,
+        const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node) {
+    for (const auto& [dst_id, streams] : streams_for_node) {
+        for (const auto& stream : streams->streams()) {
+            if (!unfinished_streams.contains(stream)) {
+                continue;
+            }
+            bool is_closed = false;
+            auto stream_st = stream->close_finish_check(_state, &is_closed);
+            if (!stream_st.ok()) {
+                status = stream_st;
+                unfinished_streams.erase(stream);
+                LOG(WARNING) << "close_wait failed: " << stream_st
+                             << ", load_id=" << print_id(_load_id);
+            }
+            if (is_closed) {
+                unfinished_streams.erase(stream);
+            }
+        }
+    }
+    return status;
 }
 
 void VTabletWriterV2::_calc_tablets_to_commit() {
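With these helpers in place, `_close_wait` no longer blocks on each backend in turn: it takes the whole set of streams, polls every one through `close_finish_check`, and sleeps about 10 ms between rounds while re-checking the load timeout. A condensed sketch of that polling loop with simplified stand-ins (`Stream`, a deadline in place of `_timeout_watch`, `std::this_thread::sleep_for` in place of `bthread_usleep`), not the actual Doris classes:

```cpp
#include <chrono>
#include <memory>
#include <thread>
#include <unordered_set>

// Stand-in for LoadStreamStub, illustration only.
struct Stream {
    // Mirrors close_finish_check(): returns false on error, otherwise
    // reports through *is_closed whether the stream finished closing.
    bool close_finish_check(bool* is_closed) {
        *is_closed = true;  // a real stub would consult its own state here
        return true;
    }
};

bool wait_all_closed(std::unordered_set<std::shared_ptr<Stream>> unfinished,
                     std::chrono::steady_clock::time_point deadline) {
    while (!unfinished.empty()) {
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;  // load timed out before every stream closed
        }
        // One polling round: drop streams that are done or have failed.
        for (auto it = unfinished.begin(); it != unfinished.end();) {
            bool is_closed = false;
            if (!(*it)->close_finish_check(&is_closed) || is_closed) {
                it = unfinished.erase(it);
            } else {
                ++it;
            }
        }
        // Brief pause between rounds, like the bthread_usleep(1000 * 10) above.
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return true;  // every stream reported closed
}
```

Because all streams are checked in the same loop, one slow or stuck backend no longer serializes the close of the others, and cancellation or timeout is observed between rounds rather than inside a per-stream wait.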
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 51f647b07e5..10e3c6378cb 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -148,7 +148,17 @@ private:
 
     void _calc_tablets_to_commit();
 
-    Status _close_wait(bool incremental);
+    std::unordered_set<std::shared_ptr<LoadStreamStub>> _incremental_streams();
+
+    std::unordered_set<std::shared_ptr<LoadStreamStub>> _non_incremental_streams();
+
+    Status _close_wait(std::unordered_set<std::shared_ptr<LoadStreamStub>> unfinished_streams);
+
+    Status _check_timeout();
+
+    Status _check_streams_finish(
+            std::unordered_set<std::shared_ptr<LoadStreamStub>>& unfinished_streams, Status& status,
+            const std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>& streams_for_node);
 
     void _cancel(Status status);
 
diff --git a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
index 30854cfb50b..1a473d90e52 100644
--- a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
@@ -68,7 +68,7 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
         file "baseall.txt"
     }
 
-    def load_with_injection = { injection, error_msg, success=false->
+    def load_with_injection = { injection, error_msg="", success=false->
        try {
             GetDebugPoint().enableDebugPointForAllBEs(injection)
             sql "insert into test select * from baseall where k1 <= 3"
@@ -104,6 +104,21 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
         // DeltaWriterV2 stream_size is 0
         load_with_injection("DeltaWriterV2.init.stream_size", "failed to find tablet schema")
 
+        // injection cases for VTabletWriterV2 close logic
+        // Test LoadStreamStub close_finish_check close failed
+        load_with_injection("LoadStreamStub::close_finish_check.close_failed")
+        // Test LoadStreamStub _check_cancel when cancelled
+        load_with_injection("LoadStreamStub._check_cancel.cancelled")
+        // Test LoadStreamStub _send_with_retry stream write failed
+        load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed")
+        // Test LoadStreamStub _handle_failure for different opcodes
+        load_with_injection("LoadStreamStub._handle_failure.append_data_failed")
+        load_with_injection("LoadStreamStub._handle_failure.add_segment_failed")
+        load_with_injection("LoadStreamStub._handle_failure.close_load_failed")
+        load_with_injection("LoadStreamStub._handle_failure.get_schema_failed")
+        // Test LoadStreamStub skip send segment
+        load_with_injection("LoadStreamStub.skip_send_segment")
+
         sql """ set enable_memtable_on_sink_node=false """
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org