This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 00ec5875d21 [fix](load) add lock for
runtime_state->tablet_commit_infos (#48709)
00ec5875d21 is described below
commit 00ec5875d213c7f97876d2a54482aad8201fa606
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Mar 7 22:45:22 2025 +0800
[fix](load) add lock for runtime_state->tablet_commit_infos (#48709)
Lock before reading/writing the tablet_commit_infos vector, to fix a
use-after-free.
---
be/src/io/fs/multi_table_pipe.cpp | 4 ++--
be/src/runtime/fragment_mgr.cpp | 27 ++++++++--------------
be/src/runtime/runtime_state.h | 27 +++++++++++++++++-----
.../runtime/stream_load/stream_load_executor.cpp | 2 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 21 +++++++++--------
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 5 +---
6 files changed, 45 insertions(+), 41 deletions(-)
diff --git a/be/src/io/fs/multi_table_pipe.cpp
b/be/src/io/fs/multi_table_pipe.cpp
index b3af2531f15..eb601c5d6f5 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -263,9 +263,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
std::vector<ExecParam> para
{
std::lock_guard<std::mutex>
l(_tablet_commit_infos_lock);
+ auto commit_infos = state->tablet_commit_infos();
_tablet_commit_infos.insert(_tablet_commit_infos.end(),
-
state->tablet_commit_infos().begin(),
-
state->tablet_commit_infos().end());
+ commit_infos.begin(),
commit_infos.end());
}
_number_total_rows += state->num_rows_load_total();
_number_loaded_rows += state->num_rows_load_success();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 4d795df55f9..6b9b02438a3 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -492,35 +492,26 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
}
}
}
- if (!req.runtime_state->tablet_commit_infos().empty()) {
+ if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
params.__isset.commitInfos = true;
-
params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size());
- for (auto& info : req.runtime_state->tablet_commit_infos()) {
- params.commitInfos.push_back(info);
- }
+ params.commitInfos.insert(params.commitInfos.end(), tci.begin(),
tci.end());
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
- if (!rs->tablet_commit_infos().empty()) {
+ if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
params.__isset.commitInfos = true;
- params.commitInfos.insert(params.commitInfos.end(),
- rs->tablet_commit_infos().begin(),
- rs->tablet_commit_infos().end());
+ params.commitInfos.insert(params.commitInfos.end(),
rs_tci.begin(), rs_tci.end());
}
}
}
- if (!req.runtime_state->error_tablet_infos().empty()) {
+ if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
params.__isset.errorTabletInfos = true;
-
params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size());
- for (auto& info : req.runtime_state->error_tablet_infos()) {
- params.errorTabletInfos.push_back(info);
- }
+ params.errorTabletInfos.insert(params.errorTabletInfos.end(),
eti.begin(), eti.end());
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
- if (!rs->error_tablet_infos().empty()) {
+ if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
params.__isset.errorTabletInfos = true;
- params.errorTabletInfos.insert(params.errorTabletInfos.end(),
-
rs->error_tablet_infos().begin(),
- rs->error_tablet_infos().end());
+ params.errorTabletInfos.insert(params.errorTabletInfos.end(),
rs_eti.begin(),
+ rs_eti.end());
}
}
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index aa486d3b8b6..7d1f0bc9faf 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -424,19 +424,33 @@ public:
bool enable_page_cache() const;
- const std::vector<TTabletCommitInfo>& tablet_commit_infos() const {
+ std::vector<TTabletCommitInfo> tablet_commit_infos() const {
+ std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
return _tablet_commit_infos;
}
- std::vector<TTabletCommitInfo>& tablet_commit_infos() { return
_tablet_commit_infos; }
+ void add_tablet_commit_infos(std::vector<TTabletCommitInfo>& commit_infos)
{
+ std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
+ _tablet_commit_infos.insert(_tablet_commit_infos.end(),
+
std::make_move_iterator(commit_infos.begin()),
+
std::make_move_iterator(commit_infos.end()));
+ }
- std::vector<THivePartitionUpdate>& hive_partition_updates() { return
_hive_partition_updates; }
+ std::vector<TErrorTabletInfo> error_tablet_infos() const {
+ std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
+ return _error_tablet_infos;
+ }
- std::vector<TIcebergCommitData>& iceberg_commit_datas() { return
_iceberg_commit_datas; }
+ void add_error_tablet_infos(std::vector<TErrorTabletInfo>& tablet_infos) {
+ std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
+ _error_tablet_infos.insert(_error_tablet_infos.end(),
+
std::make_move_iterator(tablet_infos.begin()),
+
std::make_move_iterator(tablet_infos.end()));
+ }
- const std::vector<TErrorTabletInfo>& error_tablet_infos() const { return
_error_tablet_infos; }
+ std::vector<THivePartitionUpdate>& hive_partition_updates() { return
_hive_partition_updates; }
- std::vector<TErrorTabletInfo>& error_tablet_infos() { return
_error_tablet_infos; }
+ std::vector<TIcebergCommitData>& iceberg_commit_datas() { return
_iceberg_commit_datas; }
// local runtime filter mgr, the runtime filter do not have remote target
or
// not need local merge should regist here. the instance exec finish, the
local
@@ -752,6 +766,7 @@ private:
int64_t _error_row_number;
std::string _error_log_file_path;
std::unique_ptr<std::ofstream> _error_log_file; // error file path,
absolute path
+ mutable std::mutex _tablet_infos_mutex;
std::vector<TTabletCommitInfo> _tablet_commit_infos;
std::vector<TErrorTabletInfo> _error_tablet_infos;
int _max_operator_id = 0;
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp
b/be/src/runtime/stream_load/stream_load_executor.cpp
index e14378bb1fd..48682a21677 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -78,7 +78,7 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
ctx->txn_id = state->wal_id();
}
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
- ctx->commit_infos = std::move(state->tablet_commit_infos());
+ ctx->commit_infos = state->tablet_commit_infos();
ctx->number_total_rows = state->num_rows_load_total();
ctx->number_loaded_rows = state->num_rows_load_success();
ctx->number_filtered_rows = state->num_rows_load_filtered();
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index e04ef160726..7f66308839c 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -195,15 +195,18 @@ Status IndexChannel::check_intolerable_failure() {
}
void IndexChannel::set_error_tablet_in_state(RuntimeState* state) {
- std::vector<TErrorTabletInfo>& error_tablet_infos =
state->error_tablet_infos();
+ std::vector<TErrorTabletInfo> error_tablet_infos;
- std::lock_guard<doris::SpinLock> l(_fail_lock);
- for (const auto& it : _failed_channels_msgs) {
- TErrorTabletInfo error_info;
- error_info.__set_tabletId(it.first);
- error_info.__set_msg(it.second);
- error_tablet_infos.emplace_back(error_info);
+ {
+ std::lock_guard<doris::SpinLock> l(_fail_lock);
+ for (const auto& it : _failed_channels_msgs) {
+ TErrorTabletInfo error_info;
+ error_info.__set_tabletId(it.first);
+ error_info.__set_msg(it.second);
+ error_tablet_infos.emplace_back(error_info);
+ }
}
+ state->add_error_tablet_infos(error_tablet_infos);
}
void IndexChannel::set_tablets_received_rows(
@@ -968,9 +971,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
if (_add_batches_finished) {
_close_check();
- state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
-
std::make_move_iterator(_tablet_commit_infos.begin()),
-
std::make_move_iterator(_tablet_commit_infos.end()));
+ _state->add_tablet_commit_infos(_tablet_commit_infos);
_index_channel->set_error_tablet_in_state(state);
_index_channel->set_tablets_received_rows(_tablets_received_rows,
_node_id);
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index b3baf89867f..8652127d88f 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -682,10 +682,7 @@ Status VTabletWriterV2::close(Status exec_status) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
RETURN_IF_ERROR(
_create_commit_info(tablet_commit_infos, _load_stream_map,
_num_replicas));
- _state->tablet_commit_infos().insert(
- _state->tablet_commit_infos().end(),
- std::make_move_iterator(tablet_commit_infos.begin()),
- std::make_move_iterator(tablet_commit_infos.end()));
+ _state->add_tablet_commit_infos(tablet_commit_infos);
}
// _number_input_rows don't contain num_rows_load_filtered and
num_rows_load_unselected in scan node
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]