This is an automated email from the ASF dual-hosted git repository.
sollhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 30d27ca95de [fix](be) Protect tablet writer map lookup in load channel (#64604)
30d27ca95de is described below
commit 30d27ca95de26ea4a1aced1cba0a799e72658cc6
Author: Refrain <[email protected]>
AuthorDate: Tue Jun 23 21:43:29 2026 +0800
[fix](be) Protect tablet writer map lookup in load channel (#64604)
### What problem does this PR solve?
Issue Number: None
Related PR: #57133
Problem Summary:
`BaseTabletsChannel::_write_block_data` can run concurrently with
`incremental_open` for the same tablets channel. `_tablet_writers` is a
`std::unordered_map` that is guarded by `_tablet_writers_lock` when writers
are inserted, but the lookup used to set the tablet load rowset info read
the map without holding that lock.
A concurrent `emplace` may rehash `_tablet_writers`, so the unlocked
lookup can race with bucket reallocation. This patch protects the lookup
with `_tablet_writers_lock` and avoids using unordered_map iterators
after the lock is released. The actual writer operations still run
outside `_tablet_writers_lock`, so the lock remains scoped to the map
access.
---
be/src/load/channel/tablets_channel.cpp | 22 +++++++++++++++-------
1 file changed, 15 insertions(+), 7 deletions(-)
diff --git a/be/src/load/channel/tablets_channel.cpp b/be/src/load/channel/tablets_channel.cpp
index 21737a00303..2ddc5de245c 100644
--- a/be/src/load/channel/tablets_channel.cpp
+++ b/be/src/load/channel/tablets_channel.cpp
@@ -629,16 +629,17 @@ Status BaseTabletsChannel::_write_block_data(
// add_batch may concurrency with inc_open but not under _lock.
// so need to protect it with _tablet_writers_lock.
- decltype(_tablet_writers.find(tablet_id)) tablet_writer_it;
+ BaseDeltaWriter* tablet_writer = nullptr;
{
std::lock_guard<std::mutex> l(_tablet_writers_lock);
- tablet_writer_it = _tablet_writers.find(tablet_id);
+ auto tablet_writer_it = _tablet_writers.find(tablet_id);
if (tablet_writer_it == _tablet_writers.end()) {
return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
}
+ tablet_writer = tablet_writer_it->second.get();
}
- Status st = write_func(tablet_writer_it->second.get());
+ Status st = write_func(tablet_writer);
if (!st.ok()) {
auto err_msg =
fmt::format("tablet writer write failed, tablet_id={}, txn_id={}, err={}",
@@ -647,7 +648,7 @@ Status BaseTabletsChannel::_write_block_data(
PTabletError* error = tablet_errors->Add();
error->set_tablet_id(tablet_id);
error->set_msg(err_msg);
- static_cast<void>(tablet_writer_it->second->cancel_with_status(st));
+ static_cast<void>(tablet_writer->cancel_with_status(st));
_add_broken_tablet(tablet_id);
// continue write to other tablet.
// the error will return back to sender.
@@ -662,9 +663,16 @@ Status BaseTabletsChannel::_write_block_data(
return writer->write(&send_data, tablet_to_rowidxs_it.second);
}));
- auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first);
- if (tablet_writer_it != _tablet_writers.end()) {
- tablet_writer_it->second->set_tablet_load_rowset_num_info(tablet_load_infos);
+ BaseDeltaWriter* tablet_writer = nullptr;
+ {
+ std::lock_guard<std::mutex> l(_tablet_writers_lock);
+ auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first);
+ if (tablet_writer_it != _tablet_writers.end()) {
+ tablet_writer = tablet_writer_it->second.get();
+ }
+ }
+ if (tablet_writer != nullptr) {
+ tablet_writer->set_tablet_load_rowset_num_info(tablet_load_infos);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]