This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit d76bac80d746b37c0744f95f5d0b24afb32b9c4f Author: Maxim Smyatkin <[email protected]> AuthorDate: Mon Nov 13 15:38:31 2023 +0300 [yagp_hooks_collector] Add yagp_stat_messages runtime statistics view Add the yagp_stat_messages view and the yagp_stat_messages_reset() function, which expose per-segment UDS transport counters: total_messages, send_failures, connection_failures, other_errors, max_message_size. Also rename the extension from yagp-hooks-collector to yagp_hooks_collector. --- sql/yagp-hooks-collector--1.0.sql | 2 - sql/yagp-hooks-collector--unpackaged--1.0.sql | 2 - sql/yagp_hooks_collector--1.0.sql | 55 +++++++++++++ src/UDSConnector.cpp | 13 +++- src/UDSConnector.h | 3 - src/YagpStat.cpp | 91 ++++++++++++++++++++++ src/YagpStat.h | 21 +++++ src/hook_wrappers.cpp | 52 ++++++++++++- src/hook_wrappers.h | 2 + src/yagp_hooks_collector.c | 13 +++- ...llector.control => yagp_hooks_collector.control | 4 +- 11 files changed, 242 insertions(+), 16 deletions(-) diff --git a/sql/yagp-hooks-collector--1.0.sql b/sql/yagp-hooks-collector--1.0.sql deleted file mode 100644 index f9ab15fb400..00000000000 --- a/sql/yagp-hooks-collector--1.0.sql +++ /dev/null @@ -1,2 +0,0 @@ --- complain if script is sourced in psql, rather than via CREATE EXTENSION -\echo Use '''CREATE EXTENSION "yagp-hooks-collector"''' to load this file. \quit diff --git a/sql/yagp-hooks-collector--unpackaged--1.0.sql b/sql/yagp-hooks-collector--unpackaged--1.0.sql deleted file mode 100644 index 0441c97bd84..00000000000 --- a/sql/yagp-hooks-collector--unpackaged--1.0.sql +++ /dev/null @@ -1,2 +0,0 @@ --- complain if script is sourced in psql, rather than via CREATE EXTENSION -\echo Use '''CREATE EXTENSION "uuid-cb" FROM unpackaged''' to load this file. 
\quit diff --git a/sql/yagp_hooks_collector--1.0.sql b/sql/yagp_hooks_collector--1.0.sql new file mode 100644 index 00000000000..88bbe4e0dc7 --- /dev/null +++ b/sql/yagp_hooks_collector--1.0.sql @@ -0,0 +1,55 @@ +/* yagp_hooks_collector--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION yagp_hooks_collector" to load this file. \quit + +CREATE FUNCTION __yagp_stat_messages_reset_f_on_master() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_stat_messages_reset' +LANGUAGE C EXECUTE ON MASTER; + +CREATE FUNCTION __yagp_stat_messages_reset_f_on_segments() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_stat_messages_reset' +LANGUAGE C EXECUTE ON ALL SEGMENTS; + +CREATE FUNCTION yagp_stat_messages_reset() +RETURNS void +AS +$$ + SELECT __yagp_stat_messages_reset_f_on_master(); + SELECT __yagp_stat_messages_reset_f_on_segments(); +$$ +LANGUAGE SQL EXECUTE ON MASTER; + +CREATE FUNCTION __yagp_stat_messages_f_on_master() +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'yagp_stat_messages' +LANGUAGE C STRICT VOLATILE EXECUTE ON MASTER; + +CREATE FUNCTION __yagp_stat_messages_f_on_segments() +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'yagp_stat_messages' +LANGUAGE C STRICT VOLATILE EXECUTE ON ALL SEGMENTS; + +CREATE VIEW yagp_stat_messages AS + SELECT C.* + FROM __yagp_stat_messages_f_on_master() as C ( + segid int, + total_messages bigint, + send_failures bigint, + connection_failures bigint, + other_errors bigint, + max_message_size int + ) + UNION ALL + SELECT C.* + FROM __yagp_stat_messages_f_on_segments() as C ( + segid int, + total_messages bigint, + send_failures bigint, + connection_failures bigint, + other_errors bigint, + max_message_size int + ) +ORDER BY segid; diff --git a/src/UDSConnector.cpp b/src/UDSConnector.cpp index 339a5d4f374..b9088205250 100644 --- a/src/UDSConnector.cpp +++ b/src/UDSConnector.cpp @@ -1,5 +1,6 @@ #include "UDSConnector.h" #include "Config.h" +#include "YagpStat.h" #include <string> 
#include <unistd.h> @@ -15,9 +16,7 @@ extern "C" { #include "cdb/cdbvars.h" } -UDSConnector::UDSConnector() : uds_path("unix://" + Config::uds_path()) { - GOOGLE_PROTOBUF_VERIFY_VERSION; -} +UDSConnector::UDSConnector() { GOOGLE_PROTOBUF_VERIFY_VERSION; } static void inline log_tracing_failure(const yagpcc::SetQueryReq &req, const std::string &event) { @@ -31,7 +30,7 @@ bool UDSConnector::report_query(const yagpcc::SetQueryReq &req, const std::string &event) { sockaddr_un address; address.sun_family = AF_UNIX; - strcpy(address.sun_path, uds_path.c_str()); + strcpy(address.sun_path, Config::uds_path().c_str()); bool success = true; auto sockfd = socket(AF_UNIX, SOCK_STREAM, 0); if (sockfd != -1) { @@ -58,12 +57,16 @@ bool UDSConnector::report_query(const yagpcc::SetQueryReq &req, if (sent < 0) { log_tracing_failure(req, event); success = false; + YagpStat::report_bad_send(total_size); + } else { + YagpStat::report_send(total_size); } pfree(buf); } else { // log the error and go on log_tracing_failure(req, event); success = false; + YagpStat::report_bad_connection(); } } else { // That's a very important error that should never happen, so make it @@ -72,12 +75,14 @@ bool UDSConnector::report_query(const yagpcc::SetQueryReq &req, (errmsg("Unable to create non-blocking socket connection %s", strerror(errno)))); success = false; + YagpStat::report_error(); } close(sockfd); } else { // log the error and go on log_tracing_failure(req, event); success = false; + YagpStat::report_error(); } return success; } \ No newline at end of file diff --git a/src/UDSConnector.h b/src/UDSConnector.h index 574653023e6..42e0aa20968 100644 --- a/src/UDSConnector.h +++ b/src/UDSConnector.h @@ -7,7 +7,4 @@ class UDSConnector { public: UDSConnector(); bool report_query(const yagpcc::SetQueryReq &req, const std::string &event); - -private: - const std::string uds_path; }; \ No newline at end of file diff --git a/src/YagpStat.cpp b/src/YagpStat.cpp new file mode 100644 index 
00000000000..879cde85212 --- /dev/null +++ b/src/YagpStat.cpp @@ -0,0 +1,91 @@ +#include "YagpStat.h" + +#include <algorithm> + +extern "C" { +#include "postgres.h" +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include "storage/spin.h" +} + +namespace { +struct ProtectedData { + slock_t mutex; + YagpStat::Data data; +}; +shmem_startup_hook_type prev_shmem_startup_hook = NULL; +ProtectedData *data = nullptr; + +void yagp_shmem_startup() { + if (prev_shmem_startup_hook) + prev_shmem_startup_hook(); + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + bool found; + data = reinterpret_cast<ProtectedData *>( + ShmemInitStruct("yagp_stat_messages", sizeof(ProtectedData), &found)); + if (!found) { + SpinLockInit(&data->mutex); + data->data = YagpStat::Data(); + } + LWLockRelease(AddinShmemInitLock); +} + +class LockGuard { +public: + LockGuard(slock_t *mutex) : mutex_(mutex) { SpinLockAcquire(mutex_); } + ~LockGuard() { SpinLockRelease(mutex_); } + +private: + slock_t *mutex_; +}; +} // namespace + +void YagpStat::init() { + if (!process_shared_preload_libraries_in_progress) + return; + RequestAddinShmemSpace(sizeof(ProtectedData)); + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = yagp_shmem_startup; +} + +void YagpStat::deinit() { shmem_startup_hook = prev_shmem_startup_hook; } + +void YagpStat::reset() { + LockGuard lg(&data->mutex); + data->data = YagpStat::Data(); +} + +void YagpStat::report_send(int32_t msg_size) { + LockGuard lg(&data->mutex); + data->data.total++; + data->data.max_message_size = std::max(msg_size, data->data.max_message_size); +} + +void YagpStat::report_bad_connection() { + LockGuard lg(&data->mutex); + data->data.total++; + data->data.failed_connects++; +} + +void YagpStat::report_bad_send(int32_t msg_size) { + LockGuard lg(&data->mutex); + data->data.total++; + data->data.failed_sends++; + data->data.max_message_size = std::max(msg_size, 
data->data.max_message_size); +} + +void YagpStat::report_error() { + LockGuard lg(&data->mutex); + data->data.total++; + data->data.failed_other++; +} + +YagpStat::Data YagpStat::get_stats() { + LockGuard lg(&data->mutex); + return data->data; +} + +bool YagpStat::loaded() { return data != nullptr; } diff --git a/src/YagpStat.h b/src/YagpStat.h new file mode 100644 index 00000000000..110b1fdcbb1 --- /dev/null +++ b/src/YagpStat.h @@ -0,0 +1,21 @@ +#pragma once + +#include <cstdint> + +class YagpStat { +public: + struct Data { + int64_t total, failed_sends, failed_connects, failed_other; + int32_t max_message_size; + }; + + static void init(); + static void deinit(); + static void reset(); + static void report_send(int32_t msg_size); + static void report_bad_connection(); + static void report_bad_send(int32_t msg_size); + static void report_error(); + static Data get_stats(); + static bool loaded(); +}; \ No newline at end of file diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp index 66ba6547ce2..37f80385a6b 100644 --- a/src/hook_wrappers.cpp +++ b/src/hook_wrappers.cpp @@ -1,16 +1,17 @@ extern "C" { #include "postgres.h" +#include "funcapi.h" #include "executor/executor.h" #include "utils/elog.h" +#include "utils/builtins.h" #include "utils/metrics_utils.h" - #include "cdb/cdbexplain.h" #include "cdb/cdbvars.h" - #include "tcop/utility.h" } #include "Config.h" +#include "YagpStat.h" #include "EventSender.h" #include "hook_wrappers.h" #include "stat_statements_parser/pg_stat_statements_ya_parser.h" @@ -39,6 +40,7 @@ static inline EventSender *get_sender() { void hooks_init() { Config::init(); + YagpStat::init(); previous_ExecutorStart_hook = ExecutorStart_hook; ExecutorStart_hook = ya_ExecutorStart_hook; previous_ExecutorRun_hook = ExecutorRun_hook; @@ -62,6 +64,7 @@ void hooks_deinit() { if (sender) { delete sender; } + YagpStat::deinit(); } void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) { @@ -150,4 +153,49 @@ void 
ya_query_info_collect_hook(QueryMetricsStatus status, void *arg) { if (previous_query_info_collect_hook) { (*previous_query_info_collect_hook)(status, arg); } +} + +static void check_stats_loaded() { + if (!YagpStat::loaded()) { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("yagp_hooks_collector must be loaded via " + "shared_preload_libraries"))); + } +} + +void yagp_functions_reset() { + check_stats_loaded(); + YagpStat::reset(); +} + +Datum yagp_functions_get(FunctionCallInfo fcinfo) { + const int ATTNUM = 6; + check_stats_loaded(); + auto stats = YagpStat::get_stats(); + TupleDesc tupdesc = CreateTemplateTupleDesc(ATTNUM, false); + TupleDescInitEntry(tupdesc, (AttrNumber)1, "segid", INT4OID, -1 /* typmod */, + 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber)2, "total_messages", INT8OID, + -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber)3, "send_failures", INT8OID, + -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber)4, "connection_failures", INT8OID, + -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber)5, "other_errors", INT8OID, + -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber)6, "max_message_size", INT4OID, + -1 /* typmod */, 0 /* attdim */); + tupdesc = BlessTupleDesc(tupdesc); + Datum values[ATTNUM]; + bool nulls[ATTNUM]; + MemSet(nulls, 0, sizeof(nulls)); + values[0] = Int32GetDatum(GpIdentity.segindex); + values[1] = Int64GetDatum(stats.total); + values[2] = Int64GetDatum(stats.failed_sends); + values[3] = Int64GetDatum(stats.failed_connects); + values[4] = Int64GetDatum(stats.failed_other); + values[5] = Int32GetDatum(stats.max_message_size); + HeapTuple tuple = heap_form_tuple(tupdesc, values, nulls); + Datum result = HeapTupleGetDatum(tuple); + PG_RETURN_DATUM(result); } \ No newline at end of file diff --git a/src/hook_wrappers.h b/src/hook_wrappers.h index 815fcb7cd51..c158f42cf1d 100644 --- 
a/src/hook_wrappers.h +++ b/src/hook_wrappers.h @@ -6,6 +6,8 @@ extern "C" { extern void hooks_init(); extern void hooks_deinit(); +extern void yagp_functions_reset(); +extern Datum yagp_functions_get(FunctionCallInfo fcinfo); #ifdef __cplusplus } diff --git a/src/yagp_hooks_collector.c b/src/yagp_hooks_collector.c index 69475ea5079..2a9e7328e6d 100644 --- a/src/yagp_hooks_collector.c +++ b/src/yagp_hooks_collector.c @@ -1,6 +1,6 @@ #include "postgres.h" #include "cdb/cdbvars.h" -#include "fmgr.h" +#include "utils/builtins.h" #include "hook_wrappers.h" @@ -8,6 +8,8 @@ PG_MODULE_MAGIC; void _PG_init(void); void _PG_fini(void); +PG_FUNCTION_INFO_V1(yagp_stat_messages_reset); +PG_FUNCTION_INFO_V1(yagp_stat_messages); void _PG_init(void) { if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) { @@ -20,3 +22,12 @@ void _PG_fini(void) { hooks_deinit(); } } + +Datum yagp_stat_messages_reset(PG_FUNCTION_ARGS) { + yagp_functions_reset(); + PG_RETURN_VOID(); +} + +Datum yagp_stat_messages(PG_FUNCTION_ARGS) { + return yagp_functions_get(fcinfo); +} \ No newline at end of file diff --git a/yagp-hooks-collector.control b/yagp_hooks_collector.control similarity index 61% rename from yagp-hooks-collector.control rename to yagp_hooks_collector.control index 82c189a88fc..b5539dd6462 100644 --- a/yagp-hooks-collector.control +++ b/yagp_hooks_collector.control @@ -1,5 +1,5 @@ -# yagp-hooks-collector extension +# yagp_hooks_collector extension comment = 'Intercept query and plan execution hooks and report them to Yandex GPCC agents' default_version = '1.0' -module_pathname = '$libdir/yagp-hooks-collector' +module_pathname = '$libdir/yagp_hooks_collector' superuser = true --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
