This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a512edc7e2c [fix](multi-catalog) Fix multi-thread issue in
hive/iceberg writer commit meta-info to fe. (#49920)
a512edc7e2c is described below
commit a512edc7e2cbb7afd7e525cb76786c90a67b5937
Author: Qi Chen <[email protected]>
AuthorDate: Thu Apr 10 21:57:54 2025 +0800
[fix](multi-catalog) Fix multi-thread issue in hive/iceberg writer commit
meta-info to fe. (#49920)
---
be/src/runtime/fragment_mgr.cpp | 28 ++++++++--------------
be/src/runtime/runtime_state.h | 22 +++++++++++++++--
.../writer/iceberg/viceberg_partition_writer.cpp | 3 ++-
be/src/vec/sink/writer/vhive_partition_writer.cpp | 3 ++-
4 files changed, 34 insertions(+), 22 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9b1461bcfac..c995a861884 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -583,37 +583,29 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
}
}
- if (!req.runtime_state->hive_partition_updates().empty()) {
+ if (auto hpu = req.runtime_state->hive_partition_updates();
!hpu.empty()) {
params.__isset.hive_partition_updates = true;
- params.hive_partition_updates.reserve(
- req.runtime_state->hive_partition_updates().size());
- for (auto& hive_partition_update :
req.runtime_state->hive_partition_updates()) {
- params.hive_partition_updates.push_back(hive_partition_update);
- }
+
params.hive_partition_updates.insert(params.hive_partition_updates.end(),
hpu.begin(),
+ hpu.end());
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
- if (!rs->hive_partition_updates().empty()) {
+ if (auto rs_hpu = rs->hive_partition_updates();
!rs_hpu.empty()) {
params.__isset.hive_partition_updates = true;
params.hive_partition_updates.insert(params.hive_partition_updates.end(),
-
rs->hive_partition_updates().begin(),
-
rs->hive_partition_updates().end());
+ rs_hpu.begin(),
rs_hpu.end());
}
}
}
-
- if (!req.runtime_state->iceberg_commit_datas().empty()) {
+ if (auto icd = req.runtime_state->iceberg_commit_datas();
!icd.empty()) {
params.__isset.iceberg_commit_datas = true;
-
params.iceberg_commit_datas.reserve(req.runtime_state->iceberg_commit_datas().size());
- for (auto& iceberg_commit_data :
req.runtime_state->iceberg_commit_datas()) {
- params.iceberg_commit_datas.push_back(iceberg_commit_data);
- }
+
params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
icd.begin(),
+ icd.end());
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
- if (!rs->iceberg_commit_datas().empty()) {
+ if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty())
{
params.__isset.iceberg_commit_datas = true;
params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
-
rs->iceberg_commit_datas().begin(),
-
rs->iceberg_commit_datas().end());
+ rs_icd.begin(),
rs_icd.end());
}
}
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 0f368748101..a70eba0ab4c 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -481,9 +481,25 @@ public:
std::make_move_iterator(tablet_infos.end()));
}
- std::vector<THivePartitionUpdate>& hive_partition_updates() { return
_hive_partition_updates; }
+ std::vector<THivePartitionUpdate> hive_partition_updates() const {
+ std::lock_guard<std::mutex> lock(_hive_partition_updates_mutex);
+ return _hive_partition_updates;
+ }
+
+ void add_hive_partition_updates(const THivePartitionUpdate&
hive_partition_update) {
+ std::lock_guard<std::mutex> lock(_hive_partition_updates_mutex);
+ _hive_partition_updates.emplace_back(hive_partition_update);
+ }
- std::vector<TIcebergCommitData>& iceberg_commit_datas() { return
_iceberg_commit_datas; }
+ std::vector<TIcebergCommitData> iceberg_commit_datas() const {
+ std::lock_guard<std::mutex> lock(_iceberg_commit_datas_mutex);
+ return _iceberg_commit_datas;
+ }
+
+ void add_iceberg_commit_datas(const TIcebergCommitData&
iceberg_commit_data) {
+ std::lock_guard<std::mutex> lock(_iceberg_commit_datas_mutex);
+ _iceberg_commit_datas.emplace_back(iceberg_commit_data);
+ }
// local runtime filter mgr, the runtime filter do not have remote target
or
// not need local merge should regist here. the instance exec finish, the
local
@@ -807,8 +823,10 @@ private:
int _task_id = -1;
int _task_num = 0;
+ mutable std::mutex _hive_partition_updates_mutex;
std::vector<THivePartitionUpdate> _hive_partition_updates;
+ mutable std::mutex _iceberg_commit_datas_mutex;
std::vector<TIcebergCommitData> _iceberg_commit_datas;
std::vector<std::unique_ptr<doris::pipeline::PipelineXLocalStateBase>>
_op_id_to_local_state;
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index f3691f8c621..691de9c1727 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -117,7 +117,8 @@ Status VIcebergPartitionWriter::close(const Status& status) {
}
}
if (status_ok) {
-
_state->iceberg_commit_datas().emplace_back(_build_iceberg_commit_data());
+ auto commit_data = _build_iceberg_commit_data();
+ _state->add_iceberg_commit_datas(commit_data);
}
return result_status;
}
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 714513adcce..82d56921966 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -131,7 +131,8 @@ Status VHivePartitionWriter::close(const Status& status) {
}
}
if (status_ok) {
-
_state->hive_partition_updates().emplace_back(_build_partition_update());
+ auto partition_update = _build_partition_update();
+ _state->add_hive_partition_updates(partition_update);
}
return result_status;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]