This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ee9240c3a65 [refactor](fragment) Use fragment ID to manage fragment
context (#42048) (#42258)
ee9240c3a65 is described below
commit ee9240c3a658dcaea90a838f39bf330fe5888d32
Author: Gabriel <[email protected]>
AuthorDate: Wed Oct 23 10:10:57 2024 +0800
[refactor](fragment) Use fragment ID to manage fragment context (#42048)
(#42258)
pick #42048
Use fragment ID to manage fragment context
---
be/src/pipeline/pipeline_fragment_context.cpp | 9 +--
be/src/pipeline/pipeline_fragment_context.h | 14 -----
be/src/runtime/fragment_mgr.cpp | 68 +++++-----------------
be/src/runtime/fragment_mgr.h | 8 +--
be/src/runtime/query_context.h | 2 -
be/src/runtime/runtime_filter_mgr.cpp | 16 +++--
be/src/runtime/runtime_filter_mgr.h | 1 -
be/src/service/backend_service.cpp | 7 ---
be/src/service/backend_service.h | 2 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 24 +++++---
gensrc/proto/internal_service.proto | 1 +
gensrc/thrift/PaloInternalService.thrift | 1 +
12 files changed, 51 insertions(+), 102 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 8c998ab8c2f..b1ee5933d27 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -585,10 +585,7 @@ Status
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
void PipelineFragmentContext::_init_next_report_time() {
auto interval_s = config::pipeline_status_report_interval;
if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
- std::vector<string> ins_ids;
- instance_ids(ins_ids);
- VLOG_FILE << "enable period report: instance_id="
- << fmt::format("{}", fmt::join(ins_ids, ", "));
+ VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) *
NANOS_PER_SEC;
// We don't want to wait longer than it takes to run the entire
fragment.
_previous_report_time =
@@ -626,11 +623,9 @@ void
PipelineFragmentContext::trigger_report_if_necessary() {
return;
}
if (VLOG_FILE_IS_ON) {
- std::vector<string> ins_ids;
- instance_ids(ins_ids);
VLOG_FILE << "Reporting "
<< "profile for query_id " << print_id(_query_id)
- << ", instance ids: " << fmt::format("{}",
fmt::join(ins_ids, ", "));
+ << ", fragment id: " << _fragment_id;
std::stringstream ss;
_runtime_state->runtime_profile()->compute_time_in_profile();
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index b20c324756c..822a23c54bd 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -117,20 +117,6 @@ public:
[[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }
- void instance_ids(std::vector<TUniqueId>& ins_ids) const {
- ins_ids.resize(_fragment_instance_ids.size());
- for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
- ins_ids[i] = _fragment_instance_ids[i];
- }
- }
-
- void instance_ids(std::vector<string>& ins_ids) const {
- ins_ids.resize(_fragment_instance_ids.size());
- for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
- ins_ids[i] = print_id(_fragment_instance_ids[i]);
- }
- }
-
void clear_finished_tasks() {
for (size_t j = 0; j < _tasks.size(); j++) {
for (size_t i = 0; i < _tasks[j].size(); i++) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 577b7ffa157..7ba73442c90 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -106,7 +106,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size,
MetricUnit::NOUNIT);
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr",
"prepare");
-bvar::Adder<int64_t>
g_pipeline_fragment_instances_count("doris_pipeline_fragment_instances_count");
bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
bvar::Status<uint64_t> g_fragment_last_active_time(
@@ -637,18 +636,13 @@ void FragmentMgr::remove_pipeline_context(
{
std::lock_guard<std::mutex> lock(_lock);
auto query_id = f_context->get_query_id();
- std::vector<TUniqueId> ins_ids;
- f_context->instance_ids(ins_ids);
int64 now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_fragment_executing_count << -1;
g_fragment_last_active_time.set_value(now);
- for (const auto& ins_id : ins_ids) {
- LOG_INFO("Removing query {} instance {}", print_id(query_id),
print_id(ins_id));
- _pipeline_map.erase(ins_id);
- g_pipeline_fragment_instances_count << -1;
- }
+ LOG_INFO("Removing query {} fragment {}", print_id(query_id),
f_context->get_fragment_id());
+ _pipeline_map.erase({query_id, f_context->get_fragment_id()});
}
}
@@ -782,11 +776,10 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t
duration) {
continue;
}
auto timeout_second = it.second->timeout_second();
- fmt::format_to(debug_string_buffer,
- "No.{} (elapse_second={}s,
query_timeout_second={}s, instance_id="
- "{}, is_timeout={}) : {}\n",
- i, elapsed, timeout_second, print_id(it.first),
- it.second->is_timeout(now),
it.second->debug_string());
+ fmt::format_to(
+ debug_string_buffer,
+ "No.{} (elapse_second={}s, query_timeout_second={}s,
is_timeout={}) : {}\n", i,
+ elapsed, timeout_second, it.second->is_timeout(now),
it.second->debug_string());
i++;
}
}
@@ -846,11 +839,10 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
for (const auto& local_param : params.local_params) {
const TUniqueId& fragment_instance_id =
local_param.fragment_instance_id;
std::lock_guard<std::mutex> lock(_lock);
- auto iter = _pipeline_map.find(fragment_instance_id);
+ auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
if (iter != _pipeline_map.end()) {
- return Status::InternalError(
- "exec_plan_fragment input duplicated
fragment_instance_id({})",
- UniqueId(fragment_instance_id).to_string());
+ return Status::InternalError("exec_plan_fragment input duplicated
fragment_id({})",
+ params.fragment_id);
}
query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
}
@@ -866,12 +858,8 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
g_fragment_executing_count << 1;
g_fragment_last_active_time.set_value(now);
std::lock_guard<std::mutex> lock(_lock);
- std::vector<TUniqueId> ins_ids;
- context->instance_ids(ins_ids);
// TODO: simplify this mapping
- for (const auto& ins_id : ins_ids) {
- _pipeline_map.insert({ins_id, context});
- }
+ _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
}
query_ctx->set_pipeline_context(params.fragment_id, context);
@@ -916,31 +904,6 @@ void FragmentMgr::cancel_query(const TUniqueId query_id,
const Status reason) {
<< " is cancelled and removed. Reason: " << reason.to_string();
}
-void FragmentMgr::cancel_instance(const TUniqueId instance_id, const Status
reason) {
- std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_ctx;
- {
- std::lock_guard<std::mutex> state_lock(_lock);
- DCHECK(!_pipeline_map.contains(instance_id))
- << " Pipeline tasks should be canceled by query instead of
instance! Query ID: "
- << print_id(_pipeline_map[instance_id]->get_query_id());
- const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
- if (is_pipeline_instance) {
- auto itr = _pipeline_map.find(instance_id);
- if (itr != _pipeline_map.end()) {
- pipeline_ctx = itr->second;
- } else {
- LOG(WARNING) << "Could not find the pipeline instance id:" <<
print_id(instance_id)
- << " to cancel";
- return;
- }
- }
- }
-
- if (pipeline_ctx != nullptr) {
- pipeline_ctx->cancel(reason);
- }
-}
-
void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker start working.";
@@ -1206,15 +1169,16 @@ Status FragmentMgr::apply_filterv2(const
PPublishFilterRequestV2* request,
RuntimeFilterMgr* runtime_filter_mgr = nullptr;
- const auto& fragment_instance_ids = request->fragment_instance_ids();
+ const auto& fragment_ids = request->fragment_ids();
{
std::unique_lock<std::mutex> lock(_lock);
- for (UniqueId fragment_instance_id : fragment_instance_ids) {
- TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
+ for (auto fragment_id : fragment_ids) {
if (is_pipeline) {
- auto iter = _pipeline_map.find(tfragment_instance_id);
+ auto iter = _pipeline_map.find(
+ {UniqueId(request->query_id()).to_thrift(),
fragment_id});
if (iter == _pipeline_map.end()) {
+ LOG(WARNING) << "No pipeline fragment is found: Query-ID =
"
+ << request->query_id() << " fragment_id = "
<< fragment_id;
continue;
}
pip_context = iter->second;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index bc066066f7b..41b63db0b23 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -100,9 +100,6 @@ public:
Status trigger_pipeline_context_report(const ReportStatusRequest,
std::shared_ptr<pipeline::PipelineFragmentContext>&&);
- // Cancel instance (pipeline or nonpipeline).
- void cancel_instance(const TUniqueId instance_id, const Status reason);
-
// Can be used in both version.
void cancel_query(const TUniqueId query_id, const Status reason);
@@ -169,7 +166,10 @@ private:
// call _lock, so that there is dead lock.
std::mutex _lock;
- std::unordered_map<TUniqueId,
std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
+ // (QueryID, FragmentID) -> PipelineFragmentContext
+ std::unordered_map<std::pair<TUniqueId, int>,
+ std::shared_ptr<pipeline::PipelineFragmentContext>>
+ _pipeline_map;
// query id -> QueryContext
std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 7a6d6d3c53d..afc86f404cb 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -195,8 +195,6 @@ public:
ThreadPool* get_memtable_flush_pool();
- std::vector<TUniqueId> get_fragment_instance_ids() const { return
fragment_instance_ids; }
-
int64_t mem_limit() const { return _bytes_limit; }
void set_merge_controller_handler(
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index fddf81b434b..a4631cfaba7 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -228,7 +228,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
// so we need to copy to cnt_val
cnt_val->producer_size = producer_size;
cnt_val->runtime_filter_desc = *runtime_filter_desc;
- cnt_val->target_info = *target_info;
cnt_val->pool.reset(new ObjectPool());
cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state,
runtime_filter_desc));
@@ -458,10 +457,17 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
}
closure->cntl_->set_timeout_ms(std::min(3600,
_state->execution_timeout) * 1000);
// set fragment-id
- for (auto& target_fragment_instance_id :
target.target_fragment_instance_ids) {
- PUniqueId* cur_id =
closure->request_->add_fragment_instance_ids();
- cur_id->set_hi(target_fragment_instance_id.hi);
- cur_id->set_lo(target_fragment_instance_id.lo);
+ if (target.__isset.target_fragment_ids) {
+ for (auto& target_fragment_id : target.target_fragment_ids) {
+ closure->request_->add_fragment_ids(target_fragment_id);
+ }
+ } else {
+ // FE not upgraded yet.
+ for (auto& target_fragment_instance_id :
target.target_fragment_instance_ids) {
+ PUniqueId* cur_id =
closure->request_->add_fragment_instance_ids();
+ cur_id->set_hi(target_fragment_instance_id.hi);
+ cur_id->set_lo(target_fragment_instance_id.lo);
+ }
}
std::shared_ptr<PBackendService_Stub> stub(
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index d89a3b9f1b1..b0aea7568cf 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -168,7 +168,6 @@ public:
int producer_size;
uint64_t global_size;
TRuntimeFilterDesc runtime_filter_desc;
- std::vector<doris::TRuntimeFilterTargetParams> target_info;
std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
IRuntimeFilter* filter = nullptr;
std::unordered_set<UniqueId> arrive_id;
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index aa29661da02..d56aa49b19b 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -657,13 +657,6 @@ Status BaseBackendService::start_plan_fragment_execution(
QuerySource::INTERNAL_FRONTEND);
}
-void BaseBackendService::cancel_plan_fragment(TCancelPlanFragmentResult&
return_val,
- const TCancelPlanFragmentParams&
params) {
- LOG(INFO) << "cancel_plan_fragment(): instance_id=" <<
print_id(params.fragment_instance_id);
- _exec_env->fragment_mgr()->cancel_instance(
- params.fragment_instance_id, Status::InternalError("cancel message
received from FE"));
-}
-
void BaseBackendService::transmit_data(TTransmitDataResult& return_val,
const TTransmitDataParams& params) {
VLOG_ROW << "transmit_data(): instance_id=" <<
params.dest_fragment_instance_id
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 4d01107ba8a..1d4219e2191 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -90,7 +90,7 @@ public:
const TExecPlanFragmentParams& params) override;
void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
- const TCancelPlanFragmentParams& params)
override;
+ const TCancelPlanFragmentParams& params)
override {};
void transmit_data(TTransmitDataResult& return_val, const
TTransmitDataParams& params) override;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8f558726bff..b9f90242a29 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1995,7 +1995,8 @@ public class Coordinator implements CoordInterface {
List<FRuntimeFilterTargetParam> targetFragments =
ridToTargetParam.computeIfAbsent(rid,
k -> new ArrayList<>());
for (final FInstanceExecParam instance :
params.instanceExecParams) {
- targetFragments.add(new
FRuntimeFilterTargetParam(instance.instanceId, toBrpcHost(instance.host)));
+ targetFragments.add(new
FRuntimeFilterTargetParam(instance.fragment().getFragmentId().asInt(),
+ toBrpcHost(instance.host)));
}
}
@@ -3179,8 +3180,8 @@ public class Coordinator implements CoordInterface {
for (FRuntimeFilterTargetParam targetParam :
fParams) {
if
(targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
- .target_fragment_instance_ids
-
.add(targetParam.targetFragmentInstanceId);
+ .target_fragment_ids
+ .add(targetParam.targetFragmentId);
} else {
targetParamsV2.put(targetParam.targetFragmentInstanceAddr,
new
TRuntimeFilterTargetParamsV2());
@@ -3188,11 +3189,15 @@ public class Coordinator implements CoordInterface {
.target_fragment_instance_addr
=
targetParam.targetFragmentInstanceAddr;
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
- .target_fragment_instance_ids
+ .target_fragment_ids
= new ArrayList<>();
+
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
+ .target_fragment_ids
+ .add(targetParam.targetFragmentId);
+ // `target_fragment_instance_ids` is a
required field
targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
.target_fragment_instance_ids
-
.add(targetParam.targetFragmentInstanceId);
+ = new ArrayList<>();
}
}
@@ -3201,7 +3206,8 @@ public class Coordinator implements CoordInterface {
} else {
List<TRuntimeFilterTargetParams> targetParams =
Lists.newArrayList();
for (FRuntimeFilterTargetParam targetParam :
fParams) {
- targetParams.add(new
TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
+ // Instance id make no sense if this runtime
filter doesn't have remote targets.
+ targetParams.add(new
TRuntimeFilterTargetParams(new TUniqueId(),
targetParam.targetFragmentInstanceAddr));
}
localParams.runtime_filter_params.putToRidToTargetParam(rf.getFilterId().asInt(),
@@ -3371,12 +3377,12 @@ public class Coordinator implements CoordInterface {
// Runtime filter target fragment instance param
static class FRuntimeFilterTargetParam {
- public TUniqueId targetFragmentInstanceId;
+ public int targetFragmentId;
public TNetworkAddress targetFragmentInstanceAddr;
- public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) {
- this.targetFragmentInstanceId = id;
+ public FRuntimeFilterTargetParam(int id, TNetworkAddress host) {
+ this.targetFragmentId = id;
this.targetFragmentInstanceAddr = host;
}
}
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 9abf9d7ea65..f3764cea233 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -613,6 +613,7 @@ message PPublishFilterRequestV2 {
optional int64 merge_time = 9;
optional bool contain_null = 10;
optional bool ignored = 11;
+ repeated int32 fragment_ids = 12;
};
message PPublishFilterResponse {
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 0a3496ca434..5570019ee2b 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -368,6 +368,7 @@ struct TRuntimeFilterTargetParamsV2 {
1: required list<Types.TUniqueId> target_fragment_instance_ids
// The address of the instance where the fragment is expected to run
2: required Types.TNetworkAddress target_fragment_instance_addr
+ 3: optional list<i32> target_fragment_ids
}
struct TRuntimeFilterParams {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]