This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new fc486abcc26 branch-4.0: [feat](storage) introduce backpressure
algorithm to control version number (part II) (#60866)
fc486abcc26 is described below
commit fc486abcc26c46d3407fee0eddc52f354a52f385
Author: hui lai <[email protected]>
AuthorDate: Mon Mar 9 10:20:25 2026 +0800
branch-4.0: [feat](storage) introduce backpressure algorithm to control
version number (part II) (#60866)
pick
https://github.com/apache/doris/pull/57654
https://github.com/apache/doris/pull/59973
https://github.com/apache/doris/pull/60688
---
be/src/olap/delta_writer.cpp | 10 +++-
be/src/olap/delta_writer.h | 4 ++
be/src/runtime/load_stream.cpp | 58 +++++++++++++++++++++-
be/src/runtime/load_stream.h | 6 +++
be/src/service/internal_service.cpp | 10 ++++
be/src/vec/sink/load_stream_stub.cpp | 45 +++++++++++++++++
be/src/vec/sink/load_stream_stub.h | 10 ++++
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 22 ++++++++
be/src/vec/sink/writer/vtablet_writer_v2.h | 3 ++
gensrc/proto/internal_service.proto | 2 +
.../test_load_back_pressure_version.groovy | 23 +++++----
11 files changed, 181 insertions(+), 12 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 2098711a9e0..2bebaa61295 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -98,9 +98,9 @@ BaseDeltaWriter::~BaseDeltaWriter() {
}
}
-void BaseDeltaWriter::set_tablet_load_rowset_num_info(
+void BaseDeltaWriter::collect_tablet_load_rowset_num_info(
+ BaseTablet* tablet,
google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>*
tablet_infos) {
- auto* tablet = _rowset_builder->tablet().get();
if (tablet == nullptr) {
return;
}
@@ -114,6 +114,12 @@ void BaseDeltaWriter::set_tablet_load_rowset_num_info(
}
}
+void BaseDeltaWriter::set_tablet_load_rowset_num_info( // Instance-level wrapper around the static collect_tablet_load_rowset_num_info().
+ google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>*
tablet_infos) {
+ auto* tablet = _rowset_builder->tablet().get(); // tablet obtained from this writer's rowset builder
+ collect_tablet_load_rowset_num_info(tablet, tablet_infos); // the helper returns early when tablet is nullptr
+}
+
DeltaWriter::~DeltaWriter() = default;
Status BaseDeltaWriter::init() {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 1795d834556..d906d5cfe6f 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -93,6 +93,10 @@ public:
int64_t num_rows_filtered() const;
+ static void collect_tablet_load_rowset_num_info(
+ BaseTablet* tablet,
+ google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>*
tablet_infos);
+
void set_tablet_load_rowset_num_info(
google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>*
tablet_info);
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 2e272e25c70..4f2d041eb63 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -34,6 +34,7 @@
#include "cloud/config.h"
#include "common/signal_handler.h"
#include "exec/tablet_info.h"
+#include "olap/delta_writer.h"
#include "olap/tablet.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_schema.h"
@@ -399,6 +400,13 @@ void
IndexStream::_init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int6
}
}
+void IndexStream::get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids) { // Appends every key of _tablet_streams_map to *tablet_ids; existing elements are kept.
+ std::lock_guard lock_guard(_lock); // hold _lock while the map is iterated
+ for (const auto& [tablet_id, _] : _tablet_streams_map) { // only the key is used; the mapped stream is ignored
+ tablet_ids->push_back(tablet_id);
+ }
+}
+
void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids,
FailedTablets* failed_tablets) {
std::lock_guard lock_guard(_lock);
@@ -589,6 +597,43 @@ void LoadStream::_report_schema(StreamId stream, const
PStreamHeader& hdr) {
}
}
+void LoadStream::_report_tablet_load_info(StreamId stream, int64_t index_id) { // Writes a PLoadStreamResponse carrying tablet load rowset info for index_id to `stream`.
+ std::vector<int64_t> write_tablet_ids;
+ auto it = _index_streams_map.find(index_id);
+ if (it != _index_streams_map.end()) { // an unknown index_id leaves the id list empty, so nothing is sent
+ it->second->get_all_write_tablet_ids(&write_tablet_ids);
+ }
+
+ if (!write_tablet_ids.empty()) {
+ butil::IOBuf buf;
+ PLoadStreamResponse response; // only tablet_load_rowset_num_infos is filled in here
+ auto* tablet_load_infos =
response.mutable_tablet_load_rowset_num_infos();
+ _collect_tablet_load_info_from_tablets(write_tablet_ids,
tablet_load_infos);
+ if (tablet_load_infos->empty()) { // no entry was collected: skip the stream write entirely
+ return;
+ }
+ buf.append(response.SerializeAsString());
+ auto wst = _write_stream(stream, buf);
+ if (!wst.ok()) { // a failed write is only logged; no status is returned to the caller
+ LOG(WARNING) << "report tablet load info failed with " << wst <<
", " << *this;
+ }
+ }
+}
+
+void LoadStream::_collect_tablet_load_info_from_tablets( // Resolves each tablet id and passes the tablet to BaseDeltaWriter::collect_tablet_load_rowset_num_info().
+ const std::vector<int64_t>& tablet_ids,
+ google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>*
tablet_load_infos) {
+ for (auto tablet_id : tablet_ids) {
+ BaseTabletSPtr tablet;
+ if (auto res = ExecEnv::get_tablet(tablet_id); res.has_value()) {
+ tablet = std::move(res).value();
+ } else { // lookup failed: this tablet is skipped without logging or reporting an error
+ continue;
+ }
+ BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(), // whether an entry is added is decided inside the helper
tablet_load_infos);
+ }
+}
+
Status LoadStream::_write_stream(StreamId stream, butil::IOBuf& buf) {
for (;;) {
int ret = 0;
@@ -699,7 +744,18 @@ void LoadStream::_dispatch(StreamId id, const
PStreamHeader& hdr, butil::IOBuf*
}
switch (hdr.opcode()) {
- case PStreamHeader::ADD_SEGMENT: // ADD_SEGMENT will be dispatched inside
TabletStream
+ case PStreamHeader::ADD_SEGMENT: {
+ auto st = _append_data(hdr, data);
+ if (!st.ok()) {
+ _report_failure(id, st, hdr);
+ } else {
+ // Report tablet load info only on ADD_SEGMENT to reduce frequency.
+ // ADD_SEGMENT is sent once per segment, while APPEND_DATA is sent
+ // for every data batch. This reduces unnecessary writes and avoids
+ // potential stream write failures when the sender is closing.
+ _report_tablet_load_info(id, hdr.index_id());
+ }
+ } break;
case PStreamHeader::APPEND_DATA: {
auto st = _append_data(hdr, data);
if (!st.ok()) {
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index be147f9415c..caf67815be9 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -99,6 +99,8 @@ public:
void close(const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids, FailedTablets*
failed_tablet_ids);
+ void get_all_write_tablet_ids(std::vector<int64_t>* tablet_ids);
+
private:
void _init_tablet_stream(TabletStreamSharedPtr& tablet_stream, int64_t
tablet_id,
int64_t partition_id);
@@ -154,6 +156,10 @@ private:
const std::vector<int64_t>& success_tablet_ids,
const FailedTablets& failed_tablets, bool eos);
void _report_schema(StreamId stream, const PStreamHeader& hdr);
+ void _report_tablet_load_info(StreamId stream, int64_t index_id);
+ void _collect_tablet_load_info_from_tablets(
+ const std::vector<int64_t>& tablet_ids,
+ google::protobuf::RepeatedPtrField<PTabletLoadRowsetInfo>*
tablet_load_infos);
// report failure for one message
void _report_failure(StreamId stream, const Status& status, const
PStreamHeader& header) {
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 23490e97998..bbd86d9c56c 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -68,6 +68,7 @@
#include "io/fs/stream_load_pipe.h"
#include "io/io_common.h"
#include "olap/data_dir.h"
+#include "olap/delta_writer.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
@@ -415,6 +416,7 @@ void
PInternalService::open_load_stream(google::protobuf::RpcController* control
LOG(INFO) << "open load stream, load_id=" << request->load_id()
<< ", src_id=" << request->src_id();
+ std::vector<BaseTabletSPtr> tablets;
for (const auto& req : request->tablets()) {
BaseTabletSPtr tablet;
if (auto res = ExecEnv::get_tablet(req.tablet_id());
!res.has_value()) [[unlikely]] {
@@ -429,6 +431,14 @@ void
PInternalService::open_load_stream(google::protobuf::RpcController* control
resp->set_index_id(req.index_id());
resp->set_enable_unique_key_merge_on_write(tablet->enable_unique_key_merge_on_write());
tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
+ tablets.push_back(tablet);
+ }
+ if (!tablets.empty()) {
+ auto* tablet_load_infos =
response->mutable_tablet_load_rowset_num_infos();
+ for (const auto& tablet : tablets) {
+
BaseDeltaWriter::collect_tablet_load_rowset_num_info(tablet.get(),
+
tablet_load_infos);
+ }
}
LoadStream* load_stream = nullptr;
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 85f30351362..faf2b0c1776 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -88,6 +88,10 @@ int
LoadStreamReplyHandler::on_received_messages(brpc::StreamId id, butil::IOBuf
ss << ", status: " << st;
LOG(INFO) << ss.str();
+ if (response.tablet_load_rowset_num_infos_size() > 0) {
+
stub->_refresh_back_pressure_version_wait_time(response.tablet_load_rowset_num_infos());
+ }
+
if (response.has_load_stream_profile()) {
TRuntimeProfileTree tprofile;
const uint8_t* buf =
@@ -196,6 +200,9 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
_enable_unique_mow_for_index->emplace(resp.index_id(),
resp.enable_unique_key_merge_on_write());
}
+ if (response.tablet_load_rowset_num_infos_size() > 0) {
+
_refresh_back_pressure_version_wait_time(response.tablet_load_rowset_num_infos());
+ }
if (cntl.Failed()) {
brpc::StreamClose(_stream_id);
_status = Status::InternalError("Failed to connect to backend {}: {}",
_dst_id,
@@ -503,6 +510,44 @@ Status LoadStreamStub::_send_with_retry(butil::IOBuf& buf)
{
}
}
+// Derives a back-pressure wait time from the per-tablet rowset counts in
+// `tablet_load_infos`. When at least one tablet is above its high load point, the
+// wait time is stored in _load_back_pressure_version_wait_time_ms (overwriting any
+// previous value) and logged; otherwise the stored value is left untouched.
+void LoadStreamStub::_refresh_back_pressure_version_wait_time(
+        const ::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
+                tablet_load_infos) {
+    int64_t max_rowset_num_gap = 0;
+    // If any one tablet is under high load pressure, we make the whole procedure
+    // sleep to prevent the corresponding BE from returning -235.
+    std::for_each(
+            tablet_load_infos.begin(), tablet_load_infos.end(),
+            [&max_rowset_num_gap](auto& load_info) {
+                int64_t cur_rowset_num = load_info.current_rowset_nums();
+                // Multiply before dividing. Evaluating `threshold / 100` first truncates
+                // to 0 when the threshold is an integer below 100, which would put the
+                // high load point at 0 and make the gap the full rowset count.
+                int64_t high_load_point = load_info.max_config_rowset_nums() *
+                                          config::load_back_pressure_version_threshold / 100;
+                DCHECK(cur_rowset_num > high_load_point);
+                max_rowset_num_gap = std::max(max_rowset_num_gap, cur_rowset_num - high_load_point);
+            });
+    // To slow down the high load pressure we use the rowset num gap to calculate
+    // one sleep time. For example:
+    // if the max tablet version is 2000, there are 3 BE
+    // A: ==================== 1800
+    // B: =================== 1700
+    // C: ================== 1600
+    //    ================== 1600
+    //                      ^
+    //                      the high load point
+    // then the max gap is
+    // 1800 - (max tablet version * config::load_back_pressure_version_threshold / 100) = 200,
+    // and we make the whole send procedure sleep 1200ms (gap + 1000) so that
+    // compaction can run and reduce the pressure. The sleep time is capped by
+    // config::max_load_back_pressure_version_wait_time_ms.
+    auto max_time = config::max_load_back_pressure_version_wait_time_ms;
+    if (UNLIKELY(max_rowset_num_gap > 0)) {
+        _load_back_pressure_version_wait_time_ms.store(
+                std::min(max_rowset_num_gap + 1000, max_time));
+        LOG(INFO) << "try to back pressure version, wait time(ms): "
+                  << _load_back_pressure_version_wait_time_ms << ", load id: " << print_id(_load_id)
+                  << ", max_rowset_num_gap: " << max_rowset_num_gap;
+    }
+}
+
+
std::string LoadStreamStub::to_string() {
std::ostringstream ss;
ss << *this;
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 15e51cdd8e6..b97e7e801b6 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -236,6 +236,14 @@ public:
_cancel_st.to_string_no_stack());
}
+ int64_t get_and_reset_load_back_pressure_version_wait_time_ms() { // Returns the pending back-pressure wait time and clears it.
+ return _load_back_pressure_version_wait_time_ms.exchange(0); // single atomic exchange: each stored value is handed out once, later calls see 0 until it is set again
+ }
+
+ void _refresh_back_pressure_version_wait_time(
+ const
::google::protobuf::RepeatedPtrField<::doris::PTabletLoadRowsetInfo>&
+ tablet_load_infos);
+
private:
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data
= {});
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
@@ -278,6 +286,8 @@ protected:
bthread::Mutex _write_mutex;
size_t _bytes_written = 0;
+
+ std::atomic<int64_t> _load_back_pressure_version_wait_time_ms {0};
};
// a collection of LoadStreams connect to the same node
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 898bb10c437..2017099a4ef 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -57,6 +57,8 @@
namespace doris::vectorized {
#include "common/compile_check_begin.h"
+extern bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms;
+
VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const
VExprContextSPtrs& output_exprs,
std::shared_ptr<pipeline::Dependency> dep,
std::shared_ptr<pipeline::Dependency> fin_dep)
@@ -243,6 +245,8 @@ Status VTabletWriterV2::_init(RuntimeState* state,
RuntimeProfile* profile) {
_close_timer = ADD_TIMER(_operator_profile, "CloseWaitTime");
_close_writer_timer = ADD_CHILD_TIMER(_operator_profile,
"CloseWriterTime", "CloseWaitTime");
_close_load_timer = ADD_CHILD_TIMER(_operator_profile, "CloseLoadTime",
"CloseWaitTime");
+ _load_back_pressure_version_time_ms =
+ ADD_TIMER(_operator_profile, "LoadBackPressureVersionTimeMs");
if (config::share_delta_writers) {
_delta_writer_for_tablet =
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
@@ -467,6 +471,21 @@ Status VTabletWriterV2::write(RuntimeState* state, Block&
input_block) {
return status;
}
+ int64_t total_wait_time_ms = 0;
+ auto streams_for_node = _load_stream_map->get_streams_for_node();
+ for (const auto& [dst_id, streams] : streams_for_node) {
+ for (const auto& stream : streams->streams()) {
+ auto wait_time_ms =
stream->get_and_reset_load_back_pressure_version_wait_time_ms();
+ if (wait_time_ms > 0) {
+ total_wait_time_ms = std::max(total_wait_time_ms,
wait_time_ms);
+ }
+ }
+ }
+ if (UNLIKELY(total_wait_time_ms > 0)) {
+
std::this_thread::sleep_for(std::chrono::milliseconds(total_wait_time_ms));
+ _load_back_pressure_version_block_ms.fetch_add(total_wait_time_ms);
+ }
+
// check out of limit
RETURN_IF_ERROR(_send_new_partition_batch());
@@ -641,6 +660,9 @@ Status VTabletWriterV2::close(Status exec_status) {
COUNTER_SET(_send_data_timer, _send_data_ns);
COUNTER_SET(_row_distribution_timer,
(int64_t)_row_distribution_watch.elapsed_time());
COUNTER_SET(_validate_data_timer,
_block_convertor->validate_data_ns());
+ auto back_pressure_time_ms =
_load_back_pressure_version_block_ms.load();
+ COUNTER_SET(_load_back_pressure_version_time_ms,
back_pressure_time_ms);
+ g_sink_load_back_pressure_version_time_ms << back_pressure_time_ms;
// close DeltaWriters
{
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index b1a522e1d37..e92084ebcd5 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -230,6 +230,7 @@ private:
RuntimeProfile::Counter* _close_writer_timer = nullptr;
RuntimeProfile::Counter* _close_load_timer = nullptr;
RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
+ RuntimeProfile::Counter* _load_back_pressure_version_time_ms = nullptr;
std::mutex _close_mutex;
bool _is_closed = false;
@@ -256,6 +257,8 @@ private:
// tablet_id -> <total replicas num, load required replicas num>
std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
+
+ std::atomic<int64_t> _load_back_pressure_version_block_ms {0};
};
} // namespace vectorized
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 33439dfe1d0..e047e5ca8c9 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -959,6 +959,7 @@ message PTabletSchemaWithIndex {
message POpenLoadStreamResponse {
optional PStatus status = 1;
repeated PTabletSchemaWithIndex tablet_schemas = 2;
+ repeated PTabletLoadRowsetInfo tablet_load_rowset_num_infos = 3;
}
message PFailedTablet {
@@ -973,6 +974,7 @@ message PLoadStreamResponse {
optional bytes load_stream_profile = 4;
repeated PTabletSchemaWithIndex tablet_schemas = 5;
optional bool eos = 6;
+ repeated PTabletLoadRowsetInfo tablet_load_rowset_num_infos = 7;
}
message PStreamHeader {
diff --git
a/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
b/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
index 48747266306..a65076f0445 100644
---
a/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
+++
b/regression-test/suites/fault_injection_p0/test_load_back_pressure_version.groovy
@@ -33,14 +33,19 @@ suite("test_load_back_pressure_version", "nonConcurrent") {
)
"""
- try {
- set_be_param("load_back_pressure_version_threshold", "0")
- sql "insert into ${testTable} values(1,1)"
- def res = sql "select * from ${testTable}"
- logger.info("res: " + res.size())
- assertTrue(res.size() == 1)
- } finally {
- set_be_param("load_back_pressure_version_threshold", "80")
- sql """ set enable_memtable_on_sink_node=true """
+ def test_load_back_pressure_version = { int targetRows -> // Inserts one row with the threshold at "0" and expects the table to hold targetRows rows afterwards.
+ try {
+ set_be_param("load_back_pressure_version_threshold", "0") // presumably forces the back-pressure path for any rowset count — confirm
+ sql "insert into ${testTable} values(1,1)"
+ def res = sql "select * from ${testTable}"
+ logger.info("res: " + res.size())
+ assertTrue(res.size() == targetRows) // rows accumulate across calls, so the caller passes the running total
+ } finally {
+ set_be_param("load_back_pressure_version_threshold", "80") // runs even when the assertion fails, so the BE is not left at "0"
+ }
+ }
+
+ test_load_back_pressure_version(1)
+ sql """ set enable_memtable_on_sink_node=true """
+ test_load_back_pressure_version(2)
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]