This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 1c3228add1668b5b27daaa3eab078301cadee349
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Sep 19 10:56:31 2024 +0800

    [Improvement](load) make load profile more real-time (#40924)
    
    ## Proposed changes
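    Make the load profile more real-time: load fragments no longer send only a brief report (loaded rows/bytes) while the load is still running. Every status report now includes the detailed report (profiles, load counters, commit/error infos, error log).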
---
 be/src/runtime/fragment_mgr.cpp | 334 ++++++++++++++++++++--------------------
 1 file changed, 163 insertions(+), 171 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 5bb01dc9ba5..6c61f9fa656 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -324,208 +324,200 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
 
     DCHECK(req.runtime_state != nullptr);
 
-    if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && 
req.status.ok()) {
-        // this is a load plan, and load is not finished, just make a brief 
report
+    if (req.runtime_state->query_type() == TQueryType::LOAD) {
         params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
         params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
-    } else {
-        if (req.runtime_state->query_type() == TQueryType::LOAD) {
-            params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
-            
params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
-        }
-        params.__isset.detailed_report = true;
-        DCHECK(!req.runtime_states.empty());
-        const bool enable_profile = 
(*req.runtime_states.begin())->enable_profile();
-        if (enable_profile) {
-            params.__isset.profile = true;
-            params.__isset.loadChannelProfile = false;
-            for (auto* rs : req.runtime_states) {
-                DCHECK(req.load_channel_profile);
-                TDetailedReportParams detailed_param;
-                
rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile);
-                // merge all runtime_states.loadChannelProfile to 
req.load_channel_profile
-                
req.load_channel_profile->update(detailed_param.loadChannelProfile);
-            }
-            req.load_channel_profile->to_thrift(&params.loadChannelProfile);
-        } else {
-            params.__isset.profile = false;
+    }
+    params.__isset.detailed_report = true;
+    DCHECK(!req.runtime_states.empty());
+    const bool enable_profile = 
(*req.runtime_states.begin())->enable_profile();
+    if (enable_profile) {
+        params.__isset.profile = true;
+        params.__isset.loadChannelProfile = false;
+        for (auto* rs : req.runtime_states) {
+            DCHECK(req.load_channel_profile);
+            TDetailedReportParams detailed_param;
+            
rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile);
+            // merge all runtime_states.loadChannelProfile to 
req.load_channel_profile
+            
req.load_channel_profile->update(detailed_param.loadChannelProfile);
         }
+        req.load_channel_profile->to_thrift(&params.loadChannelProfile);
+    } else {
+        params.__isset.profile = false;
+    }
 
-        if (enable_profile) {
-            DCHECK(req.profile != nullptr);
+    if (enable_profile) {
+        DCHECK(req.profile != nullptr);
+        TDetailedReportParams detailed_param;
+        detailed_param.__isset.fragment_instance_id = false;
+        detailed_param.__isset.profile = true;
+        detailed_param.__isset.loadChannelProfile = false;
+        detailed_param.__set_is_fragment_level(true);
+        req.profile->to_thrift(&detailed_param.profile);
+        params.detailed_report.push_back(detailed_param);
+        for (auto pipeline_profile : 
req.runtime_state->pipeline_id_to_profile()) {
             TDetailedReportParams detailed_param;
             detailed_param.__isset.fragment_instance_id = false;
             detailed_param.__isset.profile = true;
             detailed_param.__isset.loadChannelProfile = false;
-            detailed_param.__set_is_fragment_level(true);
-            req.profile->to_thrift(&detailed_param.profile);
-            params.detailed_report.push_back(detailed_param);
-            for (auto pipeline_profile : 
req.runtime_state->pipeline_id_to_profile()) {
-                TDetailedReportParams detailed_param;
-                detailed_param.__isset.fragment_instance_id = false;
-                detailed_param.__isset.profile = true;
-                detailed_param.__isset.loadChannelProfile = false;
-                pipeline_profile->to_thrift(&detailed_param.profile);
-                params.detailed_report.push_back(std::move(detailed_param));
-            }
+            pipeline_profile->to_thrift(&detailed_param.profile);
+            params.detailed_report.push_back(std::move(detailed_param));
         }
-        if (!req.runtime_state->output_files().empty()) {
-            params.__isset.delta_urls = true;
-            for (auto& it : req.runtime_state->output_files()) {
+    }
+    if (!req.runtime_state->output_files().empty()) {
+        params.__isset.delta_urls = true;
+        for (auto& it : req.runtime_state->output_files()) {
+            params.delta_urls.push_back(to_http_path(it));
+        }
+    } else if (!req.runtime_states.empty()) {
+        for (auto* rs : req.runtime_states) {
+            for (auto& it : rs->output_files()) {
                 params.delta_urls.push_back(to_http_path(it));
             }
-        } else if (!req.runtime_states.empty()) {
-            for (auto* rs : req.runtime_states) {
-                for (auto& it : rs->output_files()) {
-                    params.delta_urls.push_back(to_http_path(it));
-                }
-            }
-            if (!params.delta_urls.empty()) {
-                params.__isset.delta_urls = true;
-            }
         }
+        if (!params.delta_urls.empty()) {
+            params.__isset.delta_urls = true;
+        }
+    }
 
-        // load rows
-        static std::string s_dpp_normal_all = "dpp.norm.ALL";
-        static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
-        static std::string s_unselected_rows = "unselected.rows";
-        int64_t num_rows_load_success = 0;
-        int64_t num_rows_load_filtered = 0;
-        int64_t num_rows_load_unselected = 0;
-        if (req.runtime_state->num_rows_load_total() > 0 ||
-            req.runtime_state->num_rows_load_filtered() > 0 ||
-            req.runtime_state->num_finished_range() > 0) {
-            params.__isset.load_counters = true;
-
-            num_rows_load_success = req.runtime_state->num_rows_load_success();
-            num_rows_load_filtered = 
req.runtime_state->num_rows_load_filtered();
-            num_rows_load_unselected = 
req.runtime_state->num_rows_load_unselected();
-            params.__isset.fragment_instance_reports = true;
-            TFragmentInstanceReport t;
-            
t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
-            
t.__set_num_finished_range(req.runtime_state->num_finished_range());
-            params.fragment_instance_reports.push_back(t);
-        } else if (!req.runtime_states.empty()) {
-            for (auto* rs : req.runtime_states) {
-                if (rs->num_rows_load_total() > 0 || 
rs->num_rows_load_filtered() > 0 ||
-                    req.runtime_state->num_finished_range() > 0) {
-                    params.__isset.load_counters = true;
-                    num_rows_load_success += rs->num_rows_load_success();
-                    num_rows_load_filtered += rs->num_rows_load_filtered();
-                    num_rows_load_unselected += rs->num_rows_load_unselected();
-                    params.__isset.fragment_instance_reports = true;
-                    TFragmentInstanceReport t;
-                    t.__set_fragment_instance_id(rs->fragment_instance_id());
-                    t.__set_num_finished_range(rs->num_finished_range());
-                    params.fragment_instance_reports.push_back(t);
-                }
+    // load rows
+    static std::string s_dpp_normal_all = "dpp.norm.ALL";
+    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
+    static std::string s_unselected_rows = "unselected.rows";
+    int64_t num_rows_load_success = 0;
+    int64_t num_rows_load_filtered = 0;
+    int64_t num_rows_load_unselected = 0;
+    if (req.runtime_state->num_rows_load_total() > 0 ||
+        req.runtime_state->num_rows_load_filtered() > 0 ||
+        req.runtime_state->num_finished_range() > 0) {
+        params.__isset.load_counters = true;
+
+        num_rows_load_success = req.runtime_state->num_rows_load_success();
+        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
+        num_rows_load_unselected = 
req.runtime_state->num_rows_load_unselected();
+        params.__isset.fragment_instance_reports = true;
+        TFragmentInstanceReport t;
+        
t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
+        t.__set_num_finished_range(req.runtime_state->num_finished_range());
+        params.fragment_instance_reports.push_back(t);
+    } else if (!req.runtime_states.empty()) {
+        for (auto* rs : req.runtime_states) {
+            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() 
> 0 ||
+                req.runtime_state->num_finished_range() > 0) {
+                params.__isset.load_counters = true;
+                num_rows_load_success += rs->num_rows_load_success();
+                num_rows_load_filtered += rs->num_rows_load_filtered();
+                num_rows_load_unselected += rs->num_rows_load_unselected();
+                params.__isset.fragment_instance_reports = true;
+                TFragmentInstanceReport t;
+                t.__set_fragment_instance_id(rs->fragment_instance_id());
+                t.__set_num_finished_range(rs->num_finished_range());
+                params.fragment_instance_reports.push_back(t);
             }
         }
-        params.load_counters.emplace(s_dpp_normal_all, 
std::to_string(num_rows_load_success));
-        params.load_counters.emplace(s_dpp_abnormal_all, 
std::to_string(num_rows_load_filtered));
-        params.load_counters.emplace(s_unselected_rows, 
std::to_string(num_rows_load_unselected));
-
-        if (!req.runtime_state->get_error_log_file_path().empty()) {
-            params.__set_tracking_url(
-                    
to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
-        } else if (!req.runtime_states.empty()) {
-            for (auto* rs : req.runtime_states) {
-                if (!rs->get_error_log_file_path().empty()) {
-                    params.__set_tracking_url(
-                            
to_load_error_http_path(rs->get_error_log_file_path()));
-                }
-                if (rs->wal_id() > 0) {
-                    params.__set_txn_id(rs->wal_id());
-                    params.__set_label(rs->import_label());
-                }
+    }
+    params.load_counters.emplace(s_dpp_normal_all, 
std::to_string(num_rows_load_success));
+    params.load_counters.emplace(s_dpp_abnormal_all, 
std::to_string(num_rows_load_filtered));
+    params.load_counters.emplace(s_unselected_rows, 
std::to_string(num_rows_load_unselected));
+
+    if (!req.runtime_state->get_error_log_file_path().empty()) {
+        params.__set_tracking_url(
+                
to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
+    } else if (!req.runtime_states.empty()) {
+        for (auto* rs : req.runtime_states) {
+            if (!rs->get_error_log_file_path().empty()) {
+                
params.__set_tracking_url(to_load_error_http_path(rs->get_error_log_file_path()));
             }
-        }
-        if (!req.runtime_state->export_output_files().empty()) {
-            params.__isset.export_files = true;
-            params.export_files = req.runtime_state->export_output_files();
-        } else if (!req.runtime_states.empty()) {
-            for (auto* rs : req.runtime_states) {
-                if (!rs->export_output_files().empty()) {
-                    params.__isset.export_files = true;
-                    params.export_files.insert(params.export_files.end(),
-                                               
rs->export_output_files().begin(),
-                                               
rs->export_output_files().end());
-                }
+            if (rs->wal_id() > 0) {
+                params.__set_txn_id(rs->wal_id());
+                params.__set_label(rs->import_label());
             }
         }
-        if (!req.runtime_state->tablet_commit_infos().empty()) {
-            params.__isset.commitInfos = true;
-            
params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size());
-            for (auto& info : req.runtime_state->tablet_commit_infos()) {
-                params.commitInfos.push_back(info);
-            }
-        } else if (!req.runtime_states.empty()) {
-            for (auto* rs : req.runtime_states) {
-                if (!rs->tablet_commit_infos().empty()) {
-                    params.__isset.commitInfos = true;
-                    params.commitInfos.insert(params.commitInfos.end(),
-                                              
rs->tablet_commit_infos().begin(),
-                                              rs->tablet_commit_infos().end());
-                }
+    }
+    if (!req.runtime_state->export_output_files().empty()) {
+        params.__isset.export_files = true;
+        params.export_files = req.runtime_state->export_output_files();
+    } else if (!req.runtime_states.empty()) {
+        for (auto* rs : req.runtime_states) {
+            if (!rs->export_output_files().empty()) {
+                params.__isset.export_files = true;
+                params.export_files.insert(params.export_files.end(),
+                                           rs->export_output_files().begin(),
+                                           rs->export_output_files().end());
             }
         }
-        if (!req.runtime_state->error_tablet_infos().empty()) {
-            params.__isset.errorTabletInfos = true;
-            
params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size());
-            for (auto& info : req.runtime_state->error_tablet_infos()) {
-                params.errorTabletInfos.push_back(info);
+    }
+    if (!req.runtime_state->tablet_commit_infos().empty()) {
+        params.__isset.commitInfos = true;
+        
params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size());
+        for (auto& info : req.runtime_state->tablet_commit_infos()) {
+            params.commitInfos.push_back(info);
+        }
+    } else if (!req.runtime_states.empty()) {
+        for (auto* rs : req.runtime_states) {
+            if (!rs->tablet_commit_infos().empty()) {
+                params.__isset.commitInfos = true;
+                params.commitInfos.insert(params.commitInfos.end(),
+                                          rs->tablet_commit_infos().begin(),
+                                          rs->tablet_commit_infos().end());
             }
-        } else if (!req.runtime_states.empty()) {
-            for (auto* rs : req.runtime_states) {
-                if (!rs->error_tablet_infos().empty()) {
-                    params.__isset.errorTabletInfos = true;
-                    
params.errorTabletInfos.insert(params.errorTabletInfos.end(),
-                                                   
rs->error_tablet_infos().begin(),
-                                                   
rs->error_tablet_infos().end());
-                }
+        }
+    }
+    if (!req.runtime_state->error_tablet_infos().empty()) {
+        params.__isset.errorTabletInfos = true;
+        
params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size());
+        for (auto& info : req.runtime_state->error_tablet_infos()) {
+            params.errorTabletInfos.push_back(info);
+        }
+    } else if (!req.runtime_states.empty()) {
+        for (auto* rs : req.runtime_states) {
+            if (!rs->error_tablet_infos().empty()) {
+                params.__isset.errorTabletInfos = true;
+                params.errorTabletInfos.insert(params.errorTabletInfos.end(),
+                                               
rs->error_tablet_infos().begin(),
+                                               rs->error_tablet_infos().end());
             }
         }
+    }
 
-        if (!req.runtime_state->hive_partition_updates().empty()) {
-            params.__isset.hive_partition_updates = true;
-            params.hive_partition_updates.reserve(
-                    req.runtime_state->hive_partition_updates().size());
-            for (auto& hive_partition_update : 
req.runtime_state->hive_partition_updates()) {
-                params.hive_partition_updates.push_back(hive_partition_update);
-            }
-        } else if (!req.runtime_states.empty()) {
-            for (auto* rs : req.runtime_states) {
-                if (!rs->hive_partition_updates().empty()) {
-                    params.__isset.hive_partition_updates = true;
-                    
params.hive_partition_updates.insert(params.hive_partition_updates.end(),
-                                                         
rs->hive_partition_updates().begin(),
-                                                         
rs->hive_partition_updates().end());
-                }
+    if (!req.runtime_state->hive_partition_updates().empty()) {
+        params.__isset.hive_partition_updates = true;
+        
params.hive_partition_updates.reserve(req.runtime_state->hive_partition_updates().size());
+        for (auto& hive_partition_update : 
req.runtime_state->hive_partition_updates()) {
+            params.hive_partition_updates.push_back(hive_partition_update);
+        }
+    } else if (!req.runtime_states.empty()) {
+        for (auto* rs : req.runtime_states) {
+            if (!rs->hive_partition_updates().empty()) {
+                params.__isset.hive_partition_updates = true;
+                
params.hive_partition_updates.insert(params.hive_partition_updates.end(),
+                                                     
rs->hive_partition_updates().begin(),
+                                                     
rs->hive_partition_updates().end());
             }
         }
+    }
 
-        if (!req.runtime_state->iceberg_commit_datas().empty()) {
-            params.__isset.iceberg_commit_datas = true;
-            
params.iceberg_commit_datas.reserve(req.runtime_state->iceberg_commit_datas().size());
-            for (auto& iceberg_commit_data : 
req.runtime_state->iceberg_commit_datas()) {
-                params.iceberg_commit_datas.push_back(iceberg_commit_data);
-            }
-        } else if (!req.runtime_states.empty()) {
-            for (auto* rs : req.runtime_states) {
-                if (!rs->iceberg_commit_datas().empty()) {
-                    params.__isset.iceberg_commit_datas = true;
-                    
params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
-                                                       
rs->iceberg_commit_datas().begin(),
-                                                       
rs->iceberg_commit_datas().end());
-                }
+    if (!req.runtime_state->iceberg_commit_datas().empty()) {
+        params.__isset.iceberg_commit_datas = true;
+        
params.iceberg_commit_datas.reserve(req.runtime_state->iceberg_commit_datas().size());
+        for (auto& iceberg_commit_data : 
req.runtime_state->iceberg_commit_datas()) {
+            params.iceberg_commit_datas.push_back(iceberg_commit_data);
+        }
+    } else if (!req.runtime_states.empty()) {
+        for (auto* rs : req.runtime_states) {
+            if (!rs->iceberg_commit_datas().empty()) {
+                params.__isset.iceberg_commit_datas = true;
+                
params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
+                                                   
rs->iceberg_commit_datas().begin(),
+                                                   
rs->iceberg_commit_datas().end());
             }
         }
-
-        // Send new errors to coordinator
-        req.runtime_state->get_unreported_errors(&(params.error_log));
-        params.__isset.error_log = (params.error_log.size() > 0);
     }
 
+    // Send new errors to coordinator
+    req.runtime_state->get_unreported_errors(&(params.error_log));
+    params.__isset.error_log = (!params.error_log.empty());
+
     if (_exec_env->master_info()->__isset.backend_id) {
         params.__set_backend_id(_exec_env->master_info()->backend_id);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to