This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit cbe5c5f54f1019aeed8d939ae58e02423e02e6e6
Author: Maxim Smyatkin <[email protected]>
AuthorDate: Wed Jun 7 14:58:57 2023 +0300

    [yagp_hooks_collector] Diff system stats per-query and improve error safety
    
    Capture /proc stats at query start and compute the diff at query end
    rather than reporting process-lifetime totals.  Suppress error rethrows
    from the collector hooks to avoid breaking other extensions.  Restore
    the ExecutorRun and ExecutorFinish hooks in hooks_deinit(), which was
    previously missing.  Replace elog() calls with ereport().
---
 src/EventSender.cpp                                | 24 +++++++++++----
 src/ProcStats.cpp                                  | 36 ++++++++--------------
 src/hook_wrappers.cpp                              | 10 ++----
 .../pg_stat_statements_ya_parser.c                 |  6 ++--
 4 files changed, 36 insertions(+), 40 deletions(-)

diff --git a/src/EventSender.cpp b/src/EventSender.cpp
index 55858ed5183..b1f85cf9f1e 100644
--- a/src/EventSender.cpp
+++ b/src/EventSender.cpp
@@ -2,6 +2,7 @@
 #include "GrpcConnector.h"
 #include "ProcStats.h"
 #include <ctime>
+#include <chrono>
 
 extern "C" {
 #include "postgres.h"
@@ -168,6 +169,8 @@ void 
set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics,
   }
 }
 
+decltype(std::chrono::high_resolution_clock::now()) query_start_time;
+
 void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc,
                     bool need_spillinfo) {
   if (need_spillinfo) {
@@ -182,6 +185,10 @@ void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc 
*query_desc,
     set_metric_instrumentation(metrics->mutable_instrumentation(), query_desc);
   }
   fill_self_stats(metrics->mutable_systemstat());
+  std::chrono::duration<double> elapsed_seconds =
+      std::chrono::high_resolution_clock::now() - query_start_time;
+  metrics->mutable_systemstat()->set_runningtimeseconds(
+      elapsed_seconds.count());
 }
 
 yagpcc::SetQueryReq create_query_req(QueryDesc *query_desc,
@@ -228,14 +235,17 @@ void 
EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) {
     // TODO
     break;
   default:
-    elog(FATAL, "Unknown query status: %d", status);
+    ereport(FATAL, (errmsg("Unknown query status: %d", status)));
   }
 }
 
 void EventSender::executor_before_start(QueryDesc *query_desc,
                                         int /* eflags*/) {
-  if (Gp_role == GP_ROLE_DISPATCH && need_collect() &&
-      Config::enable_analyze()) {
+  if (!need_collect()) {
+    return;
+  }
+  query_start_time = std::chrono::high_resolution_clock::now();
+  if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze()) {
     instr_time starttime;
     query_desc->instrument_options |= INSTRUMENT_BUFFERS;
     query_desc->instrument_options |= INSTRUMENT_ROWS;
@@ -311,9 +321,11 @@ void EventSender::send_query_info(yagpcc::SetQueryReq *req,
                                   const std::string &event) {
   auto result = connector->set_metric_query(*req);
   if (result.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR) {
-    elog(WARNING, "Query {%d-%d-%d} %s reporting failed with an error %s",
-         req->query_key().tmid(), req->query_key().ssid(),
-         req->query_key().ccnt(), event.c_str(), result.error_text().c_str());
+    ereport(WARNING,
+            (errmsg("Query {%d-%d-%d} %s reporting failed with an error %s",
+                    req->query_key().tmid(), req->query_key().ssid(),
+                    req->query_key().ccnt(), event.c_str(),
+                    result.error_text().c_str())));
   }
 }
 
diff --git a/src/ProcStats.cpp b/src/ProcStats.cpp
index 5c64f25ec09..668173a0f7e 100644
--- a/src/ProcStats.cpp
+++ b/src/ProcStats.cpp
@@ -13,7 +13,7 @@ namespace {
 #define FILL_IO_STAT(stat_name)                                                
\
   uint64_t stat_name;                                                          
\
   proc_stat >> tmp >> stat_name;                                               
\
-  stats->set_##stat_name(stat_name);
+  stats->set_##stat_name(stat_name - stats->stat_name());
 
 void fill_io_stats(yagpcc::SystemStat *stats) {
   std::ifstream proc_stat("/proc/self/io");
@@ -30,36 +30,23 @@ void fill_io_stats(yagpcc::SystemStat *stats) {
 void fill_cpu_stats(yagpcc::SystemStat *stats) {
   static const int UTIME_ID = 13;
   static const int STIME_ID = 14;
-  static const int STARTTIME_ID = 21;
   static const int VSIZE_ID = 22;
   static const int RSS_ID = 23;
   static const double tps = sysconf(_SC_CLK_TCK);
 
-  double uptime;
-  {
-    std::ifstream proc_stat("/proc/uptime");
-    proc_stat >> uptime;
-  }
-
   std::ifstream proc_stat("/proc/self/stat");
   std::string trash;
-  double start_time = 0;
   for (int i = 0; i <= RSS_ID; ++i) {
     switch (i) {
     case UTIME_ID:
       double utime;
       proc_stat >> utime;
-      stats->set_usertimeseconds(utime / tps);
+      stats->set_usertimeseconds(utime / tps - stats->usertimeseconds());
       break;
     case STIME_ID:
       double stime;
       proc_stat >> stime;
-      stats->set_kerneltimeseconds(stime / tps);
-      break;
-    case STARTTIME_ID:
-      uint64_t starttime;
-      proc_stat >> starttime;
-      start_time = static_cast<double>(starttime) / tps;
+      stats->set_kerneltimeseconds(stime / tps - stats->kerneltimeseconds());
       break;
     case VSIZE_ID:
       uint64_t vsize;
@@ -75,7 +62,6 @@ void fill_cpu_stats(yagpcc::SystemStat *stats) {
     default:
       proc_stat >> trash;
     }
-    stats->set_runningtimeseconds(uptime - start_time);
   }
 }
 
@@ -89,16 +75,16 @@ void fill_status_stats(yagpcc::SystemStat *stats) {
       stats->set_vmpeakkb(value);
       proc_stat >> measure;
       if (measure != "kB") {
-        elog(FATAL, "Expected memory sizes in kB, but got in %s",
-             measure.c_str());
+        ereport(FATAL, (errmsg("Expected memory sizes in kB, but got in %s",
+                               measure.c_str())));
       }
     } else if (key == "VmSize:") {
       uint64_t value;
       proc_stat >> value;
       stats->set_vmsizekb(value);
       if (measure != "kB") {
-        elog(FATAL, "Expected memory sizes in kB, but got in %s",
-             measure.c_str());
+        ereport(FATAL, (errmsg("Expected memory sizes in kB, but got in %s",
+                               measure.c_str())));
       }
     }
   }
@@ -106,7 +92,9 @@ void fill_status_stats(yagpcc::SystemStat *stats) {
 } // namespace
 
 void fill_self_stats(yagpcc::SystemStat *stats) {
-  fill_io_stats(stats);
-  fill_cpu_stats(stats);
-  fill_status_stats(stats);
+  static yagpcc::SystemStat prev_stats;
+  fill_io_stats(&prev_stats);
+  fill_cpu_stats(&prev_stats);
+  fill_status_stats(&prev_stats);
+  *stats = prev_stats;
 }
\ No newline at end of file
diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp
index edad5798e44..a904dc9bafd 100644
--- a/src/hook_wrappers.cpp
+++ b/src/hook_wrappers.cpp
@@ -44,6 +44,8 @@ void hooks_init() {
 
 void hooks_deinit() {
   ExecutorStart_hook = previous_ExecutorStart_hook;
+  ExecutorRun_hook = previous_ExecutorRun_hook;
+  ExecutorFinish_hook = previous_ExecutorFinish_hook;
   ExecutorEnd_hook = previous_ExecutorEnd_hook;
   query_info_collect_hook = previous_query_info_collect_hook;
   stat_statements_parser_deinit();
@@ -56,7 +58,6 @@ void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) 
{
   {
     ereport(WARNING,
             (errmsg("EventSender failed in ya_ExecutorBeforeStart_hook")));
-    PG_RE_THROW();
   }
   PG_END_TRY();
   if (previous_ExecutorStart_hook) {
@@ -70,7 +71,6 @@ void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) 
{
   {
     ereport(WARNING,
             (errmsg("EventSender failed in ya_ExecutorAfterStart_hook")));
-    PG_RE_THROW();
   }
   PG_END_TRY();
 }
@@ -116,10 +116,7 @@ void ya_ExecutorEnd_hook(QueryDesc *query_desc) {
   PG_TRY();
   { EventSender::instance()->executor_end(query_desc); }
   PG_CATCH();
-  {
-    ereport(WARNING, (errmsg("EventSender failed in ya_ExecutorEnd_hook")));
-    PG_RE_THROW();
-  }
+  { ereport(WARNING, (errmsg("EventSender failed in ya_ExecutorEnd_hook"))); }
   PG_END_TRY();
   if (previous_ExecutorEnd_hook) {
     (*previous_ExecutorEnd_hook)(query_desc);
@@ -135,7 +132,6 @@ void ya_query_info_collect_hook(QueryMetricsStatus status, 
void *arg) {
   {
     ereport(WARNING,
             (errmsg("EventSender failed in ya_query_info_collect_hook")));
-    PG_RE_THROW();
   }
   PG_END_TRY();
   if (previous_query_info_collect_hook) {
diff --git a/src/stat_statements_parser/pg_stat_statements_ya_parser.c 
b/src/stat_statements_parser/pg_stat_statements_ya_parser.c
index a37ac0ef0bf..1c58d936093 100644
--- a/src/stat_statements_parser/pg_stat_statements_ya_parser.c
+++ b/src/stat_statements_parser/pg_stat_statements_ya_parser.c
@@ -213,7 +213,7 @@ JumbleRangeTable(pgssJumbleState *jstate, List *rtable)
                        JumbleExpr(jstate, (Node *)rte->functions);
                        break;
                default:
-                       elog(ERROR, "unrecognized RTE kind: %d", 
(int)rte->rtekind);
+                       ereport(ERROR, (errmsg("unrecognized RTE kind: %d", 
(int)rte->rtekind)));
                        break;
                }
        }
@@ -659,8 +659,8 @@ JumbleExpr(pgssJumbleState *jstate, Node *node)
                break;
        default:
                /* Only a warning, since we can stumble along anyway */
-               elog(WARNING, "unrecognized node type: %d",
-                        (int)nodeTag(node));
+               ereport(WARNING, (errmsg("unrecognized node type: %d",
+                        (int)nodeTag(node))));
                break;
        }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to