This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit bef6d57baa012ea8e42029ff1a233cfa748f2106 Author: Maxim Smyatkin <[email protected]> AuthorDate: Thu May 4 14:34:42 2023 +0300 [yagp_hooks_collector] Add CDB metrics, query nesting, and configuration GUCs Add missing Greenplum node types to pg_stat_statements parser. Move stats reporting to ExecutorEnd hook. Improve GRPC failure handling. Track CDB-specific metrics and initial query nesting level. Add resource group collection. Add GUCs for controlling collection. Skip nested and utility statements by default. --- debian/control | 6 +- protos/yagpcc_metrics.proto | 1 + src/Config.cpp | 38 +++++++ src/Config.h | 12 +++ src/EventSender.cpp | 112 +++++++++++++++++---- src/EventSender.h | 6 ++ src/GrpcConnector.cpp | 66 ++++++++++-- src/hook_wrappers.cpp | 93 +++++++++++++++-- .../pg_stat_statements_ya_parser.c | 29 +++++- 9 files changed, 318 insertions(+), 45 deletions(-) diff --git a/debian/control b/debian/control index 600dd4d602e..c740a8590ca 100644 --- a/debian/control +++ b/debian/control @@ -1,11 +1,11 @@ -Source: greenplum-6-yagpcc-hooks-collector-1 +Source: greenplum-6-yagpcc-hooks Section: misc Priority: optional Maintainer: Maxim Smyatkin <[email protected]> -Build-Depends: make, gcc, g++, debhelper (>=9), greenplum-db-6 (>=6.19.3), protobuf-compiler, protobuf-compiler-grpc +Build-Depends: make, gcc, g++, debhelper (>=9), greenplum-db-6 (>=6.19.3), protobuf-compiler, protobuf-compiler-grpc, libgrpc++1, libgrpc++-dev Standards-Version: 3.9.8 -Package: greenplum-6-yagpcc-hooks-collector-1 +Package: greenplum-6-yagpcc-hooks Architecture: any Depends: ${misc:Depends}, ${shlibs:Depends}, greenplum-db-6 (>=6.19.3) Description: Greenplum extension to send query execution metrics to yandex command center agent diff --git a/protos/yagpcc_metrics.proto b/protos/yagpcc_metrics.proto index f00f329a208..26e0a496460 100644 --- a/protos/yagpcc_metrics.proto +++ b/protos/yagpcc_metrics.proto @@ -33,6 +33,7 @@ message QueryInfo { string temlate_plan_text = 7; string userName = 8; string databaseName = 9; + string rsgname = 10; } enum PlanGenerator diff --git a/src/Config.cpp b/src/Config.cpp new file mode 100644 index 00000000000..d97e5d45984 --- /dev/null +++ b/src/Config.cpp @@ -0,0 +1,38 @@ +#include "Config.h" + +extern "C" { +#include "postgres.h" +#include "utils/builtins.h" +#include "utils/guc.h" +} + +static char *guc_uds_path = nullptr; +static bool guc_enable_analyze = true; +static bool guc_enable_cdbstats = true; +static bool guc_enable_collector = true; + +void Config::init() { + DefineCustomStringVariable( + "yagpcc.uds_path", "Sets filesystem path of the agent socket", 0LL, + &guc_uds_path, "/tmp/yagpcc_agent.sock", PGC_SUSET, + GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC, 0LL, 0LL, 0LL); + + DefineCustomBoolVariable( + "yagpcc.enable", "Enable metrics collector", 0LL, &guc_enable_collector, + true, PGC_SUSET, GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC, 0LL, 0LL, 0LL); + + DefineCustomBoolVariable( + "yagpcc.enable_analyze", "Collect analyze metrics in yagpcc", 0LL, + &guc_enable_analyze, true, PGC_SUSET, + GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC, 0LL, 0LL, 0LL); + + DefineCustomBoolVariable( + "yagpcc.enable_cdbstats", "Collect CDB metrics in yagpcc", 0LL, + &guc_enable_cdbstats, true, PGC_SUSET, + GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC, 0LL, 0LL, 0LL); +} + +std::string Config::uds_path() { return guc_uds_path; } +bool Config::enable_analyze() { return guc_enable_analyze; } +bool Config::enable_cdbstats() { return guc_enable_cdbstats; } +bool Config::enable_collector() { return guc_enable_collector; } diff --git a/src/Config.h b/src/Config.h new file mode 100644 index 00000000000..117481f219b --- /dev/null +++ b/src/Config.h @@ -0,0 +1,12 @@ +#pragma once + +#include <string> + +class Config { +public: + static void init(); + static std::string uds_path(); + static bool enable_analyze(); + static bool enable_cdbstats(); + static bool enable_collector(); +}; \ No newline at end of file diff --git a/src/EventSender.cpp b/src/EventSender.cpp index 5ab6bbd60df..55858ed5183 100644 --- a/src/EventSender.cpp +++ b/src/EventSender.cpp @@ -1,3 +1,4 @@ +#include "Config.h" #include "GrpcConnector.h" #include "ProcStats.h" #include <ctime> @@ -13,6 +14,7 @@ extern "C" { #include "utils/elog.h" #include "utils/metrics_utils.h" +#include "cdb/cdbdisp.h" #include "cdb/cdbexplain.h" #include "cdb/cdbvars.h" @@ -25,6 +27,10 @@ void get_spill_info(int ssid, int ccid, int32_t *file_count, #include "EventSender.h" +#define need_collect() \ + (nesting_level == 0 && gp_command_count != 0 && \ + query_desc->sourceText != nullptr && Config::enable_collector()) + namespace { std::string *get_user_name() { @@ -39,6 +45,21 @@ std::string *get_db_name() { return result; } +std::string *get_rg_name() { + auto userId = GetUserId(); + if (!OidIsValid(userId)) + return nullptr; + auto groupId = GetResGroupIdForRole(userId); + if (!OidIsValid(groupId)) + return nullptr; + char *rgname = GetResGroupNameForId(groupId); + if (rgname == nullptr) + return nullptr; + auto result = new std::string(rgname); + pfree(rgname); + return result; +} + int get_cur_slice_id(QueryDesc *desc) { if (!desc->estate) { return 0; @@ -103,9 +124,10 @@ void set_query_text(yagpcc::QueryInfo *qi, QueryDesc *query_desc) { pfree(norm_query); } -void set_query_info(yagpcc::QueryInfo *qi, QueryDesc *query_desc, +void set_query_info(yagpcc::SetQueryReq *req, QueryDesc *query_desc, bool with_text, bool with_plan) { if (Gp_session_role == GP_ROLE_DISPATCH) { + auto qi = req->mutable_query_info(); if (query_desc->sourceText && with_text) { set_query_text(qi, query_desc); } @@ -115,6 +137,7 @@ void set_query_info(yagpcc::QueryInfo *qi, QueryDesc *query_desc, } qi->set_allocated_username(get_user_name()); qi->set_allocated_databasename(get_db_name()); + qi->set_allocated_rsgname(get_rg_name()); } } @@ -209,37 +232,79 @@ void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) { } } +void EventSender::executor_before_start(QueryDesc *query_desc, + int /* eflags*/) { + if (Gp_role == GP_ROLE_DISPATCH && need_collect() && + Config::enable_analyze()) { + instr_time starttime; + query_desc->instrument_options |= INSTRUMENT_BUFFERS; + query_desc->instrument_options |= INSTRUMENT_ROWS; + query_desc->instrument_options |= INSTRUMENT_TIMER; + if (Config::enable_cdbstats()) { + query_desc->instrument_options |= INSTRUMENT_CDB; + + // TODO: there is a PR resolving some memory leak around auto-explain: + // https://github.com/greenplum-db/gpdb/pull/15164 + // Need to check if the memory leak applies here as well and fix it + Assert(query_desc->showstatctx == NULL); + INSTR_TIME_SET_CURRENT(starttime); + query_desc->showstatctx = + cdbexplain_showExecStatsBegin(query_desc, starttime); + } + } +} + void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) { - if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) { + if ((Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) && + need_collect()) { + auto req = + create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_START); + set_query_info(&req, query_desc, false, true); + send_query_info(&req, "started"); + } +} + +void EventSender::executor_end(QueryDesc *query_desc) { + if (!need_collect() || + (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE)) { return; } + if (query_desc->totaltime && Config::enable_analyze() && + Config::enable_cdbstats()) { + if (query_desc->estate->dispatcherState && + query_desc->estate->dispatcherState->primaryResults) { + cdbdisp_checkDispatchResult(query_desc->estate->dispatcherState, + DISPATCH_WAIT_NONE); + } + InstrEndLoop(query_desc->totaltime); + } auto req = - create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_START); - set_query_info(req.mutable_query_info(), query_desc, false, true); - send_query_info(&req, "started"); + create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_END); + set_query_info(&req, query_desc, false, false); + // NOTE: there are no cummulative spillinfo stats AFAIU, so no need to + // gather it here. It only makes sense when doing regular stat checks. + set_gp_metrics(req.mutable_query_metrics(), query_desc, + /*need_spillinfo*/ false); + send_query_info(&req, "ended"); } void EventSender::collect_query_submit(QueryDesc *query_desc) { - query_desc->instrument_options |= INSTRUMENT_BUFFERS; - query_desc->instrument_options |= INSTRUMENT_ROWS; - query_desc->instrument_options |= INSTRUMENT_TIMER; - - auto req = - create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_SUBMIT); - set_query_info(req.mutable_query_info(), query_desc, true, false); - send_query_info(&req, "submit"); + if (need_collect()) { + auto req = + create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_SUBMIT); + set_query_info(&req, query_desc, true, false); + send_query_info(&req, "submit"); + } } void EventSender::collect_query_done(QueryDesc *query_desc, const std::string &status) { - auto req = - create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_DONE); - set_query_info(req.mutable_query_info(), query_desc, false, false); - // NOTE: there are no cummulative spillinfo stats AFAIU, so no need to gather - // it here. It only makes sense when doing regular stat checks. - set_gp_metrics(req.mutable_query_metrics(), query_desc, - /*need_spillinfo*/ false); - send_query_info(&req, status); + if (need_collect()) { + auto req = + create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_DONE); + set_query_info(&req, query_desc, false, false); + send_query_info(&req, status); + } } void EventSender::send_query_info(yagpcc::SetQueryReq *req, @@ -257,4 +322,7 @@ EventSender *EventSender::instance() { return &sender; } -EventSender::EventSender() { connector = std::make_unique<GrpcConnector>(); } \ No newline at end of file +EventSender::EventSender() { + Config::init(); + connector = std::make_unique<GrpcConnector>(); +} \ No newline at end of file diff --git a/src/EventSender.h b/src/EventSender.h index 9c574cba9a1..9e2ef992f81 100644 --- a/src/EventSender.h +++ b/src/EventSender.h @@ -11,8 +11,12 @@ class SetQueryReq; class EventSender { public: + void executor_before_start(QueryDesc *query_desc, int eflags); void executor_after_start(QueryDesc *query_desc, int eflags); + void executor_end(QueryDesc *query_desc); void query_metrics_collect(QueryMetricsStatus status, void *arg); + void incr_depth() { nesting_level++; } + void decr_depth() { nesting_level--; } static EventSender *instance(); private: @@ -22,4 +26,6 @@ private: EventSender(); void send_query_info(yagpcc::SetQueryReq *req, const std::string &event); std::unique_ptr<GrpcConnector> connector; + + int nesting_level = 0; }; \ No newline at end of file diff --git a/src/GrpcConnector.cpp b/src/GrpcConnector.cpp index 5a24d576de1..276c9ceb8a8 100644 --- a/src/GrpcConnector.cpp +++ b/src/GrpcConnector.cpp @@ -1,42 +1,86 @@ #include "GrpcConnector.h" +#include "Config.h" #include "yagpcc_set_service.grpc.pb.h" -#include <grpc++/grpc++.h> +#include <atomic> +#include <condition_variable> #include <grpc++/channel.h> +#include <grpc++/grpc++.h> +#include <mutex> #include <string> +#include <thread> + +extern "C" { +#include "postgres.h" +#include "cdb/cdbvars.h" +} class GrpcConnector::Impl { public: - Impl() { + Impl() : SOCKET_FILE("unix://" + Config::uds_path()) { GOOGLE_PROTOBUF_VERIFY_VERSION; - this->stub = yagpcc::SetQueryInfo::NewStub( - grpc::CreateChannel(SOCKET_FILE, grpc::InsecureChannelCredentials())); + channel = + grpc::CreateChannel(SOCKET_FILE, grpc::InsecureChannelCredentials()); + stub = yagpcc::SetQueryInfo::NewStub(channel); + connected = true; + done = false; + reconnect_thread = std::thread(&Impl::reconnect, this); + } + + ~Impl() { + done = true; + cv.notify_one(); + reconnect_thread.join(); } yagpcc::MetricResponse set_metric_query(yagpcc::SetQueryReq req) { yagpcc::MetricResponse response; + if (!connected) { + response.set_error_code(yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR); + response.set_error_text( + "Not tracing this query connection to agent has been lost"); + return response; + } grpc::ClientContext context; - // TODO: find a more secure way to send messages than relying on a fixed - // timeout + int timeout = Gp_role == GP_ROLE_DISPATCH ? 500 : 250; auto deadline = - std::chrono::system_clock::now() + std::chrono::milliseconds(200); + std::chrono::system_clock::now() + std::chrono::milliseconds(timeout); context.set_deadline(deadline); - grpc::Status status = (stub->SetMetricQuery)(&context, req, &response); - if (!status.ok()) { response.set_error_text("Connection lost: " + status.error_message() + "; " + status.error_details()); response.set_error_code(yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR); + connected = false; + cv.notify_one(); } return response; } private: - const std::string SOCKET_FILE = "unix:///tmp/yagpcc_agent.sock"; - const std::string TCP_ADDRESS = "127.0.0.1:1432"; + const std::string SOCKET_FILE; std::unique_ptr<yagpcc::SetQueryInfo::Stub> stub; + std::shared_ptr<grpc::Channel> channel; + std::atomic_bool connected; + std::thread reconnect_thread; + std::condition_variable cv; + std::mutex mtx; + bool done; + + void reconnect() { + while (!done) { + { + std::unique_lock<std::mutex> lock(mtx); + cv.wait(lock); + } + while (!connected && !done) { + auto deadline = + std::chrono::system_clock::now() + std::chrono::milliseconds(100); + connected = channel->WaitForConnected(deadline); + } + } + } }; GrpcConnector::GrpcConnector() { impl = new Impl(); } diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp index be39c953970..edad5798e44 100644 --- a/src/hook_wrappers.cpp +++ b/src/hook_wrappers.cpp @@ -1,28 +1,42 @@ extern "C" { #include "postgres.h" -#include "utils/metrics_utils.h" -#include "utils/elog.h" #include "executor/executor.h" +#include "utils/elog.h" +#include "utils/metrics_utils.h" -#include "cdb/cdbvars.h" #include "cdb/cdbexplain.h" +#include "cdb/cdbvars.h" #include "tcop/utility.h" } -#include "stat_statements_parser/pg_stat_statements_ya_parser.h" -#include "hook_wrappers.h" +#include "Config.h" #include "EventSender.h" +#include "hook_wrappers.h" +#include "stat_statements_parser/pg_stat_statements_ya_parser.h" static ExecutorStart_hook_type previous_ExecutorStart_hook = nullptr; +static ExecutorRun_hook_type previous_ExecutorRun_hook = nullptr; +static ExecutorFinish_hook_type previous_ExecutorFinish_hook = nullptr; +static ExecutorEnd_hook_type previous_ExecutorEnd_hook = nullptr; static query_info_collect_hook_type previous_query_info_collect_hook = nullptr; -static void ya_ExecutorAfterStart_hook(QueryDesc *query_desc, int eflags); +static void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags); +static void ya_ExecutorRun_hook(QueryDesc *query_desc, ScanDirection direction, + long count); +static void ya_ExecutorFinish_hook(QueryDesc *query_desc); +static void ya_ExecutorEnd_hook(QueryDesc *query_desc); static void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg); void hooks_init() { previous_ExecutorStart_hook = ExecutorStart_hook; - ExecutorStart_hook = ya_ExecutorAfterStart_hook; + ExecutorStart_hook = ya_ExecutorStart_hook; + previous_ExecutorRun_hook = ExecutorRun_hook; + ExecutorRun_hook = ya_ExecutorRun_hook; + previous_ExecutorFinish_hook = ExecutorFinish_hook; + ExecutorFinish_hook = ya_ExecutorFinish_hook; + previous_ExecutorEnd_hook = ExecutorEnd_hook; + ExecutorEnd_hook = ya_ExecutorEnd_hook; previous_query_info_collect_hook = query_info_collect_hook; query_info_collect_hook = ya_query_info_collect_hook; stat_statements_parser_init(); @@ -30,11 +44,21 @@ void hooks_init() { void hooks_deinit() { ExecutorStart_hook = previous_ExecutorStart_hook; + ExecutorEnd_hook = previous_ExecutorEnd_hook; query_info_collect_hook = previous_query_info_collect_hook; stat_statements_parser_deinit(); } -void ya_ExecutorAfterStart_hook(QueryDesc *query_desc, int eflags) { +void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) { + PG_TRY(); + { EventSender::instance()->executor_before_start(query_desc, eflags); } + PG_CATCH(); + { + ereport(WARNING, + (errmsg("EventSender failed in ya_ExecutorBeforeStart_hook"))); + PG_RE_THROW(); + } + PG_END_TRY(); if (previous_ExecutorStart_hook) { (*previous_ExecutorStart_hook)(query_desc, eflags); } else { @@ -51,6 +75,59 @@ void ya_ExecutorAfterStart_hook(QueryDesc *query_desc, int eflags) { PG_END_TRY(); } +void ya_ExecutorRun_hook(QueryDesc *query_desc, ScanDirection direction, + long count) { + EventSender::instance()->incr_depth(); + PG_TRY(); + { + if (previous_ExecutorRun_hook) + previous_ExecutorRun_hook(query_desc, direction, count); + else + standard_ExecutorRun(query_desc, direction, count); + EventSender::instance()->decr_depth(); + } + PG_CATCH(); + { + EventSender::instance()->decr_depth(); + PG_RE_THROW(); + } + PG_END_TRY(); +} + +void ya_ExecutorFinish_hook(QueryDesc *query_desc) { + EventSender::instance()->incr_depth(); + PG_TRY(); + { + if (previous_ExecutorFinish_hook) + previous_ExecutorFinish_hook(query_desc); + else + standard_ExecutorFinish(query_desc); + EventSender::instance()->decr_depth(); + } + PG_CATCH(); + { + EventSender::instance()->decr_depth(); + PG_RE_THROW(); + } + PG_END_TRY(); +} + +void ya_ExecutorEnd_hook(QueryDesc *query_desc) { + PG_TRY(); + { EventSender::instance()->executor_end(query_desc); } + PG_CATCH(); + { + ereport(WARNING, (errmsg("EventSender failed in ya_ExecutorEnd_hook"))); + PG_RE_THROW(); + } + PG_END_TRY(); + if (previous_ExecutorEnd_hook) { + (*previous_ExecutorEnd_hook)(query_desc); + } else { + standard_ExecutorEnd(query_desc); + } +} + void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg) { PG_TRY(); { EventSender::instance()->query_metrics_collect(status, arg); } diff --git a/src/stat_statements_parser/pg_stat_statements_ya_parser.c b/src/stat_statements_parser/pg_stat_statements_ya_parser.c index 737e77745df..a37ac0ef0bf 100644 --- a/src/stat_statements_parser/pg_stat_statements_ya_parser.c +++ b/src/stat_statements_parser/pg_stat_statements_ya_parser.c @@ -617,6 +617,13 @@ JumbleExpr(pgssJumbleState *jstate, Node *node) } break; /* GPDB nodes */ + case T_GroupingClause: + { + GroupingClause *grpnode = (GroupingClause *)node; + + JumbleExpr(jstate, (Node *)grpnode->groupsets); + } + break; case T_GroupingFunc: { GroupingFunc *grpnode = (GroupingFunc *)node; @@ -628,7 +635,27 @@ JumbleExpr(pgssJumbleState *jstate, Node *node) case T_GroupId: case T_Integer: case T_Value: - // TODO: no idea what to do with those + // TODO:seems like nothing to do with it + break; + /* GPDB-only additions, nothing to do */ + case T_PartitionBy: + case T_PartitionElem: + case T_PartitionRangeItem: + case T_PartitionBoundSpec: + case T_PartitionSpec: + case T_PartitionValuesSpec: + case T_AlterPartitionId: + case T_AlterPartitionCmd: + case T_InheritPartitionCmd: + case T_CreateFileSpaceStmt: + case T_FileSpaceEntry: + case T_DropFileSpaceStmt: + case T_TableValueExpr: + case T_DenyLoginInterval: + case T_DenyLoginPoint: + case T_AlterTypeStmt: + case T_SetDistributionCmd: + case T_ExpandStmtSpec: break; default: /* Only a warning, since we can stumble along anyway */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
