This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit 729b35f86fd5df36beba2b7f7ee7a7dd2d1ef4d2 Author: Maxim Smyatkin <[email protected]> AuthorDate: Mon Apr 10 16:01:08 2023 +0300 [yagp_hooks_collector] Switch to query_info_collect_hook and fix stability Use query_info_collect_hook for finer-grained lifecycle tracking. Fix two segfaults in early init paths. Skip hooks in UTILITY mode. General robustness improvements. --- protos/yagpcc_set_service.proto | 7 +- src/EventSender.cpp | 207 ++++++++++++++------- src/EventSender.h | 13 +- src/GrpcConnector.cpp | 4 +- src/GrpcConnector.h | 2 +- src/hook_wrappers.cpp | 65 ++++--- .../pg_stat_statements_ya_parser.c | 21 +++ 7 files changed, 206 insertions(+), 113 deletions(-) diff --git a/protos/yagpcc_set_service.proto b/protos/yagpcc_set_service.proto index 97c5691a6f5..93c2f5a01d1 100644 --- a/protos/yagpcc_set_service.proto +++ b/protos/yagpcc_set_service.proto @@ -30,9 +30,10 @@ message SetQueryReq { QueryStatus query_status = 1; google.protobuf.Timestamp datetime = 2; QueryKey query_key = 3; - QueryInfo query_info = 4; - GPMetrics query_metrics = 5; - repeated MetricPlan plan_tree = 6; + SegmentKey segment_key = 4; + QueryInfo query_info = 5; + GPMetrics query_metrics = 6; + repeated MetricPlan plan_tree = 7; } message SetPlanNodeReq { diff --git a/src/EventSender.cpp b/src/EventSender.cpp index b7c3cd70b85..5ab6bbd60df 100644 --- a/src/EventSender.cpp +++ b/src/EventSender.cpp @@ -1,29 +1,30 @@ -#include "EventSender.h" #include "GrpcConnector.h" #include "ProcStats.h" -#include "protos/yagpcc_set_service.pb.h" #include <ctime> extern "C" { #include "postgres.h" + #include "access/hash.h" -#include "utils/metrics_utils.h" -#include "utils/elog.h" -#include "executor/executor.h" -#include "commands/explain.h" #include "commands/dbcommands.h" +#include "commands/explain.h" #include "commands/resgroupcmds.h" +#include "executor/executor.h" +#include "utils/elog.h" +#include "utils/metrics_utils.h" -#include "cdb/cdbvars.h" #include "cdb/cdbexplain.h" +#include 
"cdb/cdbvars.h" +#include "stat_statements_parser/pg_stat_statements_ya_parser.h" #include "tcop/utility.h" -#include "pg_stat_statements_ya_parser.h" void get_spill_info(int ssid, int ccid, int32_t *file_count, int64_t *total_bytes); } +#include "EventSender.h" + namespace { std::string *get_user_name() { @@ -102,90 +103,152 @@ void set_query_text(yagpcc::QueryInfo *qi, QueryDesc *query_desc) { pfree(norm_query); } -void set_query_info(yagpcc::QueryInfo *qi, QueryDesc *query_desc) { - if (query_desc->sourceText) { - set_query_text(qi, query_desc); +void set_query_info(yagpcc::QueryInfo *qi, QueryDesc *query_desc, + bool with_text, bool with_plan) { + if (Gp_session_role == GP_ROLE_DISPATCH) { + if (query_desc->sourceText && with_text) { + set_query_text(qi, query_desc); + } + if (query_desc->plannedstmt && with_plan) { + set_query_plan(qi, query_desc); + qi->set_query_id(query_desc->plannedstmt->queryId); + } + qi->set_allocated_username(get_user_name()); + qi->set_allocated_databasename(get_db_name()); } - if (query_desc->plannedstmt) { - set_query_plan(qi, query_desc); - qi->set_query_id(query_desc->plannedstmt->queryId); - } - qi->set_allocated_username(get_user_name()); - qi->set_allocated_databasename(get_db_name()); } void set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics, QueryDesc *query_desc) { auto instrument = query_desc->planstate->instrument; - metrics->set_ntuples(instrument->ntuples); - metrics->set_nloops(instrument->nloops); - metrics->set_tuplecount(instrument->tuplecount); - metrics->set_firsttuple(instrument->firsttuple); - metrics->set_startup(instrument->startup); - metrics->set_total(instrument->total); - auto &buffusage = instrument->bufusage; - metrics->set_shared_blks_hit(buffusage.shared_blks_hit); - metrics->set_shared_blks_read(buffusage.shared_blks_read); - metrics->set_shared_blks_dirtied(buffusage.shared_blks_dirtied); - metrics->set_shared_blks_written(buffusage.shared_blks_written); - 
metrics->set_local_blks_hit(buffusage.local_blks_hit); - metrics->set_local_blks_read(buffusage.local_blks_read); - metrics->set_local_blks_dirtied(buffusage.local_blks_dirtied); - metrics->set_local_blks_written(buffusage.local_blks_written); - metrics->set_temp_blks_read(buffusage.temp_blks_read); - metrics->set_temp_blks_written(buffusage.temp_blks_written); - metrics->set_blk_read_time(INSTR_TIME_GET_DOUBLE(buffusage.blk_read_time)); - metrics->set_blk_write_time(INSTR_TIME_GET_DOUBLE(buffusage.blk_write_time)); -} - -void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc) { - int32_t n_spill_files = 0; - int64_t n_spill_bytes = 0; - get_spill_info(gp_session_id, gp_command_count, &n_spill_files, - &n_spill_bytes); - metrics->mutable_spill()->set_filecount(n_spill_files); - metrics->mutable_spill()->set_totalbytes(n_spill_bytes); - if (query_desc->planstate->instrument) { + if (instrument) { + metrics->set_ntuples(instrument->ntuples); + metrics->set_nloops(instrument->nloops); + metrics->set_tuplecount(instrument->tuplecount); + metrics->set_firsttuple(instrument->firsttuple); + metrics->set_startup(instrument->startup); + metrics->set_total(instrument->total); + auto &buffusage = instrument->bufusage; + metrics->set_shared_blks_hit(buffusage.shared_blks_hit); + metrics->set_shared_blks_read(buffusage.shared_blks_read); + metrics->set_shared_blks_dirtied(buffusage.shared_blks_dirtied); + metrics->set_shared_blks_written(buffusage.shared_blks_written); + metrics->set_local_blks_hit(buffusage.local_blks_hit); + metrics->set_local_blks_read(buffusage.local_blks_read); + metrics->set_local_blks_dirtied(buffusage.local_blks_dirtied); + metrics->set_local_blks_written(buffusage.local_blks_written); + metrics->set_temp_blks_read(buffusage.temp_blks_read); + metrics->set_temp_blks_written(buffusage.temp_blks_written); + metrics->set_blk_read_time(INSTR_TIME_GET_DOUBLE(buffusage.blk_read_time)); + metrics->set_blk_write_time( + 
INSTR_TIME_GET_DOUBLE(buffusage.blk_write_time)); + } +} + +void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc, + bool need_spillinfo) { + if (need_spillinfo) { + int32_t n_spill_files = 0; + int64_t n_spill_bytes = 0; + get_spill_info(gp_session_id, gp_command_count, &n_spill_files, + &n_spill_bytes); + metrics->mutable_spill()->set_filecount(n_spill_files); + metrics->mutable_spill()->set_totalbytes(n_spill_bytes); + } + if (query_desc->planstate && query_desc->planstate->instrument) { set_metric_instrumentation(metrics->mutable_instrumentation(), query_desc); } fill_self_stats(metrics->mutable_systemstat()); } +yagpcc::SetQueryReq create_query_req(QueryDesc *query_desc, + yagpcc::QueryStatus status) { + yagpcc::SetQueryReq req; + req.set_query_status(status); + *req.mutable_datetime() = current_ts(); + set_query_key(req.mutable_query_key(), query_desc); + set_segment_key(req.mutable_segment_key(), query_desc); + return req; +} + } // namespace -void EventSender::ExecutorStart(QueryDesc *query_desc, int /* eflags*/) { +void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) { + if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) { + return; + } + switch (status) { + case METRICS_PLAN_NODE_INITIALIZE: + case METRICS_PLAN_NODE_EXECUTING: + case METRICS_PLAN_NODE_FINISHED: + // TODO + break; + case METRICS_QUERY_SUBMIT: + collect_query_submit(reinterpret_cast<QueryDesc *>(arg)); + break; + case METRICS_QUERY_START: + // no-op: executor_after_start is enough + break; + case METRICS_QUERY_DONE: + collect_query_done(reinterpret_cast<QueryDesc *>(arg), "done"); + break; + case METRICS_QUERY_ERROR: + collect_query_done(reinterpret_cast<QueryDesc *>(arg), "error"); + break; + case METRICS_QUERY_CANCELING: + collect_query_done(reinterpret_cast<QueryDesc *>(arg), "calcelling"); + break; + case METRICS_QUERY_CANCELED: + collect_query_done(reinterpret_cast<QueryDesc *>(arg), "cancelled"); + break; + case 
METRICS_INNER_QUERY_DONE: + // TODO + break; + default: + elog(FATAL, "Unknown query status: %d", status); + } +} + +void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) { + if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) { + return; + } + auto req = + create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_START); + set_query_info(req.mutable_query_info(), query_desc, false, true); + send_query_info(&req, "started"); +} + +void EventSender::collect_query_submit(QueryDesc *query_desc) { query_desc->instrument_options |= INSTRUMENT_BUFFERS; query_desc->instrument_options |= INSTRUMENT_ROWS; query_desc->instrument_options |= INSTRUMENT_TIMER; - elog(DEBUG1, "Query %s start recording", query_desc->sourceText); - yagpcc::SetQueryReq req; - req.set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START); - *req.mutable_datetime() = current_ts(); - set_query_key(req.mutable_query_key(), query_desc); - auto result = connector->set_metric_query(req); - if (result.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR) { - elog(WARNING, "Query %s start reporting failed with an error %s", - query_desc->sourceText, result.error_text().c_str()); - } else { - elog(DEBUG1, "Query %s start successful", query_desc->sourceText); - } + auto req = + create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_SUBMIT); + set_query_info(req.mutable_query_info(), query_desc, true, false); + send_query_info(&req, "submit"); } -void EventSender::ExecutorFinish(QueryDesc *query_desc) { - elog(DEBUG1, "Query %s finish recording", query_desc->sourceText); - yagpcc::SetQueryReq req; - req.set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE); - *req.mutable_datetime() = current_ts(); - set_query_key(req.mutable_query_key(), query_desc); - set_query_info(req.mutable_query_info(), query_desc); - set_gp_metrics(req.mutable_query_metrics(), query_desc); - auto result = connector->set_metric_query(req); +void 
EventSender::collect_query_done(QueryDesc *query_desc, + const std::string &status) { + auto req = + create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_DONE); + set_query_info(req.mutable_query_info(), query_desc, false, false); + // NOTE: there are no cumulative spillinfo stats AFAIU, so no need to gather + // it here. It only makes sense when doing regular stat checks. + set_gp_metrics(req.mutable_query_metrics(), query_desc, + /*need_spillinfo*/ false); + send_query_info(&req, status); +} + +void EventSender::send_query_info(yagpcc::SetQueryReq *req, + const std::string &event) { + auto result = connector->set_metric_query(*req); if (result.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR) { - elog(WARNING, "Query %s finish reporting failed with an error %s", - query_desc->sourceText, result.error_text().c_str()); - } else { - elog(DEBUG1, "Query %s finish successful", query_desc->sourceText); + elog(WARNING, "Query {%d-%d-%d} %s reporting failed with an error %s", + req->query_key().tmid(), req->query_key().ssid(), + req->query_key().ccnt(), event.c_str(), result.error_text().c_str()); } } diff --git a/src/EventSender.h b/src/EventSender.h index d69958db9b0..9c574cba9a1 100644 --- a/src/EventSender.h +++ b/src/EventSender.h @@ -1,18 +1,25 @@ #pragma once #include <memory> +#include <string> class GrpcConnector; - struct QueryDesc; +namespace yagpcc { +class SetQueryReq; +} class EventSender { public: - void ExecutorStart(QueryDesc *query_desc, int eflags); - void ExecutorFinish(QueryDesc *query_desc); + void executor_after_start(QueryDesc *query_desc, int eflags); + void query_metrics_collect(QueryMetricsStatus status, void *arg); static EventSender *instance(); private: + void collect_query_submit(QueryDesc *query_desc); + void collect_query_done(QueryDesc *query_desc, const std::string &status); + EventSender(); + void send_query_info(yagpcc::SetQueryReq *req, const std::string &event); std::unique_ptr<GrpcConnector> connector; }; \ No
newline at end of file diff --git a/src/GrpcConnector.cpp b/src/GrpcConnector.cpp index 1a820404428..bca1acd9ce2 100644 --- a/src/GrpcConnector.cpp +++ b/src/GrpcConnector.cpp @@ -16,8 +16,10 @@ public: yagpcc::MetricResponse set_metric_query(yagpcc::SetQueryReq req) { yagpcc::MetricResponse response; grpc::ClientContext context; + // TODO: find a more secure way to send messages than relying on a fixed + // timeout auto deadline = - std::chrono::system_clock::now() + std::chrono::milliseconds(50); + std::chrono::system_clock::now() + std::chrono::milliseconds(200); context.set_deadline(deadline); grpc::Status status = (stub->SetMetricQuery)(&context, req, &response); diff --git a/src/GrpcConnector.h b/src/GrpcConnector.h index 810c0bd3e15..4fca6960a4e 100644 --- a/src/GrpcConnector.h +++ b/src/GrpcConnector.h @@ -1,6 +1,6 @@ #pragma once -#include "yagpcc_set_service.pb.h" +#include "protos/yagpcc_set_service.pb.h" class GrpcConnector { public: diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp index 739cca80f01..be39c953970 100644 --- a/src/hook_wrappers.cpp +++ b/src/hook_wrappers.cpp @@ -1,6 +1,3 @@ -#include "hook_wrappers.h" -#include "EventSender.h" - extern "C" { #include "postgres.h" #include "utils/metrics_utils.h" @@ -14,55 +11,57 @@ extern "C" { } #include "stat_statements_parser/pg_stat_statements_ya_parser.h" +#include "hook_wrappers.h" +#include "EventSender.h" static ExecutorStart_hook_type previous_ExecutorStart_hook = nullptr; -static ExecutorFinish_hook_type previous_ExecutorFinish_hook = nullptr; - -static void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags); -static void ya_ExecutorFinish_hook(QueryDesc *query_desc); +static query_info_collect_hook_type previous_query_info_collect_hook = nullptr; -#define REPLACE_HOOK(hookName) \ - previous_##hookName = hookName; \ - hookName = ya_##hookName; +static void ya_ExecutorAfterStart_hook(QueryDesc *query_desc, int eflags); +static void ya_query_info_collect_hook(QueryMetricsStatus 
status, void *arg); void hooks_init() { - REPLACE_HOOK(ExecutorStart_hook); - REPLACE_HOOK(ExecutorFinish_hook); + previous_ExecutorStart_hook = ExecutorStart_hook; + ExecutorStart_hook = ya_ExecutorAfterStart_hook; + previous_query_info_collect_hook = query_info_collect_hook; + query_info_collect_hook = ya_query_info_collect_hook; stat_statements_parser_init(); } void hooks_deinit() { ExecutorStart_hook = previous_ExecutorStart_hook; - ExecutorFinish_hook = previous_ExecutorFinish_hook; + query_info_collect_hook = previous_query_info_collect_hook; stat_statements_parser_deinit(); } -#define CREATE_HOOK_WRAPPER(hookName, ...) \ - PG_TRY(); \ - { EventSender::instance()->hookName(__VA_ARGS__); } \ - PG_CATCH(); \ - { \ - ereport(WARNING, (errmsg("EventSender failed in %s", #hookName))); \ - PG_RE_THROW(); \ - } \ - PG_END_TRY(); \ - if (previous_##hookName##_hook) \ - (*previous_##hookName##_hook)(__VA_ARGS__); \ - else \ - standard_##hookName(__VA_ARGS__); - -void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) { - CREATE_HOOK_WRAPPER(ExecutorStart, query_desc, eflags); +void ya_ExecutorAfterStart_hook(QueryDesc *query_desc, int eflags) { + if (previous_ExecutorStart_hook) { + (*previous_ExecutorStart_hook)(query_desc, eflags); + } else { + standard_ExecutorStart(query_desc, eflags); + } PG_TRY(); - { EventSender::instance()->ExecutorStart(query_desc, eflags); } + { EventSender::instance()->executor_after_start(query_desc, eflags); } PG_CATCH(); { - ereport(WARNING, (errmsg("EventSender failed in ExecutorStart afterhook"))); + ereport(WARNING, + (errmsg("EventSender failed in ya_ExecutorAfterStart_hook"))); PG_RE_THROW(); } PG_END_TRY(); } -void ya_ExecutorFinish_hook(QueryDesc *query_desc) { - CREATE_HOOK_WRAPPER(ExecutorFinish, query_desc); +void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg) { + PG_TRY(); + { EventSender::instance()->query_metrics_collect(status, arg); } + PG_CATCH(); + { + ereport(WARNING, + (errmsg("EventSender 
failed in ya_query_info_collect_hook"))); + PG_RE_THROW(); + } + PG_END_TRY(); + if (previous_query_info_collect_hook) { + (*previous_query_info_collect_hook)(status, arg); + } } \ No newline at end of file diff --git a/src/stat_statements_parser/pg_stat_statements_ya_parser.c b/src/stat_statements_parser/pg_stat_statements_ya_parser.c index ae79e7dc40a..737e77745df 100644 --- a/src/stat_statements_parser/pg_stat_statements_ya_parser.c +++ b/src/stat_statements_parser/pg_stat_statements_ya_parser.c @@ -205,6 +205,13 @@ JumbleRangeTable(pgssJumbleState *jstate, List *rtable) APP_JUMB_STRING(rte->ctename); APP_JUMB(rte->ctelevelsup); break; + /* GPDB RTEs */ + case RTE_VOID: + break; + case RTE_TABLEFUNCTION: + JumbleQuery(jstate, rte->subquery); + JumbleExpr(jstate, (Node *)rte->functions); + break; default: elog(ERROR, "unrecognized RTE kind: %d", (int)rte->rtekind); break; @@ -609,6 +616,20 @@ JumbleExpr(pgssJumbleState *jstate, Node *node) JumbleExpr(jstate, rtfunc->funcexpr); } break; + /* GPDB nodes */ + case T_GroupingFunc: + { + GroupingFunc *grpnode = (GroupingFunc *)node; + + JumbleExpr(jstate, (Node *)grpnode->args); + } + break; + case T_Grouping: + case T_GroupId: + case T_Integer: + case T_Value: + // TODO: no idea what to do with those + break; default: /* Only a warning, since we can stumble along anyway */ elog(WARNING, "unrecognized node type: %d", --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
