This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b2caa40ed7b [Improvement](fragment) Use partitioned hash map to manage contexts (… (#46282)
b2caa40ed7b is described below
commit b2caa40ed7bd787a35a1a60b1d705eabfea6ce28
Author: Gabriel <[email protected]>
AuthorDate: Fri Jan 3 16:46:02 2025 +0800
[Improvement](fragment) Use partitioned hash map to manage contexts (… (#46282)
…#46235)
Contexts in `fragment_mgr` are managed by a global map and accessed by
multiple threads concurrently under a single global lock, which
introduced noticeable overhead. To reduce that contention, this PR uses
a partitioned hash table in place of the single globally locked map.
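For intuition, here is a minimal, self-contained sketch of the partitioning
scheme (a hypothetical `PartitionedMap`, not the exact `ConcurrentContextMap`
added below): each key is hashed to one of N shards, and each shard pairs its
own `std::shared_mutex` with a hash map, so operations on different queries
rarely contend. The shard count mirrors the new `num_query_ctx_map_partitions`
config (default 128), and `std::hash` stands in for the `HashUtil::hash`
mixing used in the real code.

// Sketch only: per-shard locking instead of one global lock.
#include <memory>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

template <typename Key, typename Value>
class PartitionedMap {
public:
    explicit PartitionedMap(size_t partitions = 128) : _shards(partitions) {
        for (auto& shard : _shards) {
            shard.mutex = std::make_unique<std::shared_mutex>();
        }
    }

    // Readers take a shared lock on one shard only.
    std::optional<Value> find(const Key& key) const {
        const auto& shard = _shards[shard_id(key)];
        std::shared_lock lock(*shard.mutex);
        auto it = shard.map.find(key);
        return it == shard.map.end() ? std::nullopt : std::optional<Value>(it->second);
    }

    // Writers take an exclusive lock, but still on one shard only.
    void insert(const Key& key, Value value) {
        auto& shard = _shards[shard_id(key)];
        std::unique_lock lock(*shard.mutex);
        shard.map.insert_or_assign(key, std::move(value));
    }

    void erase(const Key& key) {
        auto& shard = _shards[shard_id(key)];
        std::unique_lock lock(*shard.mutex);
        shard.map.erase(key);
    }

private:
    struct Shard {
        std::unique_ptr<std::shared_mutex> mutex;
        std::unordered_map<Key, Value> map;
    };

    size_t shard_id(const Key& key) const { return std::hash<Key>{}(key) % _shards.size(); }

    std::vector<Shard> _shards;
};

One consequence worth noting: whole-map operations (counting, iteration) are no
longer atomic snapshots; they must visit shards one lock at a time, which is
exactly what the new `num_items()` and `apply()` below do.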
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 2 +
be/src/runtime/fragment_mgr.cpp | 508 +++++++++++++++++++++++-----------------
be/src/runtime/fragment_mgr.h | 60 +++--
4 files changed, 342 insertions(+), 229 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 587862beffd..342638786ee 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -211,6 +211,7 @@ DEFINE_Int32(check_consistency_worker_count, "1");
DEFINE_Int32(upload_worker_count, "1");
// the count of thread to download
DEFINE_Int32(download_worker_count, "1");
+DEFINE_Int32(num_query_ctx_map_partitions, "128");
// the count of thread to make snapshot
DEFINE_Int32(make_snapshot_worker_count, "5");
// the count of thread to release snapshot
diff --git a/be/src/common/config.h b/be/src/common/config.h
index bcad3ee29a2..5706a38b4b0 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1350,6 +1350,8 @@ DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_mBool(check_segment_when_build_rowset_meta);
+DECLARE_Int32(num_query_ctx_map_partitions);
+
DECLARE_mBool(enable_s3_rate_limiter);
DECLARE_mInt64(s3_get_bucket_tokens);
DECLARE_mInt64(s3_get_token_per_second);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 87c165222fc..9abdc18ee35 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -225,6 +225,89 @@ static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries
return result;
}
+inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) {
+ uint32_t value = HashUtil::hash(&query_id.lo, 8, 0);
+ value = HashUtil::hash(&query_id.hi, 8, value);
+ return value % capacity;
+}
+
+inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) {
+ uint32_t value = HashUtil::hash(&key.first.lo, 8, 0);
+ value = HashUtil::hash(&key.first.hi, 8, value);
+ return value % capacity;
+}
+
+template <typename Key, typename Value, typename ValueType>
+ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
+ _internal_map.resize(config::num_query_ctx_map_partitions);
+ for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
+ _internal_map[i] = {std::make_unique<std::shared_mutex>(),
+ phmap::flat_hash_map<Key, Value>()};
+ }
+}
+
+template <typename Key, typename Value, typename ValueType>
+Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
+ auto id = get_map_id(query_id, _internal_map.size());
+ {
+ std::shared_lock lock(*_internal_map[id].first);
+ auto& map = _internal_map[id].second;
+ auto search = map.find(query_id);
+ if (search != map.end()) {
+ return search->second;
+ }
+ return std::shared_ptr<ValueType>(nullptr);
+ }
+}
+
+template <typename Key, typename Value, typename ValueType>
+Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists(
+ const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) {
+ auto id = get_map_id(query_id, _internal_map.size());
+ {
+ std::unique_lock lock(*_internal_map[id].first);
+ auto& map = _internal_map[id].second;
+ auto search = map.find(query_id);
+ if (search != map.end()) {
+ query_ctx = search->second.lock();
+ }
+ if (!query_ctx) {
+ return function(map);
+ }
+ return Status::OK();
+ }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
+ auto id = get_map_id(query_id, _internal_map.size());
+ {
+ std::unique_lock lock(*_internal_map[id].first);
+ auto& map = _internal_map[id].second;
+ map.erase(query_id);
+ }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::insert(const Key& query_id,
+ std::shared_ptr<ValueType> query_ctx) {
+ auto id = get_map_id(query_id, _internal_map.size());
+ {
+ std::unique_lock lock(*_internal_map[id].first);
+ auto& map = _internal_map[id].second;
+ map.insert({query_id, query_ctx});
+ }
+}
+
+template <typename Key, typename Value, typename ValueType>
+void ConcurrentContextMap<Key, Value, ValueType>::clear() {
+ for (auto& pair : _internal_map) {
+ std::unique_lock lock(*pair.first);
+ auto& map = pair.second;
+ map.clear();
+ }
+}
+
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
: _exec_env(exec_env), _stop_background_threads_latch(1) {
_entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
@@ -253,14 +336,8 @@ void FragmentMgr::stop() {
}
// Only me can delete
- {
- std::unique_lock lock(_query_ctx_map_mutex);
- _query_ctx_map.clear();
- }
- {
- std::unique_lock lock(_pipeline_map_mutex);
- _pipeline_map.clear();
- }
+ _query_ctx_map.clear();
+ _pipeline_map.clear();
_thread_pool->shutdown();
}
@@ -640,17 +717,13 @@ void FragmentMgr::remove_pipeline_context(
g_fragment_executing_count << -1;
g_fragment_last_active_time.set_value(now);
- std::unique_lock lock(_pipeline_map_mutex);
_pipeline_map.erase({query_id, f_context->get_fragment_id()});
}
std::shared_ptr<QueryContext> FragmentMgr::get_query_ctx(const TUniqueId& query_id) {
- std::shared_lock lock(_query_ctx_map_mutex);
- auto search = _query_ctx_map.find(query_id);
- if (search != _query_ctx_map.end()) {
- if (auto q_ctx = search->second.lock()) {
- return q_ctx;
- }
+ auto val = _query_ctx_map.find(query_id);
+ if (auto q_ctx = val.lock()) {
+ return q_ctx;
}
return nullptr;
}
@@ -677,67 +750,66 @@ Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& para
}
} else {
if (!query_ctx) {
- std::unique_lock lock(_query_ctx_map_mutex);
- // Only one thread need create query ctx. other thread just get query_ctx in _query_ctx_map.
- auto search = _query_ctx_map.find(query_id);
- if (search != _query_ctx_map.end()) {
- query_ctx = search->second.lock();
- }
-
- if (!query_ctx) {
- WorkloadGroupPtr workload_group_ptr = nullptr;
- std::string wg_info_str = "Workload Group not set";
- if (params.__isset.workload_groups && !params.workload_groups.empty()) {
- uint64_t wg_id = params.workload_groups[0].id;
- workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id);
- if (workload_group_ptr != nullptr) {
- wg_info_str = workload_group_ptr->debug_string();
- } else {
- wg_info_str = "set wg but not find it in be";
- }
- }
+ RETURN_IF_ERROR(_query_ctx_map.apply_if_not_exists(
+ query_id, query_ctx,
+ [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
+ -> Status {
+ WorkloadGroupPtr workload_group_ptr = nullptr;
+ std::string wg_info_str = "Workload Group not set";
+ if (params.__isset.workload_groups && !params.workload_groups.empty()) {
+ uint64_t wg_id = params.workload_groups[0].id;
+ workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id);
+ if (workload_group_ptr != nullptr) {
+ wg_info_str = workload_group_ptr->debug_string();
+ } else {
+ wg_info_str = "set wg but not find it in be";
+ }
+ }
- // First time a fragment of a query arrived. print logs.
- LOG(INFO) << "query_id: " << print_id(query_id) << ", coord_addr: " << params.coord
- << ", total fragment num on current host: " << params.fragment_num_on_host
- << ", fe process uuid: " << params.query_options.fe_process_uuid
- << ", query type: " << params.query_options.query_type
- << ", report audit fe:" << params.current_connect_fe
- << ", use wg:" << wg_info_str;
-
- // This may be a first fragment request of the query.
- // Create the query fragments context.
- query_ctx = QueryContext::create_shared(query_id, _exec_env, params.query_options,
- params.coord, pipeline, params.is_nereids,
- params.current_connect_fe, query_source);
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
- RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
- &(query_ctx->desc_tbl)));
- // set file scan range params
- if (params.__isset.file_scan_params) {
- query_ctx->file_scan_range_params_map = params.file_scan_params;
- }
+ // First time a fragment of a query arrived. print logs.
+ LOG(INFO) << "query_id: " << print_id(query_id)
+ << ", coord_addr: " << params.coord
+ << ", total fragment num on current host: "
+ << params.fragment_num_on_host
+ << ", fe process uuid: " <<
params.query_options.fe_process_uuid
+ << ", query type: " <<
params.query_options.query_type
+ << ", report audit fe:" <<
params.current_connect_fe
+ << ", use wg:" << wg_info_str;
+
+ // This may be a first fragment request of the query.
+ // Create the query fragments context.
+ query_ctx = QueryContext::create_shared(
+ query_id, _exec_env, params.query_options, params.coord, pipeline,
+ params.is_nereids, params.current_connect_fe, query_source);
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker);
+ RETURN_IF_ERROR(DescriptorTbl::create(
+ &(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
+ // set file scan range params
+ if (params.__isset.file_scan_params) {
+ query_ctx->file_scan_range_params_map = params.file_scan_params;
+ }
- query_ctx->query_globals = params.query_globals;
+ query_ctx->query_globals = params.query_globals;
- if (params.__isset.resource_info) {
- query_ctx->user = params.resource_info.user;
- query_ctx->group = params.resource_info.group;
- query_ctx->set_rsc_info = true;
- }
+ if (params.__isset.resource_info) {
+ query_ctx->user = params.resource_info.user;
+ query_ctx->group = params.resource_info.group;
+ query_ctx->set_rsc_info = true;
+ }
- _set_scan_concurrency(params, query_ctx.get());
+ _set_scan_concurrency(params, query_ctx.get());
- if (workload_group_ptr != nullptr) {
- RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
- query_ctx->set_workload_group(workload_group_ptr);
- _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
- print_id(query_id), workload_group_ptr->id());
- }
- // There is some logic in query ctx's dctor, we could not check if exists and delete the
- // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
- _query_ctx_map.insert({query_id, query_ctx});
- }
+ if (workload_group_ptr != nullptr) {
+ RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
+ query_ctx->set_workload_group(workload_group_ptr);
+ _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(
+ print_id(query_id), workload_group_ptr->id());
+ }
+ // There is some logic in query ctx's dctor, we could not check if exists and delete the
+ // temp query ctx now. For example, the query id maybe removed from workload group's queryset.
+ map.insert({query_id, query_ctx});
+ return Status::OK();
+ }));
}
}
return Status::OK();
@@ -754,24 +826,30 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
{
fmt::format_to(debug_string_buffer,
"{} pipeline fragment contexts are still running! duration_limit={}\n",
- _pipeline_map.size(), duration);
+ _pipeline_map.num_items(), duration);
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
- std::shared_lock lock(_pipeline_map_mutex);
- for (auto& it : _pipeline_map) {
- auto elapsed = it.second->elapsed_time() / 1000000000.0;
- if (elapsed < duration) {
- // Only display tasks which has been running for more than {duration} seconds.
- continue;
+ _pipeline_map.apply([&](phmap::flat_hash_map<
+ std::pair<TUniqueId, int>,
+ std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
+ -> Status {
+ for (auto& it : map) {
+ auto elapsed = it.second->elapsed_time() / 1000000000.0;
+ if (elapsed < duration) {
+ // Only display tasks which has been running for more than {duration} seconds.
+ continue;
+ }
+ auto timeout_second = it.second->timeout_second();
+ fmt::format_to(
+ debug_string_buffer,
+ "No.{} (elapse_second={}s, query_timeout_second={}s,
is_timeout={}) : {}\n",
+ i, elapsed, timeout_second, it.second->is_timeout(now),
+ it.second->debug_string());
+ i++;
}
- auto timeout_second = it.second->timeout_second();
- fmt::format_to(
- debug_string_buffer,
- "No.{} (elapse_second={}s, query_timeout_second={}s,
is_timeout={}) : {}\n", i,
- elapsed, timeout_second, it.second->is_timeout(now),
it.second->debug_string());
- i++;
- }
+ return Status::OK();
+ });
}
return fmt::to_string(debug_string_buffer);
}
@@ -842,14 +920,13 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
g_fragment_last_active_time.set_value(now);
// (query_id, fragment_id) is executed only on one BE, locks _pipeline_map.
- std::unique_lock lock(_pipeline_map_mutex);
- auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
- if (iter != _pipeline_map.end()) {
+ auto res = _pipeline_map.find({params.query_id, params.fragment_id});
+ if (res != nullptr) {
return Status::InternalError(
"exec_plan_fragment query_id({}) input duplicated
fragment_id({})",
print_id(params.query_id), params.fragment_id);
}
- _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
+ _pipeline_map.insert({params.query_id, params.fragment_id}, context);
}
if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
@@ -890,10 +967,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
}
}
query_ctx->cancel(reason);
- {
- std::unique_lock l(_query_ctx_map_mutex);
- _query_ctx_map.erase(query_id);
- }
+ _query_ctx_map.erase(query_id);
LOG(INFO) << "Query " << print_id(query_id)
<< " is cancelled and removed. Reason: " << reason.to_string();
}
@@ -926,119 +1000,130 @@ void FragmentMgr::cancel_worker() {
}
std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> ctx;
- {
- std::shared_lock lock(_pipeline_map_mutex);
- ctx.reserve(_pipeline_map.size());
- for (auto& pipeline_itr : _pipeline_map) {
- ctx.push_back(pipeline_itr.second);
- }
- }
+ _pipeline_map.apply(
+ [&](phmap::flat_hash_map<std::pair<TUniqueId, int>,
+ std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
+ -> Status {
+ ctx.reserve(ctx.size() + map.size());
+ for (auto& pipeline_itr : map) {
+ ctx.push_back(pipeline_itr.second);
+ }
+ return Status::OK();
+ });
for (auto& c : ctx) {
c->clear_finished_tasks();
}
{
- {
- // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
- // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
- std::unique_lock lock(_query_ctx_map_mutex);
- for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
- if (auto q_ctx = it->second.lock()) {
- if (q_ctx->is_timeout(now)) {
- LOG_WARNING("Query {} is timeout",
print_id(it->first));
- queries_timeout.push_back(it->first);
+ _query_ctx_map.apply(
+ [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
+ -> Status {
+ for (auto it = map.begin(); it != map.end();) {
+ if (auto q_ctx = it->second.lock()) {
+ if (q_ctx->is_timeout(now)) {
+ LOG_WARNING("Query {} is timeout",
print_id(it->first));
+ queries_timeout.push_back(it->first);
+ }
+ ++it;
+ } else {
+ it = map.erase(it);
+ }
}
- ++it;
- } else {
- it = _query_ctx_map.erase(it);
- }
- }
- }
+ return Status::OK();
+ });
- std::shared_lock lock(_query_ctx_map_mutex);
// We use a very conservative cancel strategy.
// 0. If there are no running frontends, do not cancel any queries.
// 1. If query's process uuid is zero, do not cancel
// 2. If same process uuid, do not cancel
// 3. If fe has zero process uuid, do not cancel
- if (running_fes.empty() && !_query_ctx_map.empty()) {
+ if (running_fes.empty() && _query_ctx_map.num_items() != 0) {
LOG_EVERY_N(WARNING, 10)
<< "Could not find any running frontends, maybe we are
upgrading or "
"starting? "
<< "We will not cancel any outdated queries in this
situation.";
} else {
- for (const auto& it : _query_ctx_map) {
- if (auto q_ctx = it.second.lock()) {
- const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
-
- if (fe_process_uuid == 0) {
- // zero means this query is from a older version fe or
- // this fe is starting
- continue;
- }
-
- // If the query is not running on the any frontends, cancel it.
- if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
- itr != running_queries_on_all_fes.end()) {
- // Query not found on this frontend, and the query arrives before the last check
- if (itr->second.find(it.first) == itr->second.end() &&
- // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec.
- // tv_sec is enough, we do not need to check tv_nsec.
- q_ctx->get_query_arrival_timestamp().tv_sec <
- check_invalid_query_last_timestamp.tv_sec &&
- q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
- queries_pipeline_task_leak.push_back(q_ctx->query_id());
- LOG_INFO(
- "Query {}, type {} is not found on any
frontends, maybe it "
- "is leaked.",
- print_id(q_ctx->query_id()),
- toString(q_ctx->get_query_source()));
+ _query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId,
+ std::weak_ptr<QueryContext>>& map)
+ -> Status {
+ for (const auto& it : map) {
+ if (auto q_ctx = it.second.lock()) {
+ const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
+
+ if (fe_process_uuid == 0) {
+ // zero means this query is from a older version fe or
+ // this fe is starting
continue;
}
- }
- auto itr = running_fes.find(q_ctx->coord_addr);
- if (itr != running_fes.end()) {
- if (fe_process_uuid == itr->second.info.process_uuid ||
- itr->second.info.process_uuid == 0) {
- continue;
- } else {
- LOG_WARNING(
- "Coordinator of query {} restarted,
going to cancel it.",
- print_id(q_ctx->query_id()));
+ // If the query is not running on the any frontends, cancel it.
+ if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
+ itr != running_queries_on_all_fes.end()) {
+ // Query not found on this frontend, and the query arrives before the last check
+ if (itr->second.find(it.first) == itr->second.end() &&
+ // tv_nsec represents the number of nanoseconds that have elapsed since the time point stored in tv_sec.
+ // tv_sec is enough, we do not need to check tv_nsec.
+ q_ctx->get_query_arrival_timestamp().tv_sec <
+ check_invalid_query_last_timestamp.tv_sec &&
+ q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
+ queries_pipeline_task_leak.push_back(q_ctx->query_id());
+ LOG_INFO(
+ "Query {}, type {} is not found on
any frontends, "
+ "maybe it "
+ "is leaked.",
+ print_id(q_ctx->query_id()),
+
toString(q_ctx->get_query_source()));
+ continue;
+ }
}
- } else {
- // In some rear cases, the rpc port of follower is not updated in time,
- // then the port of this follower will be zero, but acutally it is still running,
- // and be has already received the query from follower.
- // So we need to check if host is in running_fes.
- bool fe_host_is_standing = std::any_of(
- running_fes.begin(), running_fes.end(),
- [&q_ctx](const auto& fe) {
- return fe.first.hostname == q_ctx->coord_addr.hostname &&
- fe.first.port == 0;
- });
- if (fe_host_is_standing) {
- LOG_WARNING(
- "Coordinator {}:{} is not found, but
its host is still "
- "running with an unstable brpc port,
not going to cancel "
- "it.",
- q_ctx->coord_addr.hostname,
q_ctx->coord_addr.port,
- print_id(q_ctx->query_id()));
- continue;
+
+ auto itr = running_fes.find(q_ctx->coord_addr);
+ if (itr != running_fes.end()) {
+ if (fe_process_uuid == itr->second.info.process_uuid ||
+ itr->second.info.process_uuid == 0) {
+ continue;
+ } else {
+ LOG_WARNING(
+ "Coordinator of query {}
restarted, going to cancel "
+ "it.",
+ print_id(q_ctx->query_id()));
+ }
} else {
- LOG_WARNING(
- "Could not find target coordinator
{}:{} of query {}, "
- "going to "
- "cancel it.",
- q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
- print_id(q_ctx->query_id()));
+ // In some rear cases, the rpc port of follower is not updated in time,
+ // then the port of this follower will be zero, but acutally it is still running,
+ // and be has already received the query from follower.
+ // So we need to check if host is in running_fes.
+ bool fe_host_is_standing =
+ std::any_of(running_fes.begin(), running_fes.end(),
+ [&q_ctx](const auto& fe) {
+ return fe.first.hostname ==
+ q_ctx->coord_addr.hostname &&
+ fe.first.port == 0;
+ });
+ if (fe_host_is_standing) {
+ LOG_WARNING(
+ "Coordinator {}:{} is not found,
but its host is still "
+ "running with an unstable brpc
port, not going to "
+ "cancel "
+ "it.",
+ q_ctx->coord_addr.hostname,
q_ctx->coord_addr.port,
+ print_id(q_ctx->query_id()));
+ continue;
+ } else {
+ LOG_WARNING(
+ "Could not find target coordinator
{}:{} of query {}, "
+ "going to "
+ "cancel it.",
+ q_ctx->coord_addr.hostname,
q_ctx->coord_addr.port,
+ print_id(q_ctx->query_id()));
+ }
}
}
+ // Coordinator of this query has already dead or query context has been released.
+ queries_lost_coordinator.push_back(it.first);
}
- // Coordinator of this query has already dead or query context has been released.
- queries_lost_coordinator.push_back(it.first);
- }
+ return Status::OK();
+ });
}
}
@@ -1169,7 +1254,6 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
butil::IOBufAsZeroCopyInputStream* attach_data) {
- bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
int64_t start_apply = MonotonicMillis();
std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
@@ -1179,24 +1263,18 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
const auto& fragment_ids = request->fragment_ids();
{
- std::shared_lock lock(_pipeline_map_mutex);
for (auto fragment_id : fragment_ids) {
- if (is_pipeline) {
- auto iter = _pipeline_map.find(
- {UniqueId(request->query_id()).to_thrift(), fragment_id});
- if (iter == _pipeline_map.end()) {
- continue;
- }
- pip_context = iter->second;
-
- DCHECK(pip_context != nullptr);
- runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
- query_thread_context = {pip_context->get_query_ctx()->query_id(),
- pip_context->get_query_ctx()->query_mem_tracker,
- pip_context->get_query_ctx()->workload_group()};
- } else {
- return Status::InternalError("Non-pipeline is disabled!");
+ pip_context =
+ _pipeline_map.find({UniqueId(request->query_id()).to_thrift(), fragment_id});
+ if (pip_context == nullptr) {
+ continue;
}
+
+ DCHECK(pip_context != nullptr);
+ runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
+ query_thread_context = {pip_context->get_query_ctx()->query_id(),
+ pip_context->get_query_ctx()->query_mem_tracker,
+ pip_context->get_query_ctx()->workload_group()};
break;
}
}
@@ -1294,22 +1372,24 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
}
void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
- {
- std::unique_lock lock(_query_ctx_map_mutex);
- for (auto iter = _query_ctx_map.begin(); iter != _query_ctx_map.end();) {
- if (auto q_ctx = iter->second.lock()) {
- WorkloadQueryInfo workload_query_info;
- workload_query_info.query_id = print_id(iter->first);
- workload_query_info.tquery_id = iter->first;
- workload_query_info.wg_id =
- q_ctx->workload_group() == nullptr ? -1 : q_ctx->workload_group()->id();
- query_info_list->push_back(workload_query_info);
- iter++;
- } else {
- iter = _query_ctx_map.erase(iter);
- }
- }
- }
+ _query_ctx_map.apply(
+ [&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map) -> Status {
+ for (auto iter = map.begin(); iter != map.end();) {
+ if (auto q_ctx = iter->second.lock()) {
+ WorkloadQueryInfo workload_query_info;
+ workload_query_info.query_id = print_id(iter->first);
+ workload_query_info.tquery_id = iter->first;
+ workload_query_info.wg_id = q_ctx->workload_group() == nullptr
+ ? -1
+ : q_ctx->workload_group()->id();
+ query_info_list->push_back(workload_query_info);
+ iter++;
+ } else {
+ iter = map.erase(iter);
+ }
+ }
+ return Status::OK();
+ });
}
Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 0e7691647dd..fb01c899104 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -69,6 +69,46 @@ class WorkloadQueryInfo;
std::string to_load_error_http_path(const std::string& file_name);
+template <typename Key, typename Value, typename ValueType>
+class ConcurrentContextMap {
+public:
+ using ApplyFunction = std::function<Status(phmap::flat_hash_map<Key, Value>&)>;
+ ConcurrentContextMap();
+ Value find(const Key& query_id);
+ void insert(const Key& query_id, std::shared_ptr<ValueType>);
+ void clear();
+ void erase(const Key& query_id);
+ size_t num_items() const {
+ size_t n = 0;
+ for (auto& pair : _internal_map) {
+ std::shared_lock lock(*pair.first);
+ auto& map = pair.second;
+ n += map.size();
+ }
+ return n;
+ }
+ void apply(ApplyFunction&& function) {
+ for (auto& pair : _internal_map) {
+ // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
+ // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
+ std::unique_lock lock(*pair.first);
+ static_cast<void>(function(pair.second));
+ }
+ }
+
+ Status apply_if_not_exists(const Key& query_id, std::shared_ptr<ValueType>& query_ctx,
+ ApplyFunction&& function);
+
+private:
+ // The lock should only be used to protect the structures in fragment manager. Has to be
+ // used in a very small scope because it may dead lock. For example, if the _lock is used
+ // in prepare stage, the call path is prepare --> expr prepare --> may call allocator
+ // when allocate failed, allocator may call query_is_cancelled, query is callced will also
+ // call _lock, so that there is dead lock.
+ std::vector<std::pair<std::unique_ptr<std::shared_mutex>, phmap::flat_hash_map<Key, Value>>>
+ _internal_map;
+};
+
// This class used to manage all the fragment execute in this instance
class FragmentMgr : public RestMonitorIface {
public:
@@ -131,10 +171,7 @@ public:
ThreadPool* get_thread_pool() { return _thread_pool.get(); }
- int32_t running_query_num() {
- std::shared_lock lock(_query_ctx_map_mutex);
- return _query_ctx_map.size();
- }
+ int32_t running_query_num() { return _query_ctx_map.num_items(); }
std::string dump_pipeline_tasks(int64_t duration = 0);
std::string dump_pipeline_tasks(TUniqueId& query_id);
@@ -164,21 +201,14 @@ private:
// This is input params
ExecEnv* _exec_env = nullptr;
- // The lock protect the `_pipeline_map`
- std::shared_mutex _pipeline_map_mutex;
// (QueryID, FragmentID) -> PipelineFragmentContext
- phmap::flat_hash_map<std::pair<TUniqueId, int>,
- std::shared_ptr<pipeline::PipelineFragmentContext>>
+ ConcurrentContextMap<std::pair<TUniqueId, int>,
+ std::shared_ptr<pipeline::PipelineFragmentContext>,
+ pipeline::PipelineFragmentContext>
_pipeline_map;
- // The lock should only be used to protect the structures in fragment manager. Has to be
- // used in a very small scope because it may dead lock. For example, if the _lock is used
- // in prepare stage, the call path is prepare --> expr prepare --> may call allocator
- // when allocate failed, allocator may call query_is_cancelled, query is callced will also
- // call _lock, so that there is dead lock.
- std::shared_mutex _query_ctx_map_mutex;
// query id -> QueryContext
- phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
+ ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext> _query_ctx_map;
std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
CountDownLatch _stop_background_threads_latch;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]