This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new df6935ad044 [Improvement](fragment) Improvement map performance in fragment mgr (#46245)
df6935ad044 is described below
commit df6935ad04497408463986a1826c92f818fbf405
Author: Gabriel <[email protected]>
AuthorDate: Thu Jan 2 15:11:56 2025 +0800
[Improvement](fragment) Improvement map performance in fragment mgr (#46245)
pick #46235
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 2 +-
be/src/runtime/fragment_mgr.cpp | 713 +++++++++++++++++++++-------------------
be/src/runtime/fragment_mgr.h | 65 +++-
be/src/runtime/query_context.h | 6 +
5 files changed, 434 insertions(+), 353 deletions(-)
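The heart of this patch is visible in the diffstat: fragment_mgr.cpp trades maps guarded by
one global lock for a lock-sharded ConcurrentContextMap. As a mental model, here is a
minimal standalone sketch of the pattern, assuming std::unordered_map and a fixed
128-way split in place of Doris's phmap::flat_hash_map and the new
num_query_ctx_map_partitions config:

    #include <array>
    #include <functional>
    #include <memory>
    #include <mutex>
    #include <shared_mutex>
    #include <unordered_map>

    // Sketch of a lock-sharded map: each shard pairs its own reader/writer
    // lock with a sub-map, so operations on different keys rarely contend.
    template <typename K, typename V>
    class ShardedMap {
    public:
        V find(const K& key) {
            Shard& shard = _shards[shard_of(key)];
            std::shared_lock lock(shard.mutex); // readers share the shard lock
            auto it = shard.map.find(key);
            return it == shard.map.end() ? V {} : it->second;
        }
        void insert(const K& key, V value) {
            Shard& shard = _shards[shard_of(key)];
            std::unique_lock lock(shard.mutex); // writers lock one shard only
            shard.map.emplace(key, std::move(value));
        }
        void erase(const K& key) {
            Shard& shard = _shards[shard_of(key)];
            std::unique_lock lock(shard.mutex);
            shard.map.erase(key);
        }

    private:
        static constexpr size_t kShards = 128;
        struct Shard {
            std::shared_mutex mutex;
            std::unordered_map<K, V> map;
        };
        size_t shard_of(const K& key) const { return std::hash<K> {}(key) % kShards; }
        std::array<Shard, kShards> _shards;
    };

Keys that hash to different shards can be registered, looked up, and erased in
parallel; only same-shard writers serialize against each other.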
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 483e7753ec2..f047071139e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -219,6 +219,7 @@ DEFINE_Int32(check_consistency_worker_count, "1");
DEFINE_Int32(upload_worker_count, "1");
// the count of thread to download
DEFINE_Int32(download_worker_count, "1");
+DEFINE_Int32(num_query_ctx_map_partitions, "128");
// the count of thread to make snapshot
DEFINE_Int32(make_snapshot_worker_count, "5");
// the count of thread to release snapshot
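The default of 128 partitions is a power of two, which keeps shard selection cheap; a
quick sanity check of the usual identity (the patch itself uses plain %, which also works
for arbitrary partition counts):

    #include <cassert>
    #include <cstdint>

    int main() {
        const uint32_t n = 128; // power-of-two partition count
        for (uint32_t h : {0u, 1u, 127u, 128u, 129u, 0xDEADBEEFu}) {
            assert(h % n == (h & (n - 1))); // modulo reduces to a mask
        }
        return 0;
    }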
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 3579ce54fce..29080a56def 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1346,7 +1346,7 @@ DECLARE_Int32(spill_io_thread_pool_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_mBool(check_segment_when_build_rowset_meta);
-
+DECLARE_Int32(num_query_ctx_map_partitions);
// max s3 client retry times
DECLARE_mInt32(max_s3_client_retry);
// When meet s3 429 error, the "get" request will
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index b7bbaf8f206..0788b5e3206 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -222,12 +222,95 @@ static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries
return result;
}
+inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) {
+ uint32_t value = HashUtil::hash(&query_id.lo, 8, 0);
+ value = HashUtil::hash(&query_id.hi, 8, value);
+ return value % capacity;
+}
+
+inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) {
+ uint32_t value = HashUtil::hash(&key.first.lo, 8, 0);
+ value = HashUtil::hash(&key.first.hi, 8, value);
+ return value % capacity;
+}
+
+template <typename Key, typename Value, typename ValueType>
+ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
+ _internal_map.resize(config::num_query_ctx_map_partitions);
+ for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
+ _internal_map[i] = {std::make_unique<std::shared_mutex>(),
+ phmap::flat_hash_map<Key, Value>()};
+ }
+}
+
+template <typename Key, typename Value, typename ValueType>
+Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
+ auto id = get_map_id(query_id, _internal_map.size());
+ {
+ std::shared_lock lock(*_internal_map[id].first);
+ auto& map = _internal_map[id].second;
+ auto search = map.find(query_id);
+ if (search != map.end()) {
+ return search->second;
+ }
+ return std::shared_ptr<ValueType>(nullptr);
+ }
+}
+
+template <typename Key, typename Value, typename ValueType>
+Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists(
+        const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) {
+ auto id = get_map_id(query_id, _internal_map.size());
+ {
+ std::unique_lock lock(*_internal_map[id].first);
+ auto& map = _internal_map[id].second;
+ auto search = map.find(query_id);
+ if (search != map.end()) {
+ query_ctx = search->second;
+ }
+ if (!query_ctx) {
+ return function(map);
+ }
+ return Status::OK();
+ }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
+ auto id = get_map_id(query_id, _internal_map.size());
+ {
+ std::unique_lock lock(*_internal_map[id].first);
+ auto& map = _internal_map[id].second;
+ map.erase(query_id);
+ }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::insert(const Key& query_id,
+                                                         std::shared_ptr<ValueType> query_ctx) {
+ auto id = get_map_id(query_id, _internal_map.size());
+ {
+ std::unique_lock lock(*_internal_map[id].first);
+ auto& map = _internal_map[id].second;
+ map.insert({query_id, query_ctx});
+ }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::clear() {
+ for (auto& pair : _internal_map) {
+ std::unique_lock lock(*pair.first);
+ auto& map = pair.second;
+ map.clear();
+ }
+}
+
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
: _exec_env(exec_env), _stop_background_threads_latch(1) {
_entity =
DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
REGISTER_HOOK_METRIC(fragment_instance_count,
- [this]() { return _fragment_instance_map.size(); });
+                         [this]() { return _fragment_instance_map.num_items(); });
auto s = Thread::create(
"FragmentMgr", "cancel_timeout_plan_fragment", [this]() {
this->cancel_worker(); },
@@ -268,20 +351,17 @@ void FragmentMgr::stop() {
_thread_pool->shutdown();
// Only me can delete
- {
- std::lock_guard<std::mutex> lock(_lock);
- _fragment_instance_map.clear();
- for (auto& pipeline : _pipeline_map) {
- pipeline.second->close_sink();
- }
- _pipeline_map.clear();
- }
-
- {
- std::unique_lock lock(_query_ctx_map_lock);
- _query_ctx_map.clear();
- }
-
+ _fragment_instance_map.clear();
+    _pipeline_map.apply(
+            [&](phmap::flat_hash_map<TUniqueId,
+                                     std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
+                    -> Status {
+                for (auto& pipeline : map) {
+                    pipeline.second->close_sink();
+                }
+                return Status::OK();
+            });
+ _pipeline_map.clear();
+ _query_ctx_map.clear();
_async_report_thread_pool->shutdown();
}
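stop() now reaches every entry through apply(), which visits shards one at a time under
that shard's exclusive lock and hands the callback the raw sub-map. A self-contained
sketch of the idiom (plain std types here; Doris passes a Status-returning
std::function):

    #include <memory>
    #include <mutex>
    #include <shared_mutex>
    #include <unordered_map>
    #include <utility>
    #include <vector>

    template <typename K, typename V, typename Fn>
    void apply_all(std::vector<std::pair<std::unique_ptr<std::shared_mutex>,
                                         std::unordered_map<K, V>>>& shards,
                   Fn&& fn) {
        for (auto& shard : shards) {
            std::unique_lock lock(*shard.first); // one shard locked at a time,
            fn(shard.second);                    // never the whole map at once
        }
    }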
@@ -621,13 +701,11 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
// remove exec state after this fragment finished
{
- std::lock_guard<std::mutex> lock(_lock);
_fragment_instance_map.erase(fragment_executor->fragment_instance_id());
LOG_INFO("Instance {} finished",
print_id(fragment_executor->fragment_instance_id()));
}
if (all_done && query_ctx) {
- std::unique_lock lock(_query_ctx_map_lock);
_query_ctx_map.erase(query_ctx->query_id());
LOG_INFO("Query {} finished", print_id(query_ctx->query_id()));
}
@@ -721,15 +799,13 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
TUniqueId query_id;
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
- std::shared_lock lock(_query_ctx_map_lock);
- auto search = _query_ctx_map.find(query_id);
- if (search == _query_ctx_map.end()) {
+ q_ctx = _query_ctx_map.find(query_id);
+ if (q_ctx == nullptr) {
return Status::InternalError(
"Failed to get query fragments context. Query may be "
"timeout or be cancelled. host: {}",
BackendOptions::get_localhost());
}
- q_ctx = search->second;
}
q_ctx->set_ready_to_execute(false);
LOG_INFO("Query {} start execution", print_id(query_id));
@@ -742,7 +818,6 @@ void FragmentMgr::remove_pipeline_context(
bool all_done = false;
TUniqueId query_id = f_context->get_query_id();
{
- std::lock_guard<std::mutex> lock(_lock);
std::vector<TUniqueId> ins_ids;
f_context->instance_ids(ins_ids);
all_done = q_context->countdown(ins_ids.size());
@@ -754,7 +829,6 @@ void FragmentMgr::remove_pipeline_context(
}
}
if (all_done) {
- std::unique_lock lock(_query_ctx_map_lock);
_query_ctx_map.erase(query_id);
LOG_INFO("Query {} finished", print_id(query_id));
}
@@ -768,98 +842,90 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
                            { return Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
if (params.is_simplified_param) {
// Get common components from _query_ctx_map
- std::shared_lock lock(_query_ctx_map_lock);
- auto search = _query_ctx_map.find(query_id);
- if (search == _query_ctx_map.end()) {
+ query_ctx = _query_ctx_map.find(query_id);
+ if (query_ctx == nullptr) {
return Status::InternalError(
"Failed to get query fragments context. Query may be "
"timeout or be cancelled. host: {}",
BackendOptions::get_localhost());
}
- query_ctx = search->second;
} else {
        // Find _query_ctx_map, in case some other request has already create the query fragments context.
- {
- std::shared_lock lock(_query_ctx_map_lock);
- auto search = _query_ctx_map.find(query_id);
- if (search != _query_ctx_map.end()) {
- query_ctx = search->second;
- return Status::OK();
- }
- }
-
- std::unique_lock lock(_query_ctx_map_lock);
- auto search = _query_ctx_map.find(query_id);
- if (search != _query_ctx_map.end()) {
- query_ctx = search->second;
- return Status::OK();
- }
-
- TNetworkAddress current_connect_fe_addr;
-        // for gray upragde between 2.1 version, fe may not set current_connect_fe,
- // then use coord addr instead
- if (params.__isset.current_connect_fe) {
- current_connect_fe_addr = params.current_connect_fe;
- } else {
- current_connect_fe_addr = params.coord;
- }
+        RETURN_IF_ERROR(_query_ctx_map.apply_if_not_exists(
+                query_id, query_ctx,
+                [&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>>& map) -> Status {
+                    TNetworkAddress current_connect_fe_addr;
+                    // for gray upragde between 2.1 version, fe may not set current_connect_fe,
+                    // then use coord addr instead
+                    if (params.__isset.current_connect_fe) {
+                        current_connect_fe_addr = params.current_connect_fe;
+                    } else {
+                        current_connect_fe_addr = params.coord;
+                    }
- LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " <<
params.coord
- << ", total fragment num on current host: " <<
params.fragment_num_on_host
- << ", fe process uuid: " <<
params.query_options.fe_process_uuid
- << ", query type: " << params.query_options.query_type
- << ", report audit fe:" << current_connect_fe_addr;
-
- // This may be a first fragment request of the query.
- // Create the query fragments context.
- query_ctx = QueryContext::create_shared(
- query_id, params.fragment_num_on_host, _exec_env,
params.query_options,
- params.coord, pipeline, params.is_nereids,
current_connect_fe_addr, query_source);
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
- RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool),
params.desc_tbl,
- &(query_ctx->desc_tbl)));
- // set file scan range params
- if (params.__isset.file_scan_params) {
- query_ctx->file_scan_range_params_map = params.file_scan_params;
- }
+ LOG(INFO) << "query_id: " << print_id(query_id)
+ << ", coord_addr: " << params.coord
+ << ", total fragment num on current host: "
+ << params.fragment_num_on_host
+ << ", fe process uuid: " <<
params.query_options.fe_process_uuid
+ << ", query type: " <<
params.query_options.query_type
+ << ", report audit fe:" <<
current_connect_fe_addr;
+
+ // This may be a first fragment request of the query.
+ // Create the query fragments context.
+ query_ctx = QueryContext::create_shared(
+ query_id, params.fragment_num_on_host, _exec_env,
params.query_options,
+ params.coord, pipeline, params.is_nereids,
current_connect_fe_addr,
+ query_source);
+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
+
RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
+
&(query_ctx->desc_tbl)));
+ // set file scan range params
+ if (params.__isset.file_scan_params) {
+ query_ctx->file_scan_range_params_map =
params.file_scan_params;
+ }
- query_ctx->query_globals = params.query_globals;
+ query_ctx->query_globals = params.query_globals;
- if (params.__isset.resource_info) {
- query_ctx->user = params.resource_info.user;
- query_ctx->group = params.resource_info.group;
- query_ctx->set_rsc_info = true;
- }
+ if (params.__isset.resource_info) {
+ query_ctx->user = params.resource_info.user;
+ query_ctx->group = params.resource_info.group;
+ query_ctx->set_rsc_info = true;
+ }
-        query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
-        _set_scan_concurrency(params, query_ctx.get());
-        const bool is_pipeline = std::is_same_v<TPipelineFragmentParams, Params>;
-
-        if (params.__isset.workload_groups && !params.workload_groups.empty()) {
-            uint64_t tg_id = params.workload_groups[0].id;
-            WorkloadGroupPtr workload_group_ptr =
-                    _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
-            if (workload_group_ptr != nullptr) {
-                RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
-                RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
-                _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
-                                                                                 tg_id);
-
-                LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
-                          << ", use workload group: " << workload_group_ptr->debug_string()
-                          << ", is pipeline: " << ((int)is_pipeline);
-            } else {
-                LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
-                          << " carried group info but can not find group in be";
-            }
-        }
-        // There is some logic in query ctx's dctor, we could not check if exists and delete the
-        // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
-        _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
-        LOG(INFO) << "Register query/load memory tracker, query/load id: "
-                  << print_id(query_ctx->query_id())
-                  << " limit: " << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
+                    query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(
+                            pipeline);
+                    _set_scan_concurrency(params, query_ctx.get());
+                    const bool is_pipeline = std::is_same_v<TPipelineFragmentParams, Params>;
+
+                    if (params.__isset.workload_groups && !params.workload_groups.empty()) {
+                        uint64_t tg_id = params.workload_groups[0].id;
+                        WorkloadGroupPtr workload_group_ptr =
+                                _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
+                        if (workload_group_ptr != nullptr) {
+                            RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
+                            RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
+                            _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
+                                    print_id(query_id), tg_id);
+
+                            LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
+                                      << ", use workload group: "
+                                      << workload_group_ptr->debug_string()
+                                      << ", is pipeline: " << ((int)is_pipeline);
+                        } else {
+                            LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
+                                      << " carried group info but can not find group in be";
+                        }
+                    }
+                    // There is some logic in query ctx's dctor, we could not check if exists and delete the
+                    // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
+                    map.insert({query_id, query_ctx});
+                    LOG(INFO) << "Register query/load memory tracker, query/load id: "
+                              << print_id(query_ctx->query_id()) << " limit: "
+                              << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
+                    return Status::OK();
+                }));
}
return Status::OK();
}
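The rewrite above folds the old check-under-shared-lock / re-check-under-unique-lock
dance into apply_if_not_exists: a single exclusive shard lock covers both the miss check
and the construction callback, so two racing fragments cannot both create a
QueryContext. A reduced sketch of that step (the Status struct below is a minimal
stand-in for doris::Status):

    #include <functional>
    #include <memory>
    #include <unordered_map>

    struct Status { // minimal stand-in for doris::Status
        bool ok;
        static Status OK() { return {true}; }
    };

    template <typename K, typename T>
    Status get_or_create(
            std::unordered_map<K, std::shared_ptr<T>>& map, const K& key,
            std::shared_ptr<T>& out,
            const std::function<Status(std::unordered_map<K, std::shared_ptr<T>>&)>& make) {
        // Caller already holds the shard's unique_lock, so the re-check and the
        // construction below are atomic with respect to other threads.
        auto it = map.find(key);
        if (it != map.end()) {
            out = it->second; // someone else won the race; reuse their context
            return Status::OK();
        }
        return make(map); // callback builds `out` and inserts it into the map
    }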
@@ -874,9 +940,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
                << apache::thrift::ThriftDebugString(params.query_options).c_str();
const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
{
- std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_instance_map.find(fragment_instance_id);
- if (iter != _fragment_instance_map.end()) {
+ if (iter != nullptr) {
// Duplicated
LOG(WARNING) << "duplicate fragment instance id: " <<
print_id(fragment_instance_id);
return Status::OK();
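Note the changed miss check in this hunk: find() on the sharded map returns the mapped
shared_ptr by value (null on a miss) rather than an iterator, because an iterator would
dangle once the shard lock is released inside find(). A sketch of why the copy matters:

    #include <memory>
    #include <mutex>
    #include <shared_mutex>
    #include <unordered_map>

    struct Executor {}; // stand-in for PlanFragmentExecutor

    std::shared_ptr<Executor> find_copy(
            std::shared_mutex& mutex,
            std::unordered_map<int, std::shared_ptr<Executor>>& map, int key) {
        std::shared_lock lock(mutex);
        auto it = map.find(key);
        // Copying the shared_ptr keeps the executor alive for the caller even
        // after the lock is dropped and another thread erases the entry.
        return it == map.end() ? nullptr : it->second;
    }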
@@ -894,8 +959,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
        // Need lock here, because it will modify fragment ids and std::vector may resize and reallocate
        // memory, but query_is_canncelled will traverse the vector, it will core.
        // query_is_cancelled is called in allocator, we has to avoid dead lock.
- std::lock_guard<std::mutex> lock(_lock);
- query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
+ query_ctx->push_instance_ids(fragment_instance_id);
}
auto fragment_executor = std::make_shared<PlanFragmentExecutor>(
@@ -921,12 +985,10 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
            params.params, params.params.query_id, params.query_options, &handler,
            RuntimeFilterParamsContext::create(fragment_executor->runtime_state())));
{
- std::lock_guard<std::mutex> lock(_lock);
if (handler) {
query_ctx->set_merge_controller_handler(handler);
}
-        _fragment_instance_map.insert(
-                std::make_pair(params.params.fragment_instance_id, fragment_executor));
+        _fragment_instance_map.insert(params.params.fragment_instance_id, fragment_executor);
}
auto st = _thread_pool->submit_func([this, fragment_executor, cb]() {
@@ -938,7 +1000,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
if (!st.ok()) {
{
// Remove the exec state added
- std::lock_guard<std::mutex> lock(_lock);
_fragment_instance_map.erase(params.params.fragment_instance_id);
}
fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
@@ -957,27 +1018,33 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
auto t = MonotonicNanos();
size_t i = 0;
{
- std::lock_guard<std::mutex> lock(_lock);
        fmt::format_to(debug_string_buffer,
                       "{} pipeline fragment contexts are still running! duration_limit={}\n",
-                      _pipeline_map.size(), duration);
+                      _pipeline_map.num_items(), duration);
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
- for (auto& it : _pipeline_map) {
- auto elapsed = (t - it.second->create_time()) / 1000000000.0;
- if (elapsed < duration) {
- // Only display tasks which has been running for more than
{duration} seconds.
- continue;
- }
- auto timeout_second = it.second->timeout_second();
- fmt::format_to(debug_string_buffer,
- "No.{} (elapse_second={}s,
query_timeout_second={}s, instance_id="
- "{}) : {}\n",
- i, elapsed, timeout_second, print_id(it.first),
- it.second->debug_string());
- i++;
- }
+ _pipeline_map.apply(
+ [&](phmap::flat_hash_map<TUniqueId,
+
std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
+ -> Status {
+ for (auto& it : map) {
+ auto elapsed = (t - it.second->create_time()) /
1000000000.0;
+ if (elapsed < duration) {
+ // Only display tasks which has been running for
more than {duration} seconds.
+ continue;
+ }
+ auto timeout_second = it.second->timeout_second();
+ fmt::format_to(
+ debug_string_buffer,
+ "No.{} (elapse_second={}s,
query_timeout_second={}s, instance_id="
+ "{}) : {}\n",
+ i, elapsed, timeout_second, print_id(it.first),
+ it.second->debug_string());
+ i++;
+ }
+ return Status::OK();
+ });
}
return fmt::to_string(debug_string_buffer);
}
@@ -1037,14 +1104,13 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
    for (const auto& local_param : params.local_params) {
        const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
- std::lock_guard<std::mutex> lock(_lock);
auto iter = _pipeline_map.find(fragment_instance_id);
- if (iter != _pipeline_map.end()) {
+ if (iter != nullptr) {
            return Status::InternalError(
                    "exec_plan_fragment input duplicated fragment_instance_id({})",
                    UniqueId(fragment_instance_id).to_string());
}
- query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
+ query_ctx->push_instance_ids(fragment_instance_id);
}
    if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
@@ -1052,13 +1118,12 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
}
{
- std::lock_guard<std::mutex> lock(_lock);
std::vector<TUniqueId> ins_ids;
reinterpret_cast<pipeline::PipelineXFragmentContext*>(context.get())
->instance_ids(ins_ids);
// TODO: simplify this mapping
for (const auto& ins_id : ins_ids) {
- _pipeline_map.insert({ins_id, context});
+ _pipeline_map.insert(ins_id, context);
}
}
query_ctx->set_pipeline_context(params.fragment_id, context);
@@ -1071,13 +1136,12 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
        const TUniqueId& fragment_instance_id = local_params.fragment_instance_id;
{
- std::lock_guard<std::mutex> lock(_lock);
- auto iter = _pipeline_map.find(fragment_instance_id);
- if (iter != _pipeline_map.end()) {
+ auto res = _pipeline_map.find(fragment_instance_id);
+ if (res != nullptr) {
// Duplicated
return Status::OK();
}
-            query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
+ query_ctx->push_instance_ids(fragment_instance_id);
}
int64_t duration_ns = 0;
@@ -1115,10 +1179,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
if (i == 0 && handler) {
query_ctx->set_merge_controller_handler(handler);
}
- {
- std::lock_guard<std::mutex> lock(_lock);
-            _pipeline_map.insert(std::make_pair(fragment_instance_id, context));
- }
+ _pipeline_map.insert(fragment_instance_id, context);
return context->submit();
};
@@ -1188,13 +1249,7 @@ void FragmentMgr::_set_scan_concurrency(const Param& params, QueryContext* query
}
std::shared_ptr<QueryContext> FragmentMgr::get_query_context(const TUniqueId& query_id) {
- std::shared_lock lock(_query_ctx_map_lock);
- auto ctx = _query_ctx_map.find(query_id);
- if (ctx != _query_ctx_map.end()) {
- return ctx->second;
- } else {
- return nullptr;
- }
+ return _query_ctx_map.find(query_id);
}
void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
@@ -1202,15 +1257,13 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
std::shared_ptr<QueryContext> query_ctx;
std::vector<TUniqueId> all_instance_ids;
{
- std::shared_lock lock(_query_ctx_map_lock);
- auto ctx_iter = _query_ctx_map.find(query_id);
+ query_ctx = _query_ctx_map.find(query_id);
- if (ctx_iter == _query_ctx_map.end()) {
+ if (query_ctx == nullptr) {
LOG(WARNING) << "Query " << print_id(query_id)
<< " does not exists, failed to cancel it";
return;
}
- query_ctx = ctx_iter->second;
// Copy instanceids to avoid concurrent modification.
// And to reduce the scope of lock.
all_instance_ids = query_ctx->fragment_instance_ids;
@@ -1224,10 +1277,7 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
}
query_ctx->cancel(msg, Status::Cancelled(msg));
- {
- std::lock_guard<std::mutex> state_lock(_lock);
- _query_ctx_map.erase(query_id);
- }
+ _query_ctx_map.erase(query_id);
LOG(INFO) << "Query " << print_id(query_id) << " is cancelled and removed.
Reason: " << msg;
}
@@ -1236,22 +1286,10 @@ void FragmentMgr::cancel_instance(const TUniqueId& instance_id,
std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_ctx;
std::shared_ptr<PlanFragmentExecutor> non_pipeline_ctx;
{
- std::lock_guard<std::mutex> state_lock(_lock);
- const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
- if (is_pipeline_instance) {
- auto itr = _pipeline_map.find(instance_id);
- if (itr != _pipeline_map.end()) {
- pipeline_ctx = itr->second;
- } else {
- LOG(WARNING) << "Could not find the pipeline instance id:" <<
print_id(instance_id)
- << " to cancel";
- return;
- }
- } else {
- auto itr = _fragment_instance_map.find(instance_id);
- if (itr != _fragment_instance_map.end()) {
- non_pipeline_ctx = itr->second;
- } else {
+ pipeline_ctx = _pipeline_map.find(instance_id);
+ if (!pipeline_ctx) {
+ non_pipeline_ctx = _fragment_instance_map.find(instance_id);
+ if (non_pipeline_ctx == nullptr) {
LOG(WARNING) << "Could not find the fragment instance id:" <<
print_id(instance_id)
<< " to cancel";
return;
@@ -1269,14 +1307,10 @@ void FragmentMgr::cancel_instance(const TUniqueId& instance_id,
void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
const PPlanFragmentCancelReason& reason,
const std::string& msg) {
- std::shared_lock lock(_query_ctx_map_lock);
- auto q_ctx_iter = _query_ctx_map.find(query_id);
- if (q_ctx_iter != _query_ctx_map.end()) {
+ auto res = _query_ctx_map.find(query_id);
+ if (res != nullptr) {
// Has to use value to keep the shared ptr not deconstructed.
- std::shared_ptr<QueryContext> q_ctx = q_ctx_iter->second;
- // the lock should only be used to protect the map, not scope query ctx
- lock.unlock();
- WARN_IF_ERROR(q_ctx->cancel_pipeline_context(fragment_id, reason, msg),
+ WARN_IF_ERROR(res->cancel_pipeline_context(fragment_id, reason, msg),
"fail to cancel fragment");
} else {
LOG(WARNING) << "Could not find the query id:" << print_id(query_id)
@@ -1314,141 +1348,161 @@ void FragmentMgr::cancel_worker() {
VecDateTimeValue now = VecDateTimeValue::local_time();
std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem>
brpc_stub_with_queries;
{
- std::lock_guard<std::mutex> lock(_lock);
- for (auto& fragment_instance_itr : _fragment_instance_map) {
- if (fragment_instance_itr.second->is_timeout(now)) {
-
queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id());
- }
- }
-
- for (auto& pipeline_itr : _pipeline_map) {
- if (pipeline_itr.second->is_timeout(now)) {
- std::vector<TUniqueId> ins_ids;
-
reinterpret_cast<pipeline::PipelineXFragmentContext*>(pipeline_itr.second.get())
- ->instance_ids(ins_ids);
- for (auto& ins_id : ins_ids) {
- queries_timeout.push_back(ins_id);
- }
- } else {
- pipeline_itr.second->clear_finished_tasks();
- }
- }
+ _fragment_instance_map.apply(
+ [&](phmap::flat_hash_map<TUniqueId,
std::shared_ptr<PlanFragmentExecutor>>& map)
+ -> Status {
+ for (auto& fragment_instance_itr : map) {
+ if (fragment_instance_itr.second->is_timeout(now))
{
+ queries_timeout.push_back(
+
fragment_instance_itr.second->fragment_instance_id());
+ }
+ }
+ return Status::OK();
+ });
+ _pipeline_map.apply(
+ [&](phmap::flat_hash_map<
+ TUniqueId,
std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
+ -> Status {
+ for (auto& pipeline_itr : map) {
+ if (pipeline_itr.second->is_timeout(now)) {
+ std::vector<TUniqueId> ins_ids;
+
reinterpret_cast<pipeline::PipelineXFragmentContext*>(
+ pipeline_itr.second.get())
+ ->instance_ids(ins_ids);
+ for (auto& ins_id : ins_ids) {
+ queries_timeout.push_back(ins_id);
+ }
+ } else {
+ pipeline_itr.second->clear_finished_tasks();
+ }
+ }
+ return Status::OK();
+ });
}
{
-        std::unique_lock lock(_query_ctx_map_lock);
-        for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
-            if (it->second->is_timeout(now)) {
-                LOG_WARNING("Query {} is timeout", print_id(it->first));
-                it = _query_ctx_map.erase(it);
-            } else {
-                if (config::enable_brpc_connection_check) {
-                    auto brpc_stubs = it->second->get_using_brpc_stubs();
-                    for (auto& item : brpc_stubs) {
-                        if (!brpc_stub_with_queries.contains(item.second)) {
-                            brpc_stub_with_queries.emplace(item.second,
-                                                           BrpcItem {item.first, {it->second}});
-                        } else {
-                            brpc_stub_with_queries[item.second].queries.emplace_back(it->second);
+        _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>>&
+                                         map) -> Status {
+            for (auto it = map.begin(); it != map.end();) {
+                if (it->second->is_timeout(now)) {
+                    LOG_WARNING("Query {} is timeout", print_id(it->first));
+                    it = map.erase(it);
+                } else {
+                    if (config::enable_brpc_connection_check) {
+                        auto brpc_stubs = it->second->get_using_brpc_stubs();
+                        for (auto& item : brpc_stubs) {
+                            if (!brpc_stub_with_queries.contains(item.second)) {
+                                brpc_stub_with_queries.emplace(
+                                        item.second, BrpcItem {item.first, {it->second}});
+                            } else {
+                                brpc_stub_with_queries[item.second].queries.emplace_back(
+                                        it->second);
+                            }
                         }
                     }
+                    ++it;
                 }
-                ++it;
             }
-        }
+            return Status::OK();
+        });
}
{
- std::shared_lock lock(_query_ctx_map_lock);
// We use a very conservative cancel strategy.
// 0. If there are no running frontends, do not cancel any queries.
// 1. If query's process uuid is zero, do not cancel
// 2. If same process uuid, do not cancel
// 3. If fe has zero process uuid, do not cancel
- if (running_fes.empty() && !_query_ctx_map.empty()) {
+ if (running_fes.empty() && _query_ctx_map.num_items() != 0) {
            LOG_EVERY_N(WARNING, 10)
                    << "Could not find any running frontends, maybe we are upgrading or "
                       "starting? "
                    << "We will not cancel any outdated queries in this situation.";
} else {
- for (const auto& q : _query_ctx_map) {
- auto q_ctx = q.second;
- const int64_t fe_process_uuid =
q_ctx->get_fe_process_uuid();
-
- if (fe_process_uuid == 0) {
- // zero means this query is from a older version fe or
- // this fe is starting
- continue;
- }
+ _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId,
+
std::shared_ptr<QueryContext>>& map)
+ -> Status {
+ for (const auto& q : map) {
+ auto q_ctx = q.second;
+ const int64_t fe_process_uuid =
q_ctx->get_fe_process_uuid();
+
+ if (fe_process_uuid == 0) {
+ // zero means this query is from a older version
fe or
+ // this fe is starting
+ continue;
+ }
- // If the query is not running on the any frontends,
cancel it.
- if (auto itr =
running_queries_on_all_fes.find(fe_process_uuid);
- itr != running_queries_on_all_fes.end()) {
- // Query not found on this frontend, and the query
arrives before the last check
- if (itr->second.find(q_ctx->query_id()) ==
itr->second.end() &&
- // tv_nsec represents the number of nanoseconds
that have elapsed since the time point stored in tv_sec.
- // tv_sec is enough, we do not need to check
tv_nsec.
- q_ctx->get_query_arrival_timestamp().tv_sec <
- check_invalid_query_last_timestamp.tv_sec
&&
- q_ctx->get_query_source() ==
QuerySource::INTERNAL_FRONTEND) {
- if (q_ctx->enable_pipeline_x_exec()) {
-
queries_pipeline_task_leak.push_back(q_ctx->query_id());
- LOG_INFO(
- "Query {}, type {} is not found on any
frontends, maybe it "
- "is leaked.",
- print_id(q_ctx->query_id()),
- toString(q_ctx->get_query_source()));
- continue;
+ // If the query is not running on the any frontends,
cancel it.
+ if (auto itr =
running_queries_on_all_fes.find(fe_process_uuid);
+ itr != running_queries_on_all_fes.end()) {
+ // Query not found on this frontend, and the query
arrives before the last check
+ if (itr->second.find(q_ctx->query_id()) ==
itr->second.end() &&
+ // tv_nsec represents the number of
nanoseconds that have elapsed since the time point stored in tv_sec.
+ // tv_sec is enough, we do not need to check
tv_nsec.
+ q_ctx->get_query_arrival_timestamp().tv_sec <
+
check_invalid_query_last_timestamp.tv_sec &&
+ q_ctx->get_query_source() ==
QuerySource::INTERNAL_FRONTEND) {
+ if (q_ctx->enable_pipeline_x_exec()) {
+
queries_pipeline_task_leak.push_back(q_ctx->query_id());
+ LOG_INFO(
+ "Query {}, type {} is not found on
any frontends, "
+ "maybe it "
+ "is leaked.",
+ print_id(q_ctx->query_id()),
+
toString(q_ctx->get_query_source()));
+ continue;
+ }
}
}
- }
- auto query_context = q.second;
+ auto query_context = q.second;
- auto itr = running_fes.find(query_context->coord_addr);
- if (itr != running_fes.end()) {
- if (q.second->get_fe_process_uuid() ==
itr->second.info.process_uuid ||
- itr->second.info.process_uuid == 0) {
- continue;
- } else {
- LOG_WARNING("Coordinator of query {} restarted,
going to cancel it.",
+ auto itr = running_fes.find(query_context->coord_addr);
+ if (itr != running_fes.end()) {
+ if (q.second->get_fe_process_uuid() ==
itr->second.info.process_uuid ||
+ itr->second.info.process_uuid == 0) {
+ continue;
+ } else {
+ LOG_WARNING(
+ "Coordinator of query {} restarted,
going to cancel it.",
print_id(q.second->query_id()));
- }
- } else {
- // In some rear cases, the rpc port of follower is not
updated in time,
- // then the port of this follower will be zero, but
acutally it is still running,
- // and be has already received the query from follower.
- // So we need to check if host is in running_fes.
- bool fe_host_is_standing =
- std::any_of(running_fes.begin(),
running_fes.end(),
- [query_context](const auto& fe) {
- return fe.first.hostname ==
-
query_context->coord_addr.hostname &&
- fe.first.port == 0;
- });
-
- if (fe_host_is_standing) {
- LOG_WARNING(
- "Coordinator {}:{} is not found, but its
host is still "
- "running with an unstable rpc port, not
going to cancel "
- "it.",
- query_context->coord_addr.hostname,
- query_context->coord_addr.port,
- print_id(query_context->query_id()));
- continue;
+ }
} else {
- LOG_WARNING(
- "Could not find target coordinator {}:{}
of query {}, "
- "going to "
- "cancel it.",
- query_context->coord_addr.hostname,
- query_context->coord_addr.port,
- print_id(query_context->query_id()));
+ // In some rear cases, the rpc port of follower is
not updated in time,
+ // then the port of this follower will be zero,
but acutally it is still running,
+ // and be has already received the query from
follower.
+ // So we need to check if host is in running_fes.
+ bool fe_host_is_standing = std::any_of(
+ running_fes.begin(), running_fes.end(),
+ [query_context](const auto& fe) {
+ return fe.first.hostname ==
+
query_context->coord_addr.hostname &&
+ fe.first.port == 0;
+ });
+
+ if (fe_host_is_standing) {
+ LOG_WARNING(
+ "Coordinator {}:{} is not found, but
its host is still "
+ "running with an unstable rpc port,
not going to cancel "
+ "it.",
+ query_context->coord_addr.hostname,
+ query_context->coord_addr.port,
+ print_id(query_context->query_id()));
+ continue;
+ } else {
+ LOG_WARNING(
+ "Could not find target coordinator
{}:{} of query {}, "
+ "going to "
+ "cancel it.",
+ query_context->coord_addr.hostname,
+ query_context->coord_addr.port,
+ print_id(query_context->query_id()));
+ }
}
- }
- // Coorninator of this query has already dead.
- queries_to_cancel.push_back(q.first);
- }
+ // Coorninator of this query has already dead.
+ queries_to_cancel.push_back(q.first);
+ }
+ return Status::OK();
+ });
}
}
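Inside the _query_ctx_map.apply callback above, the cancel worker erases timed-out
queries while iterating a shard. That is only safe with the erase-returns-next-iterator
idiom:

    #include <unordered_map>

    void prune_timeouts(std::unordered_map<int, bool>& map) {
        for (auto it = map.begin(); it != map.end();) {
            if (it->second /* timed out */) {
                it = map.erase(it); // erase() hands back the next valid iterator
            } else {
                ++it; // only advance manually when nothing was erased
            }
        }
    }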
@@ -1493,15 +1547,18 @@ void FragmentMgr::cancel_worker() {
void FragmentMgr::debug(std::stringstream& ss) {
// Keep things simple
- std::lock_guard<std::mutex> lock(_lock);
-
- ss << "FragmentMgr have " << _fragment_instance_map.size() << " jobs.\n";
+ ss << "FragmentMgr have " << _fragment_instance_map.num_items() << "
jobs.\n";
ss << "job_id\t\tstart_time\t\texecute_time(s)\n";
VecDateTimeValue now = VecDateTimeValue::local_time();
- for (auto& it : _fragment_instance_map) {
- ss << it.first << "\t" << it.second->start_time().debug_string() <<
"\t"
- << now.second_diff(it.second->start_time()) << "\n";
- }
+ _fragment_instance_map.apply(
+ [&](phmap::flat_hash_map<TUniqueId,
std::shared_ptr<PlanFragmentExecutor>>& map)
+ -> Status {
+ for (auto& it : map) {
+ ss << it.first << "\t" <<
it.second->start_time().debug_string() << "\t"
+ << now.second_diff(it.second->start_time()) << "\n";
+ }
+ return Status::OK();
+ });
}
void FragmentMgr::_check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
@@ -1684,26 +1741,22 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
RuntimeFilterMgr* runtime_filter_mgr = nullptr;
if (is_pipeline) {
- std::unique_lock<std::mutex> lock(_lock);
- auto iter = _pipeline_map.find(tfragment_instance_id);
- if (iter == _pipeline_map.end()) {
+ pip_context = _pipeline_map.find(tfragment_instance_id);
+ if (pip_context == nullptr) {
VLOG_CRITICAL << "unknown.... fragment-id:" <<
fragment_instance_id;
return Status::InvalidArgument("fragment-id: {}",
fragment_instance_id.to_string());
}
- pip_context = iter->second;
DCHECK(pip_context != nullptr);
        runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
query_thread_context = {pip_context->get_query_ctx()->query_id(),
pip_context->get_query_ctx()->query_mem_tracker};
} else {
- std::unique_lock<std::mutex> lock(_lock);
- auto iter = _fragment_instance_map.find(tfragment_instance_id);
- if (iter == _fragment_instance_map.end()) {
+ fragment_executor = _fragment_instance_map.find(tfragment_instance_id);
+ if (fragment_executor == nullptr) {
VLOG_CRITICAL << "unknown.... fragment instance id:" <<
print_id(tfragment_instance_id);
return Status::InvalidArgument("fragment-id: {}",
print_id(tfragment_instance_id));
}
- fragment_executor = iter->second;
DCHECK(fragment_executor != nullptr);
runtime_filter_mgr =
@@ -1730,16 +1783,14 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
const auto& fragment_instance_ids = request->fragment_instance_ids();
{
- std::unique_lock<std::mutex> lock(_lock);
for (UniqueId fragment_instance_id : fragment_instance_ids) {
TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
if (is_pipeline) {
- auto iter = _pipeline_map.find(tfragment_instance_id);
- if (iter == _pipeline_map.end()) {
+ pip_context = _pipeline_map.find(tfragment_instance_id);
+ if (pip_context == nullptr) {
continue;
}
- pip_context = iter->second;
DCHECK(pip_context != nullptr);
                runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
@@ -1748,11 +1799,10 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
pip_context->get_query_ctx()->query_mem_tracker,
pip_context->get_query_ctx()->workload_group()};
} else {
- auto iter = _fragment_instance_map.find(tfragment_instance_id);
- if (iter == _fragment_instance_map.end()) {
+                fragment_executor = _fragment_instance_map.find(tfragment_instance_id);
+ if (fragment_executor == nullptr) {
continue;
}
- fragment_executor = iter->second;
DCHECK(fragment_executor != nullptr);
                runtime_filter_mgr = fragment_executor->get_query_ctx()->runtime_filter_mgr();
@@ -1796,14 +1846,11 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
- std::shared_lock lock(_query_ctx_map_lock);
- auto iter = _query_ctx_map.find(query_id);
- if (iter == _query_ctx_map.end()) {
+ query_ctx = _query_ctx_map.find(query_id);
+ if (query_ctx == nullptr) {
            return Status::EndOfFile("Query context (query-id: {}) not found, maybe finished",
                                     queryid.to_string());
}
-
- query_ctx = iter->second;
}
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
@@ -1819,13 +1866,10 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
- std::shared_lock lock(_query_ctx_map_lock);
- auto iter = _query_ctx_map.find(query_id);
- if (iter == _query_ctx_map.end()) {
+ query_ctx = _query_ctx_map.find(query_id);
+ if (query_ctx == nullptr) {
            return Status::InvalidArgument("query-id: {}", queryid.to_string());
}
-
- query_ctx = iter->second;
}
return query_ctx->runtime_filter_mgr()->sync_filter_size(request);
}
@@ -1842,15 +1886,10 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
- std::shared_lock lock(_query_ctx_map_lock);
- auto iter = _query_ctx_map.find(query_id);
- if (iter == _query_ctx_map.end()) {
+ query_ctx = _query_ctx_map.find(query_id);
+ if (query_ctx == nullptr) {
            return Status::InvalidArgument("query-id: {}", queryid.to_string());
}
-
-        // hold reference to pip_context, or else runtime_state can be destroyed
-        // when filter_controller->merge is still in progress
-        query_ctx = iter->second;
}
SCOPED_ATTACH_TASK(query_ctx.get());
    auto merge_status = filter_controller->merge(request, attach_data, opt_remote_rf);
@@ -1936,17 +1975,19 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag
}
void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
-    {
-        std::shared_lock lock(_query_ctx_map_lock);
-        for (const auto& q : _query_ctx_map) {
-            WorkloadQueryInfo workload_query_info;
-            workload_query_info.query_id = print_id(q.first);
-            workload_query_info.tquery_id = q.first;
-            workload_query_info.wg_id =
-                    q.second->workload_group() == nullptr ? -1 : q.second->workload_group()->id();
-            query_info_list->push_back(workload_query_info);
-        }
-    }
+    _query_ctx_map.apply(
+            [&](phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>>& map) -> Status {
+                for (const auto& q : map) {
+                    WorkloadQueryInfo workload_query_info;
+                    workload_query_info.query_id = print_id(q.first);
+                    workload_query_info.tquery_id = q.first;
+                    workload_query_info.wg_id = q.second->workload_group() == nullptr
+                                                        ? -1
+                                                        : q.second->workload_group()->id();
+                    query_info_list->push_back(workload_query_info);
+                }
+                return Status::OK();
+            });
}
} // namespace doris
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 53cea30686f..1ceedb5337d 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -68,6 +68,46 @@ class WorkloadQueryInfo;
std::string to_load_error_http_path(const std::string& file_name);
+template <typename Key, typename Value, typename ValueType>
+class ConcurrentContextMap {
+public:
+    using ApplyFunction = std::function<Status(phmap::flat_hash_map<Key, Value>&)>;
+ ConcurrentContextMap();
+ Value find(const Key& query_id);
+ void insert(const Key& query_id, std::shared_ptr<ValueType>);
+ void clear();
+ void erase(const Key& query_id);
+ size_t num_items() const {
+ size_t n = 0;
+ for (auto& pair : _internal_map) {
+ std::shared_lock lock(*pair.first);
+ auto& map = pair.second;
+ n += map.size();
+ }
+ return n;
+ }
+ void apply(ApplyFunction&& function) {
+ for (auto& pair : _internal_map) {
+            // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
+            // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
+ std::unique_lock lock(*pair.first);
+ static_cast<void>(function(pair.second));
+ }
+ }
+
+    Status apply_if_not_exists(const Key& query_id, std::shared_ptr<ValueType>& query_ctx,
+ ApplyFunction&& function);
+
+private:
+    // The lock should only be used to protect the structures in fragment manager. Has to be
+    // used in a very small scope because it may dead lock. For example, if the _lock is used
+    // in prepare stage, the call path is prepare --> expr prepare --> may call allocator
+    // when allocate failed, allocator may call query_is_cancelled, query is callced will also
+    // call _lock, so that there is dead lock.
+    std::vector<std::pair<std::unique_ptr<std::shared_mutex>, phmap::flat_hash_map<Key, Value>>>
+            _internal_map;
+};
+
// This class used to manage all the fragment execute in this instance
class FragmentMgr : public RestMonitorIface {
public:
@@ -142,10 +182,7 @@ public:
std::shared_ptr<QueryContext> get_query_context(const TUniqueId& query_id);
- int32_t running_query_num() {
- std::shared_lock ctx_lock(_query_ctx_map_lock);
- return _query_ctx_map.size();
- }
+ int32_t running_query_num() { return _query_ctx_map.num_items(); }
std::string dump_pipeline_tasks(int64_t duration = 0);
std::string dump_pipeline_tasks(TUniqueId& query_id);
@@ -189,21 +226,17 @@ private:
// This is input params
ExecEnv* _exec_env = nullptr;
-    // The lock should only be used to protect the structures in fragment manager. Has to be
-    // used in a very small scope because it may dead lock. For example, if the _lock is used
-    // in prepare stage, the call path is prepare --> expr prepare --> may call allocator
-    // when allocate failed, allocator may call query_is_cancelled, query is callced will also
-    // call _lock, so that there is dead lock.
-    std::mutex _lock;
-
    // Make sure that remove this before no data reference PlanFragmentExecutor
-    std::unordered_map<TUniqueId, std::shared_ptr<PlanFragmentExecutor>> _fragment_instance_map;
-
-    std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
+    // (QueryID, FragmentID) -> PipelineFragmentContext
+    ConcurrentContextMap<TUniqueId, std::shared_ptr<PlanFragmentExecutor>, PlanFragmentExecutor>
+            _fragment_instance_map;
+    // (QueryID, FragmentID) -> PipelineFragmentContext
+    ConcurrentContextMap<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>,
+                         pipeline::PipelineFragmentContext>
+            _pipeline_map;
-    std::shared_mutex _query_ctx_map_lock;
    // query id -> QueryContext
-    phmap::flat_hash_map<TUniqueId, std::shared_ptr<QueryContext>> _query_ctx_map;
+    ConcurrentContextMap<TUniqueId, std::shared_ptr<QueryContext>, QueryContext> _query_ctx_map;
    std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
CountDownLatch _stop_background_threads_latch;
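For reference, exercising a map shaped like the members above (using the ShardedMap
sketch from earlier in this message, with a hypothetical QueryCtx standing in for the
Doris context types):

    #include <memory>

    struct QueryCtx { int id; }; // hypothetical stand-in for QueryContext

    int main() {
        ShardedMap<int, std::shared_ptr<QueryCtx>> query_map;
        query_map.insert(42, std::make_shared<QueryCtx>(QueryCtx {42}));
        std::shared_ptr<QueryCtx> ctx = query_map.find(42); // copy, or nullptr on miss
        if (ctx != nullptr) {
            query_map.erase(42);
        }
        return 0;
    }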
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index dcff789043a..66623a46fbc 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -325,7 +325,13 @@ public:
return _using_brpc_stubs;
}
+ void push_instance_ids(const TUniqueId& ins_id) {
+ std::lock_guard<std::mutex> lock(_ins_lock);
+ fragment_instance_ids.push_back(ins_id);
+ }
+
private:
+ std::mutex _ins_lock;
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;
VecDateTimeValue _start_time;
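The new push_instance_ids serializes appends to fragment_instance_ids because push_back
can reallocate the vector under a concurrent reader. A sketch of the guarded-append
pattern, paired with the snapshot-copy read that cancel_query performs ("Copy
instanceids to avoid concurrent modification"); whether readers also take _ins_lock is
left to the caller in the patch itself:

    #include <mutex>
    #include <vector>

    struct InstanceIds {
        void push(int id) {
            std::lock_guard<std::mutex> lock(_mu);
            _ids.push_back(id); // may reallocate, so it must not race with readers
        }
        std::vector<int> snapshot() {
            std::lock_guard<std::mutex> lock(_mu);
            return _ids; // callers iterate the copy without holding the lock
        }
        std::mutex _mu;
        std::vector<int> _ids;
    };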
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]