This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit 622cb83aedf979982263817707971eaf4f357b48 Author: NJrslv <[email protected]> AuthorDate: Tue Jun 24 14:41:03 2025 +0300 [yagp_hooks_collector] Add conditional EXPLAIN ANALYZE collection When enable_analyze is true and execution time exceeds min_analyze_time, generate EXPLAIN (ANALYZE, BUFFERS, TIMING, VERBOSE) output and include it in the done event. --- protos/yagpcc_metrics.proto | 1 + src/Config.cpp | 22 +++++++++++++++++-- src/Config.h | 2 ++ src/EventSender.cpp | 51 +++++++++++++++++++++++++++++++++++++++---- src/EventSender.h | 1 + src/PgUtils.cpp | 37 +++++++++++++++++++++++++++++++ src/PgUtils.h | 1 + src/ProtoUtils.cpp | 53 +++++++++++++++++++++++++++++++++++++-------- src/ProtoUtils.h | 4 +++- src/hook_wrappers.cpp | 24 ++++++++++++++++++++ 10 files changed, 180 insertions(+), 16 deletions(-) diff --git a/protos/yagpcc_metrics.proto b/protos/yagpcc_metrics.proto index 086f3e63379..91ac0c4941a 100644 --- a/protos/yagpcc_metrics.proto +++ b/protos/yagpcc_metrics.proto @@ -34,6 +34,7 @@ message QueryInfo { string userName = 8; string databaseName = 9; string rsgname = 10; + string analyze_text = 11; } message AdditionalQueryInfo { diff --git a/src/Config.cpp b/src/Config.cpp index 5e0749f171d..ac274a1e218 100644 --- a/src/Config.cpp +++ b/src/Config.cpp @@ -16,7 +16,10 @@ static bool guc_enable_cdbstats = true; static bool guc_enable_collector = true; static bool guc_report_nested_queries = true; static char *guc_ignored_users = nullptr; -static int guc_max_text_size = 1024; // in KB +static int guc_max_text_size = 1024; // in KB +static int guc_max_plan_size = 1024; // in KB +static int guc_min_analyze_time = -1; // uninitialized state + static std::unique_ptr<std::unordered_set<std::string>> ignored_users_set = nullptr; static bool ignored_users_guc_dirty = false; @@ -89,9 +92,22 @@ void Config::init() { DefineCustomIntVariable( "yagpcc.max_text_size", - "Make yagpcc trim plan and query texts longer than configured size", NULL, + "Make yagpcc trim 
query texts longer than configured size", NULL, &guc_max_text_size, 1024, 0, INT_MAX / 1024, PGC_SUSET, GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC | GUC_UNIT_KB, NULL, NULL, NULL); + + DefineCustomIntVariable( + "yagpcc.max_plan_size", + "Make yagpcc trim plan longer than configured size", NULL, + &guc_max_plan_size, 1024, 0, INT_MAX / 1024, PGC_SUSET, + GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC | GUC_UNIT_KB, NULL, NULL, NULL); + + DefineCustomIntVariable( + "yagpcc.min_analyze_time", + "Sets the minimum execution time above which plans will be logged.", + "Zero prints all plans. -1 turns this feature off.", + &guc_min_analyze_time, -1, -1, INT_MAX, PGC_USERSET, + GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC | GUC_UNIT_MS, NULL, NULL, NULL); } std::string Config::uds_path() { return guc_uds_path; } @@ -100,6 +116,8 @@ bool Config::enable_cdbstats() { return guc_enable_cdbstats; } bool Config::enable_collector() { return guc_enable_collector; } bool Config::report_nested_queries() { return guc_report_nested_queries; } size_t Config::max_text_size() { return guc_max_text_size * 1024; } +size_t Config::max_plan_size() { return guc_max_plan_size * 1024; } +int Config::min_analyze_time() { return guc_min_analyze_time; }; bool Config::filter_user(const std::string *username) { if (!username || !ignored_users_set) { diff --git a/src/Config.h b/src/Config.h index 3caa0c78339..dd081c41dd6 100644 --- a/src/Config.h +++ b/src/Config.h @@ -12,5 +12,7 @@ public: static bool filter_user(const std::string *username); static bool report_nested_queries(); static size_t max_text_size(); + static size_t max_plan_size(); + static int min_analyze_time(); static void sync(); }; \ No newline at end of file diff --git a/src/EventSender.cpp b/src/EventSender.cpp index fc0f7e1aa07..19787fe0db0 100644 --- a/src/EventSender.cpp +++ b/src/EventSender.cpp @@ -20,6 +20,10 @@ extern "C" { #include "PgUtils.h" #include "ProtoUtils.h" +#define need_collect_analyze() \ + (Gp_role == GP_ROLE_DISPATCH && 
Config::min_analyze_time() >= 0 && \ + Config::enable_analyze()) + void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) { if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) { return; @@ -53,8 +57,7 @@ void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) { } } -void EventSender::executor_before_start(QueryDesc *query_desc, - int /* eflags*/) { +void EventSender::executor_before_start(QueryDesc *query_desc, int eflags) { if (!connector) { return; } @@ -67,7 +70,8 @@ void EventSender::executor_before_start(QueryDesc *query_desc, return; } collect_query_submit(query_desc); - if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze()) { + if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze() && + (eflags & EXEC_FLAG_EXPLAIN_ONLY) == 0) { query_desc->instrument_options |= INSTRUMENT_BUFFERS; query_desc->instrument_options |= INSTRUMENT_ROWS; query_desc->instrument_options |= INSTRUMENT_TIMER; @@ -97,6 +101,17 @@ void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) { } update_query_state(query_desc, query, QueryState::START); set_query_plan(query_msg, query_desc); + if (need_collect_analyze()) { + // Set up to track total elapsed time during query run. + // Make sure the space is allocated in the per-query + // context so it will go away at executor_end. 
+ if (query_desc->totaltime == NULL) { + MemoryContext oldcxt; + oldcxt = MemoryContextSwitchTo(query_desc->estate->es_query_cxt); + query_desc->totaltime = InstrAlloc(1, INSTRUMENT_ALL); + MemoryContextSwitchTo(oldcxt); + } + } yagpcc::GPMetrics stats; std::swap(stats, *query_msg->mutable_query_metrics()); if (connector->report_query(*query_msg, "started")) { @@ -262,6 +277,34 @@ void EventSender::ic_metrics_collect() { #endif } +void EventSender::analyze_stats_collect(QueryDesc *query_desc) { + if (!connector || Gp_role != GP_ROLE_DISPATCH) { + return; + } + if (!need_collect(query_desc, nesting_level)) { + return; + } + auto query = get_query_message(query_desc); + auto query_msg = query->message; + *query_msg->mutable_end_time() = current_ts(); + // Yet another Greenplum weirdness: that's actually a nested query + // which is being committed/rolled back. Treat it accordingly. + if (query->state == UNKNOWN && !need_report_nested_query()) { + return; + } + if (!query_desc->totaltime || !need_collect_analyze()) { + return; + } + // Make sure stats accumulation is done. + // (Note: it's okay if several levels of hook all do this.) 
+ InstrEndLoop(query_desc->totaltime); + + double ms = query_desc->totaltime->total * 1000.0; + if (ms >= Config::min_analyze_time()) { + set_analyze_plan_text_json(query_desc, query_msg); + } +} + EventSender::EventSender() { if (Config::enable_collector()) { try { @@ -350,4 +393,4 @@ void EventSender::update_nested_counters(QueryDesc *query_desc) { EventSender::QueryItem::QueryItem(EventSender::QueryState st, yagpcc::SetQueryReq *msg) - : state(st), message(msg) {} \ No newline at end of file + : state(st), message(msg) {} diff --git a/src/EventSender.h b/src/EventSender.h index 99f7b24753d..4d09b429fc8 100644 --- a/src/EventSender.h +++ b/src/EventSender.h @@ -27,6 +27,7 @@ public: void executor_end(QueryDesc *query_desc); void query_metrics_collect(QueryMetricsStatus status, void *arg); void ic_metrics_collect(); + void analyze_stats_collect(QueryDesc *query_desc); void incr_depth() { nesting_level++; } void decr_depth() { nesting_level--; } EventSender(); diff --git a/src/PgUtils.cpp b/src/PgUtils.cpp index 5982ff77c1c..ed3e69c6d44 100644 --- a/src/PgUtils.cpp +++ b/src/PgUtils.cpp @@ -109,3 +109,40 @@ ExplainState get_explain_state(QueryDesc *query_desc, bool costs) { ExplainEndOutput(&es); return es; } + +ExplainState get_analyze_state_json(QueryDesc *query_desc, bool analyze) { + ExplainState es; + ExplainInitState(&es); + es.analyze = analyze; + es.verbose = true; + es.buffers = es.analyze; + es.timing = es.analyze; + es.summary = es.analyze; + es.format = EXPLAIN_FORMAT_JSON; + ExplainBeginOutput(&es); + if (analyze) { + PG_TRY(); + { + ExplainPrintPlan(&es, query_desc); + ExplainPrintExecStatsEnd(&es, query_desc); + } + PG_CATCH(); + { + // PG and GP both have known and yet unknown bugs in EXPLAIN VERBOSE + // implementation. 
We don't want any queries to fail due to those bugs, so + // we report the bug here for future investigation and continue collecting + // metrics w/o reporting any plans + resetStringInfo(es.str); + appendStringInfo( + es.str, + "Unable to restore analyze plan due to PostgreSQL internal error. " + "See logs for more information"); + ereport(INFO, + (errmsg("YAGPCC failed to reconstruct analyze text for query: %s", + query_desc->sourceText))); + } + PG_END_TRY(); + } + ExplainEndOutput(&es); + return es; +} diff --git a/src/PgUtils.h b/src/PgUtils.h index 85b1eb833cd..81282a473a8 100644 --- a/src/PgUtils.h +++ b/src/PgUtils.h @@ -14,3 +14,4 @@ bool need_report_nested_query(); bool filter_query(QueryDesc *query_desc); bool need_collect(QueryDesc *query_desc, int nesting_level); ExplainState get_explain_state(QueryDesc *query_desc, bool costs); +ExplainState get_analyze_state_json(QueryDesc *query_desc, bool analyze); diff --git a/src/ProtoUtils.cpp b/src/ProtoUtils.cpp index c37cefb72d6..6e9fa6bd5c5 100644 --- a/src/ProtoUtils.cpp +++ b/src/ProtoUtils.cpp @@ -8,6 +8,7 @@ extern "C" { #include "postgres.h" #include "access/hash.h" +#include "access/xact.h" #include "cdb/cdbinterconnect.h" #include "cdb/cdbvars.h" #include "cdb/ml_ipc.h" @@ -47,8 +48,9 @@ void set_segment_key(yagpcc::SegmentKey *key) { key->set_segindex(GpIdentity.segindex); } -inline std::string char_to_trimmed_str(const char *str, size_t len) { - return std::string(str, std::min(len, Config::max_text_size())); +inline std::string char_to_trimmed_str(const char *str, size_t len, + size_t lim) { + return std::string(str, std::min(len, lim)); } void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc) { @@ -61,10 +63,11 @@ void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc) { MemoryContextSwitchTo(query_desc->estate->es_query_cxt); auto es = get_explain_state(query_desc, true); MemoryContextSwitchTo(oldcxt); - *qi->mutable_plan_text() = char_to_trimmed_str(es.str->data, 
es.str->len); + *qi->mutable_plan_text() = + char_to_trimmed_str(es.str->data, es.str->len, Config::max_plan_size()); StringInfo norm_plan = gen_normplan(es.str->data); - *qi->mutable_template_plan_text() = - char_to_trimmed_str(norm_plan->data, norm_plan->len); + *qi->mutable_template_plan_text() = char_to_trimmed_str( + norm_plan->data, norm_plan->len, Config::max_plan_size()); qi->set_plan_id(hash_any((unsigned char *)norm_plan->data, norm_plan->len)); qi->set_query_id(query_desc->plannedstmt->queryId); pfree(es.str->data); @@ -76,10 +79,11 @@ void set_query_text(yagpcc::SetQueryReq *req, QueryDesc *query_desc) { if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->sourceText) { auto qi = req->mutable_query_info(); *qi->mutable_query_text() = char_to_trimmed_str( - query_desc->sourceText, strlen(query_desc->sourceText)); + query_desc->sourceText, strlen(query_desc->sourceText), + Config::max_text_size()); char *norm_query = gen_normquery(query_desc->sourceText); - *qi->mutable_template_query_text() = - char_to_trimmed_str(norm_query, strlen(norm_query)); + *qi->mutable_template_query_text() = char_to_trimmed_str( + norm_query, strlen(norm_query), Config::max_text_size()); } } @@ -90,6 +94,7 @@ void clear_big_fields(yagpcc::SetQueryReq *req) { qi->clear_template_plan_text(); qi->clear_query_text(); qi->clear_template_query_text(); + qi->clear_analyze_text(); } } @@ -115,7 +120,8 @@ void set_qi_slice_id(yagpcc::SetQueryReq *req) { void set_qi_error_message(yagpcc::SetQueryReq *req) { auto aqi = req->mutable_add_info(); auto error = elog_message(); - *aqi->mutable_error_message() = char_to_trimmed_str(error, strlen(error)); + *aqi->mutable_error_message() = + char_to_trimmed_str(error, strlen(error), Config::max_text_size()); } void set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics, @@ -217,4 +223,33 @@ yagpcc::SetQueryReq create_query_req(yagpcc::QueryStatus status) { double protots_to_double(const google::protobuf::Timestamp &ts) { return 
double(ts.seconds()) + double(ts.nanos()) / 1000000000.0; +} + +void set_analyze_plan_text_json(QueryDesc *query_desc, + yagpcc::SetQueryReq *req) { + // Make sure it is a valid txn and it is not an utility + // statement for ExplainPrintPlan() later. + if (!IsTransactionState() || !query_desc->plannedstmt) { + return; + } + MemoryContext oldcxt = + MemoryContextSwitchTo(query_desc->estate->es_query_cxt); + + ExplainState es = get_analyze_state_json( + query_desc, query_desc->instrument_options && Config::enable_analyze()); + // Remove last line break. + if (es.str->len > 0 && es.str->data[es.str->len - 1] == '\n') { + es.str->data[--es.str->len] = '\0'; + } + // Convert JSON array to JSON object. + if (es.str->len > 0) { + es.str->data[0] = '{'; + es.str->data[es.str->len - 1] = '}'; + } + auto trimmed_analyze = + char_to_trimmed_str(es.str->data, es.str->len, Config::max_plan_size()); + req->mutable_query_info()->set_analyze_text(trimmed_analyze); + + pfree(es.str->data); + MemoryContextSwitchTo(oldcxt); } \ No newline at end of file diff --git a/src/ProtoUtils.h b/src/ProtoUtils.h index 4e4ed5e76a3..6fb880c2eb8 100644 --- a/src/ProtoUtils.h +++ b/src/ProtoUtils.h @@ -16,4 +16,6 @@ void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc, void set_ic_stats(yagpcc::MetricInstrumentation *metrics, const ICStatistics *ic_statistics); yagpcc::SetQueryReq create_query_req(yagpcc::QueryStatus status); -double protots_to_double(const google::protobuf::Timestamp &ts); \ No newline at end of file +double protots_to_double(const google::protobuf::Timestamp &ts); +void set_analyze_plan_text_json(QueryDesc *query_desc, + yagpcc::SetQueryReq *message); \ No newline at end of file diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp index f1d403b82f1..79d3ec45881 100644 --- a/src/hook_wrappers.cpp +++ b/src/hook_wrappers.cpp @@ -3,6 +3,7 @@ extern "C" { #include "postgres.h" #include "funcapi.h" #include "executor/executor.h" +#include "executor/execUtils.h" 
#include "utils/elog.h" #include "utils/builtins.h" #include "utils/metrics_utils.h" @@ -24,6 +25,10 @@ static ExecutorRun_hook_type previous_ExecutorRun_hook = nullptr; static ExecutorFinish_hook_type previous_ExecutorFinish_hook = nullptr; static ExecutorEnd_hook_type previous_ExecutorEnd_hook = nullptr; static query_info_collect_hook_type previous_query_info_collect_hook = nullptr; +#ifdef ANALYZE_STATS_COLLECT_HOOK +static analyze_stats_collect_hook_type previous_analyze_stats_collect_hook = + nullptr; +#endif #ifdef IC_TEARDOWN_HOOK static ic_teardown_hook_type previous_ic_teardown_hook = nullptr; #endif @@ -36,6 +41,9 @@ static void ya_ExecutorEnd_hook(QueryDesc *query_desc); static void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg); static void ya_ic_teardown_hook(ChunkTransportState *transportStates, bool hasErrors); +#ifdef ANALYZE_STATS_COLLECT_HOOK +static void ya_analyze_stats_collect_hook(QueryDesc *query_desc); +#endif static EventSender *sender = nullptr; @@ -71,6 +79,10 @@ void hooks_init() { #ifdef IC_TEARDOWN_HOOK previous_ic_teardown_hook = ic_teardown_hook; ic_teardown_hook = ya_ic_teardown_hook; +#endif +#ifdef ANALYZE_STATS_COLLECT_HOOK + previous_analyze_stats_collect_hook = analyze_stats_collect_hook; + analyze_stats_collect_hook = ya_analyze_stats_collect_hook; #endif stat_statements_parser_init(); } @@ -83,6 +95,9 @@ void hooks_deinit() { query_info_collect_hook = previous_query_info_collect_hook; #ifdef IC_TEARDOWN_HOOK ic_teardown_hook = previous_ic_teardown_hook; +#endif +#ifdef ANALYZE_STATS_COLLECT_HOOK + analyze_stats_collect_hook = previous_analyze_stats_collect_hook; #endif stat_statements_parser_deinit(); if (sender) { @@ -165,6 +180,15 @@ void ya_ic_teardown_hook(ChunkTransportState *transportStates, bool hasErrors) { #endif } +#ifdef ANALYZE_STATS_COLLECT_HOOK +void ya_analyze_stats_collect_hook(QueryDesc *query_desc) { + cpp_call(get_sender(), &EventSender::analyze_stats_collect, query_desc); + if 
(previous_analyze_stats_collect_hook) { + (*previous_analyze_stats_collect_hook)(query_desc); + } +} +#endif + static void check_stats_loaded() { if (!YagpStat::loaded()) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
