This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 1c3228add1668b5b27daaa3eab078301cadee349 Author: Pxl <pxl...@qq.com> AuthorDate: Thu Sep 19 10:56:31 2024 +0800 [Improvement](load) make load profile more real-time (#40924) ## Proposed changes make load profile more real-time --- be/src/runtime/fragment_mgr.cpp | 334 ++++++++++++++++++++-------------------- 1 file changed, 163 insertions(+), 171 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 5bb01dc9ba5..6c61f9fa656 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -324,208 +324,200 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { DCHECK(req.runtime_state != nullptr); - if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && req.status.ok()) { - // this is a load plan, and load is not finished, just make a brief report + if (req.runtime_state->query_type() == TQueryType::LOAD) { params.__set_loaded_rows(req.runtime_state->num_rows_load_total()); params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total()); - } else { - if (req.runtime_state->query_type() == TQueryType::LOAD) { - params.__set_loaded_rows(req.runtime_state->num_rows_load_total()); - params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total()); - } - params.__isset.detailed_report = true; - DCHECK(!req.runtime_states.empty()); - const bool enable_profile = (*req.runtime_states.begin())->enable_profile(); - if (enable_profile) { - params.__isset.profile = true; - params.__isset.loadChannelProfile = false; - for (auto* rs : req.runtime_states) { - DCHECK(req.load_channel_profile); - TDetailedReportParams detailed_param; - rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile); - // merge all runtime_states.loadChannelProfile to req.load_channel_profile - req.load_channel_profile->update(detailed_param.loadChannelProfile); - } - 
req.load_channel_profile->to_thrift(&params.loadChannelProfile); - } else { - params.__isset.profile = false; + 
} + params.__isset.detailed_report = true; + DCHECK(!req.runtime_states.empty()); + const bool enable_profile = (*req.runtime_states.begin())->enable_profile(); + if (enable_profile) { + params.__isset.profile = true; + params.__isset.loadChannelProfile = false; + for (auto* rs : req.runtime_states) { + DCHECK(req.load_channel_profile); + TDetailedReportParams detailed_param; + rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile); + // merge all runtime_states.loadChannelProfile to req.load_channel_profile + req.load_channel_profile->update(detailed_param.loadChannelProfile); } + req.load_channel_profile->to_thrift(&params.loadChannelProfile); + } else { + params.__isset.profile = false; + } - if (enable_profile) { - DCHECK(req.profile != nullptr); + if (enable_profile) { + DCHECK(req.profile != nullptr); + TDetailedReportParams detailed_param; + detailed_param.__isset.fragment_instance_id = false; + detailed_param.__isset.profile = true; + detailed_param.__isset.loadChannelProfile = false; + detailed_param.__set_is_fragment_level(true); + req.profile->to_thrift(&detailed_param.profile); + params.detailed_report.push_back(detailed_param); + for (auto pipeline_profile : req.runtime_state->pipeline_id_to_profile()) { TDetailedReportParams detailed_param; detailed_param.__isset.fragment_instance_id = false; detailed_param.__isset.profile = true; detailed_param.__isset.loadChannelProfile = false; - detailed_param.__set_is_fragment_level(true); - req.profile->to_thrift(&detailed_param.profile); - params.detailed_report.push_back(detailed_param); - for (auto pipeline_profile : req.runtime_state->pipeline_id_to_profile()) { - TDetailedReportParams detailed_param; - detailed_param.__isset.fragment_instance_id = false; - detailed_param.__isset.profile = true; - detailed_param.__isset.loadChannelProfile = false; - pipeline_profile->to_thrift(&detailed_param.profile); - params.detailed_report.push_back(std::move(detailed_param)); - } + 
pipeline_profile->to_thrift(&detailed_param.profile); + params.detailed_report.push_back(std::move(detailed_param)); } - if (!req.runtime_state->output_files().empty()) { - params.__isset.delta_urls = true; - for (auto& it : req.runtime_state->output_files()) { + } + if (!req.runtime_state->output_files().empty()) { + params.__isset.delta_urls = true; + for (auto& it : req.runtime_state->output_files()) { + params.delta_urls.push_back(to_http_path(it)); + } + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + for (auto& it : rs->output_files()) { params.delta_urls.push_back(to_http_path(it)); } - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - for (auto& it : rs->output_files()) { - params.delta_urls.push_back(to_http_path(it)); - } - } - if (!params.delta_urls.empty()) { - params.__isset.delta_urls = true; - } } + if (!params.delta_urls.empty()) { + params.__isset.delta_urls = true; + } + } - // load rows - static std::string s_dpp_normal_all = "dpp.norm.ALL"; - static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL"; - static std::string s_unselected_rows = "unselected.rows"; - int64_t num_rows_load_success = 0; - int64_t num_rows_load_filtered = 0; - int64_t num_rows_load_unselected = 0; - if (req.runtime_state->num_rows_load_total() > 0 || - req.runtime_state->num_rows_load_filtered() > 0 || - req.runtime_state->num_finished_range() > 0) { - params.__isset.load_counters = true; - - num_rows_load_success = req.runtime_state->num_rows_load_success(); - num_rows_load_filtered = req.runtime_state->num_rows_load_filtered(); - num_rows_load_unselected = req.runtime_state->num_rows_load_unselected(); - params.__isset.fragment_instance_reports = true; - TFragmentInstanceReport t; - t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id()); - t.__set_num_finished_range(req.runtime_state->num_finished_range()); - params.fragment_instance_reports.push_back(t); - } else if 
(!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 || - req.runtime_state->num_finished_range() > 0) { - params.__isset.load_counters = true; - num_rows_load_success += rs->num_rows_load_success(); - num_rows_load_filtered += rs->num_rows_load_filtered(); - num_rows_load_unselected += rs->num_rows_load_unselected(); - params.__isset.fragment_instance_reports = true; - TFragmentInstanceReport t; - t.__set_fragment_instance_id(rs->fragment_instance_id()); - t.__set_num_finished_range(rs->num_finished_range()); - params.fragment_instance_reports.push_back(t); - } + // load rows + static std::string s_dpp_normal_all = "dpp.norm.ALL"; + static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL"; + static std::string s_unselected_rows = "unselected.rows"; + int64_t num_rows_load_success = 0; + int64_t num_rows_load_filtered = 0; + int64_t num_rows_load_unselected = 0; + if (req.runtime_state->num_rows_load_total() > 0 || + req.runtime_state->num_rows_load_filtered() > 0 || + req.runtime_state->num_finished_range() > 0) { + params.__isset.load_counters = true; + + num_rows_load_success = req.runtime_state->num_rows_load_success(); + num_rows_load_filtered = req.runtime_state->num_rows_load_filtered(); + num_rows_load_unselected = req.runtime_state->num_rows_load_unselected(); + params.__isset.fragment_instance_reports = true; + TFragmentInstanceReport t; + t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id()); + t.__set_num_finished_range(req.runtime_state->num_finished_range()); + params.fragment_instance_reports.push_back(t); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 || + req.runtime_state->num_finished_range() > 0) { + params.__isset.load_counters = true; + num_rows_load_success += rs->num_rows_load_success(); + num_rows_load_filtered += 
rs->num_rows_load_filtered(); + num_rows_load_unselected += rs->num_rows_load_unselected(); + params.__isset.fragment_instance_reports = true; + TFragmentInstanceReport t; + t.__set_fragment_instance_id(rs->fragment_instance_id()); + t.__set_num_finished_range(rs->num_finished_range()); + params.fragment_instance_reports.push_back(t); } } - params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success)); - params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered)); - params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected)); - - if (!req.runtime_state->get_error_log_file_path().empty()) { - params.__set_tracking_url( - to_load_error_http_path(req.runtime_state->get_error_log_file_path())); - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (!rs->get_error_log_file_path().empty()) { - params.__set_tracking_url( - to_load_error_http_path(rs->get_error_log_file_path())); - } - if (rs->wal_id() > 0) { - params.__set_txn_id(rs->wal_id()); - params.__set_label(rs->import_label()); - } + } + params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success)); + params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered)); + params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected)); + + if (!req.runtime_state->get_error_log_file_path().empty()) { + params.__set_tracking_url( + to_load_error_http_path(req.runtime_state->get_error_log_file_path())); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (!rs->get_error_log_file_path().empty()) { + params.__set_tracking_url(to_load_error_http_path(rs->get_error_log_file_path())); } - } - if (!req.runtime_state->export_output_files().empty()) { - params.__isset.export_files = true; - params.export_files = req.runtime_state->export_output_files(); - } else if (!req.runtime_states.empty()) { - for 
(auto* rs : req.runtime_states) { - if (!rs->export_output_files().empty()) { - params.__isset.export_files = true; - params.export_files.insert(params.export_files.end(), - rs->export_output_files().begin(), - rs->export_output_files().end()); - } + if (rs->wal_id() > 0) { + params.__set_txn_id(rs->wal_id()); + params.__set_label(rs->import_label()); } } - if (!req.runtime_state->tablet_commit_infos().empty()) { - params.__isset.commitInfos = true; - params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size()); - for (auto& info : req.runtime_state->tablet_commit_infos()) { - params.commitInfos.push_back(info); - } - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (!rs->tablet_commit_infos().empty()) { - params.__isset.commitInfos = true; - params.commitInfos.insert(params.commitInfos.end(), - rs->tablet_commit_infos().begin(), - rs->tablet_commit_infos().end()); - } + } + if (!req.runtime_state->export_output_files().empty()) { + params.__isset.export_files = true; + params.export_files = req.runtime_state->export_output_files(); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (!rs->export_output_files().empty()) { + params.__isset.export_files = true; + params.export_files.insert(params.export_files.end(), + rs->export_output_files().begin(), + rs->export_output_files().end()); } } - if (!req.runtime_state->error_tablet_infos().empty()) { - params.__isset.errorTabletInfos = true; - params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size()); - for (auto& info : req.runtime_state->error_tablet_infos()) { - params.errorTabletInfos.push_back(info); + } + if (!req.runtime_state->tablet_commit_infos().empty()) { + params.__isset.commitInfos = true; + params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size()); + for (auto& info : req.runtime_state->tablet_commit_infos()) { + params.commitInfos.push_back(info); + } + } else if 
(!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (!rs->tablet_commit_infos().empty()) { + params.__isset.commitInfos = true; + params.commitInfos.insert(params.commitInfos.end(), + rs->tablet_commit_infos().begin(), + rs->tablet_commit_infos().end()); } - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (!rs->error_tablet_infos().empty()) { - params.__isset.errorTabletInfos = true; - params.errorTabletInfos.insert(params.errorTabletInfos.end(), - rs->error_tablet_infos().begin(), - rs->error_tablet_infos().end()); - } + } + } + if (!req.runtime_state->error_tablet_infos().empty()) { + params.__isset.errorTabletInfos = true; + params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size()); + for (auto& info : req.runtime_state->error_tablet_infos()) { + params.errorTabletInfos.push_back(info); + } + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (!rs->error_tablet_infos().empty()) { + params.__isset.errorTabletInfos = true; + params.errorTabletInfos.insert(params.errorTabletInfos.end(), + rs->error_tablet_infos().begin(), + rs->error_tablet_infos().end()); } } + } - if (!req.runtime_state->hive_partition_updates().empty()) { - params.__isset.hive_partition_updates = true; - params.hive_partition_updates.reserve( - req.runtime_state->hive_partition_updates().size()); - for (auto& hive_partition_update : req.runtime_state->hive_partition_updates()) { - params.hive_partition_updates.push_back(hive_partition_update); - } - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (!rs->hive_partition_updates().empty()) { - params.__isset.hive_partition_updates = true; - params.hive_partition_updates.insert(params.hive_partition_updates.end(), - rs->hive_partition_updates().begin(), - rs->hive_partition_updates().end()); - } + if (!req.runtime_state->hive_partition_updates().empty()) { + 
params.__isset.hive_partition_updates = true; + params.hive_partition_updates.reserve(req.runtime_state->hive_partition_updates().size()); + for (auto& hive_partition_update : req.runtime_state->hive_partition_updates()) { + params.hive_partition_updates.push_back(hive_partition_update); + } + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (!rs->hive_partition_updates().empty()) { + params.__isset.hive_partition_updates = true; + params.hive_partition_updates.insert(params.hive_partition_updates.end(), + rs->hive_partition_updates().begin(), + rs->hive_partition_updates().end()); } } + } - if (!req.runtime_state->iceberg_commit_datas().empty()) { - params.__isset.iceberg_commit_datas = true; - params.iceberg_commit_datas.reserve(req.runtime_state->iceberg_commit_datas().size()); - for (auto& iceberg_commit_data : req.runtime_state->iceberg_commit_datas()) { - params.iceberg_commit_datas.push_back(iceberg_commit_data); - } - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (!rs->iceberg_commit_datas().empty()) { - params.__isset.iceberg_commit_datas = true; - params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), - rs->iceberg_commit_datas().begin(), - rs->iceberg_commit_datas().end()); - } + if (!req.runtime_state->iceberg_commit_datas().empty()) { + params.__isset.iceberg_commit_datas = true; + params.iceberg_commit_datas.reserve(req.runtime_state->iceberg_commit_datas().size()); + for (auto& iceberg_commit_data : req.runtime_state->iceberg_commit_datas()) { + params.iceberg_commit_datas.push_back(iceberg_commit_data); + } + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (!rs->iceberg_commit_datas().empty()) { + params.__isset.iceberg_commit_datas = true; + params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), + rs->iceberg_commit_datas().begin(), + rs->iceberg_commit_datas().end()); } } - - // Send new errors to 
coordinator - req.runtime_state->get_unreported_errors(&(params.error_log)); - params.__isset.error_log = (params.error_log.size() > 0); } + // Send new errors to coordinator + req.runtime_state->get_unreported_errors(&(params.error_log)); + params.__isset.error_log = (!params.error_log.empty()); + if (_exec_env->master_info()->__isset.backend_id) { params.__set_backend_id(_exec_env->master_info()->backend_id); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org