This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit 622cb83aedf979982263817707971eaf4f357b48
Author: NJrslv <[email protected]>
AuthorDate: Tue Jun 24 14:41:03 2025 +0300

    [yagp_hooks_collector] Add conditional EXPLAIN ANALYZE collection
    
    When enable_analyze is true and execution time exceeds
    min_analyze_time, generate EXPLAIN (ANALYZE, BUFFERS, TIMING,
    VERBOSE) output and include it in the done event.
---
 protos/yagpcc_metrics.proto |  1 +
 src/Config.cpp              | 22 +++++++++++++++++--
 src/Config.h                |  2 ++
 src/EventSender.cpp         | 51 +++++++++++++++++++++++++++++++++++++++----
 src/EventSender.h           |  1 +
 src/PgUtils.cpp             | 37 +++++++++++++++++++++++++++++++
 src/PgUtils.h               |  1 +
 src/ProtoUtils.cpp          | 53 +++++++++++++++++++++++++++++++++++++--------
 src/ProtoUtils.h            |  4 +++-
 src/hook_wrappers.cpp       | 24 ++++++++++++++++++++
 10 files changed, 180 insertions(+), 16 deletions(-)

diff --git a/protos/yagpcc_metrics.proto b/protos/yagpcc_metrics.proto
index 086f3e63379..91ac0c4941a 100644
--- a/protos/yagpcc_metrics.proto
+++ b/protos/yagpcc_metrics.proto
@@ -34,6 +34,7 @@ message QueryInfo {
     string userName = 8;
     string databaseName = 9;
     string rsgname = 10;
+    string analyze_text = 11;
 }
 
 message AdditionalQueryInfo {
diff --git a/src/Config.cpp b/src/Config.cpp
index 5e0749f171d..ac274a1e218 100644
--- a/src/Config.cpp
+++ b/src/Config.cpp
@@ -16,7 +16,10 @@ static bool guc_enable_cdbstats = true;
 static bool guc_enable_collector = true;
 static bool guc_report_nested_queries = true;
 static char *guc_ignored_users = nullptr;
-static int guc_max_text_size = 1024; // in KB
+static int guc_max_text_size = 1024;  // in KB
+static int guc_max_plan_size = 1024;  // in KB
+static int guc_min_analyze_time = -1; // in ms; -1 disables ANALYZE collection
+
 static std::unique_ptr<std::unordered_set<std::string>> ignored_users_set =
     nullptr;
 static bool ignored_users_guc_dirty = false;
@@ -89,9 +92,22 @@ void Config::init() {
 
   DefineCustomIntVariable(
       "yagpcc.max_text_size",
-      "Make yagpcc trim plan and query texts longer than configured size", 
NULL,
+      "Make yagpcc trim query texts longer than configured size", NULL,
       &guc_max_text_size, 1024, 0, INT_MAX / 1024, PGC_SUSET,
       GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC | GUC_UNIT_KB, NULL, NULL, NULL);
+
+  DefineCustomIntVariable(
+      "yagpcc.max_plan_size",
+      "Make yagpcc trim plan longer than configured size", NULL,
+      &guc_max_plan_size, 1024, 0, INT_MAX / 1024, PGC_SUSET,
+      GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC | GUC_UNIT_KB, NULL, NULL, NULL);
+
+  DefineCustomIntVariable(
+      "yagpcc.min_analyze_time",
+      "Sets the minimum execution time above which plans will be logged.",
+      "Zero prints all plans. -1 turns this feature off.",
+      &guc_min_analyze_time, -1, -1, INT_MAX, PGC_USERSET,
+      GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC | GUC_UNIT_MS, NULL, NULL, NULL);
 }
 
 std::string Config::uds_path() { return guc_uds_path; }
@@ -100,6 +116,8 @@ bool Config::enable_cdbstats() { return guc_enable_cdbstats; }
 bool Config::enable_collector() { return guc_enable_collector; }
 bool Config::report_nested_queries() { return guc_report_nested_queries; }
 size_t Config::max_text_size() { return guc_max_text_size * 1024; }
+// Plan-text limit in bytes (GUC yagpcc.max_plan_size is in KB).
+size_t Config::max_plan_size() { return guc_max_plan_size * 1024; }
+// Threshold in ms for EXPLAIN ANALYZE collection; -1 means disabled.
+int Config::min_analyze_time() { return guc_min_analyze_time; }
 
 bool Config::filter_user(const std::string *username) {
   if (!username || !ignored_users_set) {
diff --git a/src/Config.h b/src/Config.h
index 3caa0c78339..dd081c41dd6 100644
--- a/src/Config.h
+++ b/src/Config.h
@@ -12,5 +12,7 @@ public:
   static bool filter_user(const std::string *username);
   static bool report_nested_queries();
   static size_t max_text_size();
+  static size_t max_plan_size();
+  static int min_analyze_time();
   static void sync();
 };
\ No newline at end of file
diff --git a/src/EventSender.cpp b/src/EventSender.cpp
index fc0f7e1aa07..19787fe0db0 100644
--- a/src/EventSender.cpp
+++ b/src/EventSender.cpp
@@ -20,6 +20,10 @@ extern "C" {
 #include "PgUtils.h"
 #include "ProtoUtils.h"
 
+// True when this backend should collect EXPLAIN ANALYZE output: only on the
+// dispatcher, only when yagpcc.min_analyze_time is >= 0 (-1 turns the feature
+// off), and only when Config::enable_analyze() is set. A function rather than
+// a macro so it is type-checked and cannot be broken by line continuations.
+static bool need_collect_analyze() {
+  return Gp_role == GP_ROLE_DISPATCH && Config::min_analyze_time() >= 0 &&
+         Config::enable_analyze();
+}
+
 void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) {
   if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) {
     return;
@@ -53,8 +57,7 @@ void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) {
   }
 }
 
-void EventSender::executor_before_start(QueryDesc *query_desc,
-                                        int /* eflags*/) {
+void EventSender::executor_before_start(QueryDesc *query_desc, int eflags) {
   if (!connector) {
     return;
   }
@@ -67,7 +70,8 @@ void EventSender::executor_before_start(QueryDesc *query_desc,
     return;
   }
   collect_query_submit(query_desc);
-  if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze()) {
+  if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze() &&
+      (eflags & EXEC_FLAG_EXPLAIN_ONLY) == 0) {
     query_desc->instrument_options |= INSTRUMENT_BUFFERS;
     query_desc->instrument_options |= INSTRUMENT_ROWS;
     query_desc->instrument_options |= INSTRUMENT_TIMER;
@@ -97,6 +101,17 @@ void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) {
       }
       update_query_state(query_desc, query, QueryState::START);
       set_query_plan(query_msg, query_desc);
+      if (need_collect_analyze()) {
+        // Set up to track total elapsed time during query run.
+        // Make sure the space is allocated in the per-query
+        // context so it will go away at executor_end.
+        if (query_desc->totaltime == NULL) {
+          MemoryContext oldcxt;
+          oldcxt = MemoryContextSwitchTo(query_desc->estate->es_query_cxt);
+          query_desc->totaltime = InstrAlloc(1, INSTRUMENT_ALL);
+          MemoryContextSwitchTo(oldcxt);
+        }
+      }
       yagpcc::GPMetrics stats;
       std::swap(stats, *query_msg->mutable_query_metrics());
       if (connector->report_query(*query_msg, "started")) {
@@ -262,6 +277,34 @@ void EventSender::ic_metrics_collect() {
 #endif
 }
 
+// Entry point of the analyze_stats_collect hook on the dispatcher.
+// If EXPLAIN ANALYZE collection is enabled and the query ran for at least
+// yagpcc.min_analyze_time milliseconds, it attaches the JSON EXPLAIN ANALYZE
+// text to the query's pending message.
+void EventSender::analyze_stats_collect(QueryDesc *query_desc) {
+  if (!connector || Gp_role != GP_ROLE_DISPATCH) {
+    return;
+  }
+  // Cheap checks come first. Bail out before looking up or mutating the
+  // per-query message when the feature is off, or when no total-time
+  // instrumentation was allocated (see executor_after_start).
+  if (!need_collect_analyze() || !query_desc->totaltime) {
+    return;
+  }
+  if (!need_collect(query_desc, nesting_level)) {
+    return;
+  }
+  auto query = get_query_message(query_desc);
+  auto query_msg = query->message;
+  *query_msg->mutable_end_time() = current_ts();
+  // Yet another greenplum weirdness: that's actually a nested query
+  // which is being committed/rolled back. Treat it accordingly.
+  if (query->state == UNKNOWN && !need_report_nested_query()) {
+    return;
+  }
+  // Make sure stats accumulation is done.
+  // (Note: it's okay if several levels of hook all do this.)
+  InstrEndLoop(query_desc->totaltime);
+
+  // Instrumentation::total is in seconds; the GUC is in milliseconds.
+  double ms = query_desc->totaltime->total * 1000.0;
+  if (ms >= Config::min_analyze_time()) {
+    set_analyze_plan_text_json(query_desc, query_msg);
+  }
+}
+
 EventSender::EventSender() {
   if (Config::enable_collector()) {
     try {
@@ -350,4 +393,4 @@ void EventSender::update_nested_counters(QueryDesc *query_desc) {
 
 EventSender::QueryItem::QueryItem(EventSender::QueryState st,
                                   yagpcc::SetQueryReq *msg)
-    : state(st), message(msg) {}
\ No newline at end of file
+    : state(st), message(msg) {}
diff --git a/src/EventSender.h b/src/EventSender.h
index 99f7b24753d..4d09b429fc8 100644
--- a/src/EventSender.h
+++ b/src/EventSender.h
@@ -27,6 +27,7 @@ public:
   void executor_end(QueryDesc *query_desc);
   void query_metrics_collect(QueryMetricsStatus status, void *arg);
   void ic_metrics_collect();
+  void analyze_stats_collect(QueryDesc *query_desc);
   void incr_depth() { nesting_level++; }
   void decr_depth() { nesting_level--; }
   EventSender();
diff --git a/src/PgUtils.cpp b/src/PgUtils.cpp
index 5982ff77c1c..ed3e69c6d44 100644
--- a/src/PgUtils.cpp
+++ b/src/PgUtils.cpp
@@ -109,3 +109,40 @@ ExplainState get_explain_state(QueryDesc *query_desc, bool costs) {
   ExplainEndOutput(&es);
   return es;
 }
+
+// Builds a JSON-format EXPLAIN (VERBOSE) of query_desc. When `analyze` is true
+// it also enables ANALYZE, BUFFERS, TIMING and SUMMARY and prints the plan plus
+// the executor end statistics; otherwise only the JSON begin/end wrapper is
+// emitted. Errors raised while printing are caught and replaced by a JSON
+// "Error" entry, so the user's query never fails because of this collector.
+ExplainState get_analyze_state_json(QueryDesc *query_desc, bool analyze) {
+  ExplainState es;
+  ExplainInitState(&es);
+  es.analyze = analyze;
+  es.verbose = true;
+  es.buffers = es.analyze;
+  es.timing = es.analyze;
+  es.summary = es.analyze;
+  es.format = EXPLAIN_FORMAT_JSON;
+  ExplainBeginOutput(&es);
+  if (analyze) {
+    // Not modified inside PG_TRY, so it is safe to read after a longjmp.
+    MemoryContext oldcxt = CurrentMemoryContext;
+    PG_TRY();
+    {
+      ExplainPrintPlan(&es, query_desc);
+      ExplainPrintExecStatsEnd(&es, query_desc);
+    }
+    PG_CATCH();
+    {
+      // PG and GP both have known and yet unknown bugs in EXPLAIN VERBOSE
+      // implementation. We don't want any queries to fail due to those bugs, so
+      // we report the bug here for future investigation and continue collecting
+      // metrics w/o reporting any plans.
+      //
+      // The error is swallowed rather than re-thrown. We must therefore leave
+      // ErrorContext and clear the error stack ourselves.
+      MemoryContextSwitchTo(oldcxt);
+      ErrorData *edata = CopyErrorData();
+      FlushErrorState();
+      // Keep the output shaped like ExplainBeginOutput() ("[ ..."). After
+      // ExplainEndOutput() appends the closing "]", callers can still turn it
+      // into a valid JSON object.
+      resetStringInfo(es.str);
+      appendStringInfoString(
+          es.str,
+          "[\n  \"Error\": \"Unable to restore analyze plan due to PostgreSQL "
+          "internal error. See logs for more information\"");
+      // Under default settings, LOG (unlike INFO) goes to the server log,
+      // which is where the message above points the reader.
+      ereport(LOG,
+              (errmsg("YAGPCC failed to reconstruct analyze text for query: %s",
+                      query_desc->sourceText),
+               errdetail("%s",
+                         edata->message ? edata->message : "(no message)")));
+      FreeErrorData(edata);
+    }
+    PG_END_TRY();
+  }
+  ExplainEndOutput(&es);
+  return es;
+}
diff --git a/src/PgUtils.h b/src/PgUtils.h
index 85b1eb833cd..81282a473a8 100644
--- a/src/PgUtils.h
+++ b/src/PgUtils.h
@@ -14,3 +14,4 @@ bool need_report_nested_query();
 bool filter_query(QueryDesc *query_desc);
 bool need_collect(QueryDesc *query_desc, int nesting_level);
 ExplainState get_explain_state(QueryDesc *query_desc, bool costs);
+// Returns a JSON-format EXPLAIN VERBOSE of query_desc. ANALYZE, BUFFERS, TIMING
+// and SUMMARY are enabled when `analyze` is true. Plan-printing errors are
+// caught inside and do not propagate.
+ExplainState get_analyze_state_json(QueryDesc *query_desc, bool analyze);
diff --git a/src/ProtoUtils.cpp b/src/ProtoUtils.cpp
index c37cefb72d6..6e9fa6bd5c5 100644
--- a/src/ProtoUtils.cpp
+++ b/src/ProtoUtils.cpp
@@ -8,6 +8,7 @@
 extern "C" {
 #include "postgres.h"
 #include "access/hash.h"
+#include "access/xact.h"
 #include "cdb/cdbinterconnect.h"
 #include "cdb/cdbvars.h"
 #include "cdb/ml_ipc.h"
@@ -47,8 +48,9 @@ void set_segment_key(yagpcc::SegmentKey *key) {
   key->set_segindex(GpIdentity.segindex);
 }
 
-inline std::string char_to_trimmed_str(const char *str, size_t len) {
-  return std::string(str, std::min(len, Config::max_text_size()));
+// Copies at most `lim` bytes of str[0, len) into a std::string.
+// If the cut falls inside a UTF-8 multibyte sequence, it is moved back to the
+// start of that sequence (at most 3 bytes), so the result stays valid UTF-8 as
+// protobuf `string` fields require. For non-UTF-8 data this can drop at most
+// 3 extra trailing bytes.
+inline std::string char_to_trimmed_str(const char *str, size_t len,
+                                       size_t lim) {
+  if (len <= lim) {
+    return std::string(str, len);
+  }
+  size_t cut = lim;
+  // Bytes of the form 10xxxxxx are UTF-8 continuation bytes; str[cut] is the
+  // first byte that gets dropped, so back off while it continues a character.
+  for (int i = 0; i < 3 && cut > 0 &&
+                  (static_cast<unsigned char>(str[cut]) & 0xC0) == 0x80;
+       ++i) {
+    --cut;
+  }
+  return std::string(str, cut);
 }
 
 void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
@@ -61,10 +63,11 @@ void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
         MemoryContextSwitchTo(query_desc->estate->es_query_cxt);
     auto es = get_explain_state(query_desc, true);
     MemoryContextSwitchTo(oldcxt);
-    *qi->mutable_plan_text() = char_to_trimmed_str(es.str->data, es.str->len);
+    *qi->mutable_plan_text() =
+        char_to_trimmed_str(es.str->data, es.str->len, 
Config::max_plan_size());
     StringInfo norm_plan = gen_normplan(es.str->data);
-    *qi->mutable_template_plan_text() =
-        char_to_trimmed_str(norm_plan->data, norm_plan->len);
+    *qi->mutable_template_plan_text() = char_to_trimmed_str(
+        norm_plan->data, norm_plan->len, Config::max_plan_size());
     qi->set_plan_id(hash_any((unsigned char *)norm_plan->data, 
norm_plan->len));
     qi->set_query_id(query_desc->plannedstmt->queryId);
     pfree(es.str->data);
@@ -76,10 +79,11 @@ void set_query_text(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
   if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->sourceText) {
     auto qi = req->mutable_query_info();
     *qi->mutable_query_text() = char_to_trimmed_str(
-        query_desc->sourceText, strlen(query_desc->sourceText));
+        query_desc->sourceText, strlen(query_desc->sourceText),
+        Config::max_text_size());
     char *norm_query = gen_normquery(query_desc->sourceText);
-    *qi->mutable_template_query_text() =
-        char_to_trimmed_str(norm_query, strlen(norm_query));
+    *qi->mutable_template_query_text() = char_to_trimmed_str(
+        norm_query, strlen(norm_query), Config::max_text_size());
   }
 }
 
@@ -90,6 +94,7 @@ void clear_big_fields(yagpcc::SetQueryReq *req) {
     qi->clear_template_plan_text();
     qi->clear_query_text();
     qi->clear_template_query_text();
+    qi->clear_analyze_text();
   }
 }
 
@@ -115,7 +120,8 @@ void set_qi_slice_id(yagpcc::SetQueryReq *req) {
 void set_qi_error_message(yagpcc::SetQueryReq *req) {
   auto aqi = req->mutable_add_info();
   auto error = elog_message();
-  *aqi->mutable_error_message() = char_to_trimmed_str(error, strlen(error));
+  *aqi->mutable_error_message() =
+      char_to_trimmed_str(error, strlen(error), Config::max_text_size());
 }
 
 void set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics,
@@ -217,4 +223,33 @@ yagpcc::SetQueryReq create_query_req(yagpcc::QueryStatus status) {
 
 double protots_to_double(const google::protobuf::Timestamp &ts) {
   return double(ts.seconds()) + double(ts.nanos()) / 1000000000.0;
+}
+
+// Stores the JSON EXPLAIN ANALYZE text for query_desc in
+// req->query_info().analyze_text, trimmed to Config::max_plan_size() bytes.
+// Trimming can cut the JSON document short, so consumers must tolerate
+// truncated JSON.
+void set_analyze_plan_text_json(QueryDesc *query_desc,
+                                yagpcc::SetQueryReq *req) {
+  // Make sure it is a valid txn and it is not an utility
+  // statement for ExplainPrintPlan() later.
+  if (!IsTransactionState() || !query_desc->plannedstmt) {
+    return;
+  }
+  // Build the EXPLAIN output in the per-query context, then switch back right
+  // away (as set_query_plan does). A C++ exception thrown by the string or
+  // protobuf code below then cannot leave CurrentMemoryContext pointing at
+  // es_query_cxt.
+  MemoryContext oldcxt =
+      MemoryContextSwitchTo(query_desc->estate->es_query_cxt);
+  ExplainState es = get_analyze_state_json(
+      query_desc, query_desc->instrument_options && Config::enable_analyze());
+  MemoryContextSwitchTo(oldcxt);
+
+  char *data = es.str->data;
+  int len = es.str->len;
+  // Remove last line break.
+  if (len > 0 && data[len - 1] == '\n') {
+    data[--len] = '\0';
+  }
+  // JSON EXPLAIN output is wrapped in "[ ... ]"; turn the wrapper into an
+  // object "{ ... }", but only when both brackets are really there.
+  if (len >= 2 && data[0] == '[' && data[len - 1] == ']') {
+    data[0] = '{';
+    data[len - 1] = '}';
+  }
+  req->mutable_query_info()->set_analyze_text(
+      char_to_trimmed_str(data, len, Config::max_plan_size()));
+
+  // pfree locates the owning context itself, so no context switch is needed.
+  pfree(data);
 }
\ No newline at end of file
diff --git a/src/ProtoUtils.h b/src/ProtoUtils.h
index 4e4ed5e76a3..6fb880c2eb8 100644
--- a/src/ProtoUtils.h
+++ b/src/ProtoUtils.h
@@ -16,4 +16,6 @@ void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc,
 void set_ic_stats(yagpcc::MetricInstrumentation *metrics,
                   const ICStatistics *ic_statistics);
 yagpcc::SetQueryReq create_query_req(yagpcc::QueryStatus status);
-double protots_to_double(const google::protobuf::Timestamp &ts);
\ No newline at end of file
+double protots_to_double(const google::protobuf::Timestamp &ts);
+// Stores the JSON EXPLAIN ANALYZE text of query_desc in the analyze_text
+// field of message's query_info, trimmed to Config::max_plan_size() bytes.
+void set_analyze_plan_text_json(QueryDesc *query_desc,
+                                yagpcc::SetQueryReq *message);
\ No newline at end of file
diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp
index f1d403b82f1..79d3ec45881 100644
--- a/src/hook_wrappers.cpp
+++ b/src/hook_wrappers.cpp
@@ -3,6 +3,7 @@ extern "C" {
 #include "postgres.h"
 #include "funcapi.h"
 #include "executor/executor.h"
+#include "executor/execUtils.h"
 #include "utils/elog.h"
 #include "utils/builtins.h"
 #include "utils/metrics_utils.h"
@@ -24,6 +25,10 @@ static ExecutorRun_hook_type previous_ExecutorRun_hook = nullptr;
 static ExecutorFinish_hook_type previous_ExecutorFinish_hook = nullptr;
 static ExecutorEnd_hook_type previous_ExecutorEnd_hook = nullptr;
 static query_info_collect_hook_type previous_query_info_collect_hook = nullptr;
+#ifdef ANALYZE_STATS_COLLECT_HOOK
+// Hook that was installed before ours: chained from
+// ya_analyze_stats_collect_hook and restored in hooks_deinit().
+static analyze_stats_collect_hook_type previous_analyze_stats_collect_hook =
+    nullptr;
+#endif
 #ifdef IC_TEARDOWN_HOOK
 static ic_teardown_hook_type previous_ic_teardown_hook = nullptr;
 #endif
@@ -36,6 +41,9 @@ static void ya_ExecutorEnd_hook(QueryDesc *query_desc);
 static void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg);
 static void ya_ic_teardown_hook(ChunkTransportState *transportStates,
                                 bool hasErrors);
+#ifdef ANALYZE_STATS_COLLECT_HOOK
+// Installed as analyze_stats_collect_hook by hooks_init().
+static void ya_analyze_stats_collect_hook(QueryDesc *query_desc);
+#endif
 
 static EventSender *sender = nullptr;
 
@@ -71,6 +79,10 @@ void hooks_init() {
 #ifdef IC_TEARDOWN_HOOK
   previous_ic_teardown_hook = ic_teardown_hook;
   ic_teardown_hook = ya_ic_teardown_hook;
+#endif
+#ifdef ANALYZE_STATS_COLLECT_HOOK
+  previous_analyze_stats_collect_hook = analyze_stats_collect_hook;
+  analyze_stats_collect_hook = ya_analyze_stats_collect_hook;
 #endif
   stat_statements_parser_init();
 }
@@ -83,6 +95,9 @@ void hooks_deinit() {
   query_info_collect_hook = previous_query_info_collect_hook;
 #ifdef IC_TEARDOWN_HOOK
   ic_teardown_hook = previous_ic_teardown_hook;
+#endif
+#ifdef ANALYZE_STATS_COLLECT_HOOK
+  analyze_stats_collect_hook = previous_analyze_stats_collect_hook;
 #endif
   stat_statements_parser_deinit();
   if (sender) {
@@ -165,6 +180,15 @@ void ya_ic_teardown_hook(ChunkTransportState *transportStates, bool hasErrors) {
 #endif
 }
 
+#ifdef ANALYZE_STATS_COLLECT_HOOK
+// analyze_stats_collect_hook implementation: first lets the yagpcc
+// EventSender collect EXPLAIN ANALYZE data, then forwards the call to the
+// hook that was installed before ours, if there is one.
+void ya_analyze_stats_collect_hook(QueryDesc *query_desc) {
+  cpp_call(get_sender(), &EventSender::analyze_stats_collect, query_desc);
+  if (auto chained = previous_analyze_stats_collect_hook) {
+    chained(query_desc);
+  }
+}
+#endif
+
 static void check_stats_loaded() {
   if (!YagpStat::loaded()) {
     ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to