This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new fc486abcc26 branch-4.0: [feat](storage) introduce backpressure algorithm to control version number (part II) (#60866)
fc486abcc26 is described below

commit fc486abcc26c46d3407fee0eddc52f354a52f385
Author: hui lai <[email protected]>
AuthorDate: Mon Mar 9 10:20:25 2026 +0800

    branch-4.0: [feat](storage) introduce backpressure algorithm to control version number (part II) (#60866)
    
    pick
    https://github.com/apache/doris/pull/57654
    https://github.com/apache/doris/pull/59973
    https://github.com/apache/doris/pull/60688
---
 be/src/olap/delta_writer.cpp                       | 10 +++-
 be/src/olap/delta_writer.h                         |  4 ++
 be/src/runtime/load_stream.cpp                     | 58 +++++++++++++++++++++-
 be/src/runtime/load_stream.h                       |  6 +++
 be/src/service/internal_service.cpp                | 10 ++++
 be/src/vec/sink/load_stream_stub.cpp               | 45 +++++++++++++++++
 be/src/vec/sink/load_stream_stub.h                 | 10 ++++
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       | 22 ++++++++
 be/src/vec/sink/writer/vtablet_writer_v2.h         |  3 ++
 gensrc/proto/internal_service.proto                |  2 +
 .../test_load_back_pressure_version.groovy         | 23 +++++----
 11 files changed, 181 insertions(+), 12 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 2098711a9e0..2bebaa61295 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -98,9 +98,9 @@ BaseDeltaWriter::~BaseDeltaWriter() {
     }
 }
 
-void BaseDeltaWriter::set_tablet_load_rowset_num_info(
+void BaseDeltaWriter::collect_tablet_load_rowset_num_info(
+        BaseTablet* tablet,
         google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* 
tablet_infos) {
-    auto* tablet = _rowset_builder->tablet().get();
     if (tablet == nullptr) {
         return;
     }
@@ -114,6 +114,12 @@ void BaseDeltaWriter::set_tablet_load_rowset_num_info(
     }
 }
 
+void BaseDeltaWriter::set_tablet_load_rowset_num_info(
+        google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* 
tablet_infos) {
+    auto* tablet = _rowset_builder->tablet().get(); // may be null; the static helper returns early on null
+    collect_tablet_load_rowset_num_info(tablet, tablet_infos); // same static helper LoadStream/open_load_stream call with looked-up tablets
+}
+
 DeltaWriter::~DeltaWriter() = default;
 
 Status BaseDeltaWriter::init() {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 1795d834556..d906d5cfe6f 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -93,6 +93,10 @@ public:
 
     int64_t num_rows_filtered() const;
 
+    static void collect_tablet_load_rowset_num_info(
+            BaseTablet* tablet,
+            google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* 
tablet_infos);
+
     void set_tablet_load_rowset_num_info(
             google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* 
tablet_info);
 
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 2e272e25c70..4f2d041eb63 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -34,6 +34,7 @@
 #include "cloud/config.h"
 #include "common/signal_handler.h"
 #include "exec/tablet_info.h"
+#include "olap/delta_writer.h"
 #include "olap/tablet.h"
 #include "olap/tablet_fwd.h"
 #include "olap/tablet_schema.h"
@@ -399,6 +400,13 @@ void 
IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int6
     }
 }
 
+void IndexStream::get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids) {
+    std::lock_guard lock_guard(_lock); // hold _lock while iterating _tablet_streams_map
+    for (const auto& [tablet_id, _] : _tablet_streams_map) {
+        tablet_ids->push_back(tablet_id); // appends; *tablet_ids is not cleared first
+    }
+}
+
 void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
                         std::vector<int64_t>* success_tablet_ids, 
FailedTablets* failed_tablets) {
     std::lock_guard lock_guard(_lock);
@@ -589,6 +597,43 @@ void LoadStream::_report_schema(StreamId stream, const 
PStreamHeader& hdr) {
     }
 }
 
+void LoadStream::_report_tablet_load_info(StreamId stream, int64_t index_id) {
+    std::vector<int64_t> write_tablet_ids; // tablet ids taken from the index stream's tablet stream map
+    auto it = _index_streams_map.find(index_id); // NOTE(review): no lock is taken here; confirm _index_streams_map is not mutated concurrently
+    if (it != _index_streams_map.end()) {
+        it->second->get_all_write_tablet_ids(&write_tablet_ids);
+    }
+
+    if (!write_tablet_ids.empty()) {
+        butil::IOBuf buf;
+        PLoadStreamResponse response;
+        auto* tablet_load_infos = 
response.mutable_tablet_load_rowset_num_infos();
+        _collect_tablet_load_info_from_tablets(write_tablet_ids, 
tablet_load_infos);
+        if (tablet_load_infos->empty()) { // nothing collected: do not send an empty response
+            return;
+        }
+        buf.append(response.SerializeAsString());
+        auto wst = _write_stream(stream, buf);
+        if (!wst.ok()) { // best-effort: a failed report is only logged, not propagated
+            LOG(WARNING) << "report tablet load info failed with " << wst << 
", " << *this;
+        }
+    }
+}
+
+void LoadStream::_collect_tablet_load_info_from_tablets(
+        const std::vector<int64_t>& tablet_ids,
+        google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* 
tablet_load_infos) {
+    for (auto tablet_id : tablet_ids) { // results are appended to *tablet_load_infos
+        BaseTabletSPtr tablet;
+        if (auto res = ExecEnv::get_tablet(tablet_id); res.has_value()) {
+            tablet = std::move(res).value();
+        } else {
+            continue; // tablet lookup failed: skip this tablet silently
+        }
+        BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(), 
tablet_load_infos);
+    }
+}
+
 Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) {
     for (;;) {
         int ret = 0;
@@ -699,7 +744,18 @@ void LoadStream::_dispatch(StreamId id, const 
PStreamHeader& hdr, butil::IOBuf*
     }
 
     switch (hdr.opcode()) {
-    case PStreamHeader::ADD_SEGMENT: // ADD_SEGMENT will be dispatched inside 
TabletStream
+    case PStreamHeader::ADD_SEGMENT: {
+        auto st = _append_data(hdr, data);
+        if (!st.ok()) {
+            _report_failure(id, st, hdr);
+        } else {
+            // Report tablet load info only on ADD_SEGMENT to reduce frequency.
+            // ADD_SEGMENT is sent once per segment, while APPEND_DATA is sent
+            // for every data batch. This reduces unnecessary writes and avoids
+            // potential stream write failures when the sender is closing.
+            _report_tablet_load_info(id, hdr.index_id());
+        }
+    } break;
     case PStreamHeader::APPEND_DATA: {
         auto st = _append_data(hdr, data);
         if (!st.ok()) {
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index be147f9415c..caf67815be9 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -99,6 +99,8 @@ public:
     void close(const std::vector<PTabletID>& tablets_to_commit,
                std::vector<int64_t>* success_tablet_ids, FailedTablets* 
failed_tablet_ids);
 
+    void get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids);
+
 private:
     void _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t 
tablet_id,
                              int64_t partition_id);
@@ -154,6 +156,10 @@ private:
                         const std::vector<int64_t>& success_tablet_ids,
                         const FailedTablets& failed_tablets, bool eos);
     void _report_schema(StreamId stream, const PStreamHeader& hdr);
+    void _report_tablet_load_info(StreamId stream, int64_t index_id);
+    void _collect_tablet_load_info_from_tablets(
+            const std::vector<int64_t>& tablet_ids,
+            google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>* 
tablet_load_infos);
 
     // report failure for one message
     void _report_failure(StreamId stream, const Status& status, const 
PStreamHeader& header) {
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 23490e97998..bbd86d9c56c 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -68,6 +68,7 @@
 #include "io/fs/stream_load_pipe.h"
 #include "io/io_common.h"
 #include "olap/data_dir.h"
+#include "olap/delta_writer.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/beta_rowset.h"
@@ -415,6 +416,7 @@ void 
PInternalService::open_load_stream(google::protobuf::RpcController* control
         LOG(INFO) << "open load stream, load_id=" << request->load_id()
                   << ", src_id=" << request->src_id();
 
+        std::vector<BaseTabletSPtr> tablets;
         for (const auto& req : request->tablets()) {
             BaseTabletSPtr tablet;
             if (auto res = ExecEnv::get_tablet(req.tablet_id()); 
!res.has_value()) [[unlikely]] {
@@ -429,6 +431,14 @@ void 
PInternalService::open_load_stream(google::protobuf::RpcController* control
             resp->set_index_id(req.index_id());
             
resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write());
             
tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
+            tablets.push_back(tablet);
+        }
+        if (!tablets.empty()) {
+            auto* tablet_load_infos = 
response->mutable_tablet_load_rowset_num_infos();
+            for (const auto& tablet : tablets) {
+                
BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(),
+                                                                     
tablet_load_infos);
+            }
         }
 
         LoadStream* load_stream = nullptr;
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 85f30351362..faf2b0c1776 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -88,6 +88,10 @@ int 
LoadStreamReplyHandler::on_received_messages(brpc::StreamId id, butil::IOBuf
         ss << ", status: " << st;
         LOG(INFO) << ss.str();
 
+        if (response.tablet_load_rowset_num_infos_size() > 0) {
+            
stub->_refresh_back_pressure_version_wait_time(response.tablet_load_rowset_num_infos());
+        }
+
         if (response.has_load_stream_profile()) {
             TRuntimeProfileTree tprofile;
             const uint8_t* buf =
@@ -196,6 +200,9 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
         _enable_unique_mow_for_index->emplace(resp.index_id(),
                                               
resp.enable_unique_key_merge_on_write());
     }
+    if (response.tablet_load_rowset_num_infos_size() > 0) {
+        
_refresh_back_pressure_version_wait_time(response.tablet_load_rowset_num_infos());
+    }
     if (cntl.Failed()) {
         brpc::StreamClose(_stream_id);
         _status = Status::InternalError("Failed to connect to backend {}: {}", 
_dst_id,
@@ -503,6 +510,44 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf) 
{
     }
 }
 
+void LoadStreamStub::_refresh_back_pressure_version_wait_time(
+        const 
::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
+                tablet_load_infos) {
+    int64_t max_rowset_num_gap = 0;
+    // if any one tablet is under high load pressure, we would make the whole 
procedure
+    // sleep to prevent the corresponding BE from returning -235
+    std::for_each(
+            tablet_load_infos.begin(), tablet_load_infos.end(),
+            [&max_rowset_num_gap](auto& load_info) {
+                int64_t cur_rowset_num = load_info.current_rowset_nums();
+                int64_t high_load_point = load_info.max_config_rowset_nums() *
+                                          
(config::load_back_pressure_version_threshold / 100); // NOTE(review): if this config is an integer, "threshold / 100" truncates to 0 for any value < 100, so high_load_point is always 0; consider "max * threshold / 100" - TODO confirm config type
+                DCHECK(cur_rowset_num > high_load_point); // NOTE(review): assumes the BE only reports tablets above this point, computed the same way - confirm
+                max_rowset_num_gap = std::max(max_rowset_num_gap, 
cur_rowset_num - high_load_point);
+            });
+    // to slow down the high load pressure
+    // we would use the rowset num gap to calculate one sleep time
+    // for example:
+    // if the max tablet version is 2000, there are 3 BEs
+    // A: ====================  1800
+    // B: ===================   1700
+    // C: ==================    1600
+    //    ==================    1600
+    //                      ^
+    //                      the high load point
+    // then the max gap is 1800 - (max tablet version * 
config::load_back_pressure_version_threshold / 100) = 200,
+    // we would make the whole send procedure sleep
+    // 1200ms (gap + 1000, capped by max_load_back_pressure_version_wait_time_ms) for compaction to reduce the high pressure
+    auto max_time = config::max_load_back_pressure_version_wait_time_ms;
+    if (UNLIKELY(max_rowset_num_gap > 0)) {
+        _load_back_pressure_version_wait_time_ms.store(
+                std::min(max_rowset_num_gap + 1000, max_time));
+        LOG(INFO) << "try to back pressure version, wait time(ms): "
+                  << _load_back_pressure_version_wait_time_ms << ", load id: " 
<< print_id(_load_id)
+                  << ", max_rowset_num_gap: " << max_rowset_num_gap;
+    }
+}
+
 std::string LoadStreamStub::to_string() {
     std::ostringstream ss;
     ss << *this;
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 15e51cdd8e6..b97e7e801b6 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -236,6 +236,14 @@ public:
                                  _cancel_st.to_string_no_stack());
     }
 
+    int64_t get_and_reset_load_back_pressure_version_wait_time_ms() {
+        return _load_back_pressure_version_wait_time_ms.exchange(0); // atomically read the pending wait (ms) and reset it to 0
+    }
+
+    void _refresh_back_pressure_version_wait_time(
+            const 
::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
+                    tablet_load_infos);
+
 private:
     Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data 
= {});
     Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
@@ -278,6 +286,8 @@ protected:
 
     bthread::Mutex _write_mutex;
     size_t _bytes_written = 0;
+
+    std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
 };
 
 // a collection of LoadStreams connect to the same node
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 898bb10c437..2017099a4ef 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -57,6 +57,8 @@
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 
+extern bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms;
+
 VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs,
                                  std::shared_ptr<pipeline::Dependency> dep,
                                  std::shared_ptr<pipeline::Dependency> fin_dep)
@@ -243,6 +245,8 @@ Status VTabletWriterV2::_init(RuntimeState* state, 
RuntimeProfile* profile) {
     _close_timer = ADD_TIMER(_operator_profile, "CloseWaitTime");
     _close_writer_timer = ADD_CHILD_TIMER(_operator_profile, 
"CloseWriterTime", "CloseWaitTime");
     _close_load_timer = ADD_CHILD_TIMER(_operator_profile, "CloseLoadTime", 
"CloseWaitTime");
+    _load_back_pressure_version_time_ms =
+            ADD_TIMER(_operator_profile, "LoadBackPressureVersionTimeMs");
 
     if (config::share_delta_writers) {
         _delta_writer_for_tablet = 
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
@@ -467,6 +471,21 @@ Status VTabletWriterV2::write(RuntimeState* state, Block& 
input_block) {
         return status;
     }
 
+    int64_t total_wait_time_ms = 0;
+    auto streams_for_node = _load_stream_map->get_streams_for_node();
+    for (const auto& [dst_id, streams] : streams_for_node) {
+        for (const auto& stream : streams->streams()) {
+            auto wait_time_ms = 
stream->get_and_reset_load_back_pressure_version_wait_time_ms();
+            if (wait_time_ms > 0) {
+                total_wait_time_ms = std::max(total_wait_time_ms, 
wait_time_ms);
+            }
+        }
+    }
+    if (UNLIKELY(total_wait_time_ms > 0)) {
+        
std::this_thread::sleep_for(std::chrono::milliseconds(total_wait_time_ms));
+        _load_back_pressure_version_block_ms.fetch_add(total_wait_time_ms);
+    }
+
     // check out of limit
     RETURN_IF_ERROR(_send_new_partition_batch());
 
@@ -641,6 +660,9 @@ Status VTabletWriterV2::close(Status exec_status) {
         COUNTER_SET(_send_data_timer, _send_data_ns);
         COUNTER_SET(_row_distribution_timer, 
(int64_t)_row_distribution_watch.elapsed_time());
         COUNTER_SET(_validate_data_timer, 
_block_convertor->validate_data_ns());
+        auto back_pressure_time_ms = 
_load_back_pressure_version_block_ms.load();
+        COUNTER_SET(_load_back_pressure_version_time_ms, 
back_pressure_time_ms);
+        g_sink_load_back_pressure_version_time_ms << back_pressure_time_ms;
 
         // close DeltaWriters
         {
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index b1a522e1d37..e92084ebcd5 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -230,6 +230,7 @@ private:
     RuntimeProfile::Counter* _close_writer_timer = nullptr;
     RuntimeProfile::Counter* _close_load_timer = nullptr;
     RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
+    RuntimeProfile::Counter* _load_back_pressure_version_time_ms = nullptr;
 
     std::mutex _close_mutex;
     bool _is_closed = false;
@@ -256,6 +257,8 @@ private:
 
     // tablet_id -> <total replicas num, load required replicas num>
     std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
+
+    std::atomic<int64_t> _load_back_pressure_version_block_ms {0};
 };
 
 } // namespace vectorized
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 33439dfe1d0..e047e5ca8c9 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -959,6 +959,7 @@ message PTabletSchemaWithIndex {
 message POpenLoadStreamResponse {
     optional PStatus status = 1;
     repeated PTabletSchemaWithIndex tablet_schemas = 2;
+    repeated PTabletLoadRowsetInfo tablet_load_rowset_num_infos = 3; // per-tablet rowset-count info collected in open_load_stream
 }
 
 message PFailedTablet {
@@ -973,6 +974,7 @@ message PLoadStreamResponse {
     optional bytes load_stream_profile = 4;
     repeated PTabletSchemaWithIndex tablet_schemas = 5;
     optional bool eos = 6;
+    repeated PTabletLoadRowsetInfo tablet_load_rowset_num_infos = 7;
 }
 
 message PStreamHeader {
diff --git 
a/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
 
b/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
index 48747266306..a65076f0445 100644
--- 
a/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
@@ -33,14 +33,19 @@ suite("test_load_back_pressure_version", "nonConcurrent") {
         )
     """
 
-    try {
-        set_be_param("load_back_pressure_version_threshold", "0")
-        sql "insert into ${testTable} values(1,1)"
-        def res = sql "select * from ${testTable}"
-        logger.info("res: " + res.size())
-        assertTrue(res.size() == 1)
-    } finally {
-        set_be_param("load_back_pressure_version_threshold", "80")
-        sql """ set enable_memtable_on_sink_node=true """
+    def test_load_back_pressure_version = { int targetRows -> // targetRows: expected total row count after this call's insert
+        try {
+            set_be_param("load_back_pressure_version_threshold", "0") // presumably forces the back-pressure path on every load
+            sql "insert into ${testTable} values(1,1)"
+            def res = sql "select * from ${testTable}"
+            logger.info("res: " + res.size())
+            assertTrue(res.size() == targetRows)
+        } finally {
+            set_be_param("load_back_pressure_version_threshold", "80") // restore threshold to 80
+        }
     }
+
+    test_load_back_pressure_version(1)
+    sql """ set enable_memtable_on_sink_node=true """ // rerun with memtable-on-sink-node enabled
+    test_load_back_pressure_version(2)
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to