This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit d76bac80d746b37c0744f95f5d0b24afb32b9c4f
Author: Maxim Smyatkin <[email protected]>
AuthorDate: Mon Nov 13 15:38:31 2023 +0300

    [yagp_hooks_collector] Add stat_messages() runtime statistics view
    
    Add SQL functions stat_messages() and stat_messages_reset()
    exposing per-segment UDS transport counters: total_messages,
    send_failures, connection_failures, other_errors, max_message_size.
---
 sql/yagp-hooks-collector--1.0.sql                  |  2 -
 sql/yagp-hooks-collector--unpackaged--1.0.sql      |  2 -
 sql/yagp_hooks_collector--1.0.sql                  | 55 +++++++++++++
 src/UDSConnector.cpp                               | 13 +++-
 src/UDSConnector.h                                 |  3 -
 src/YagpStat.cpp                                   | 91 ++++++++++++++++++++++
 src/YagpStat.h                                     | 21 +++++
 src/hook_wrappers.cpp                              | 52 ++++++++++++-
 src/hook_wrappers.h                                |  2 +
 src/yagp_hooks_collector.c                         | 13 +++-
 ...llector.control => yagp_hooks_collector.control |  4 +-
 11 files changed, 242 insertions(+), 16 deletions(-)

diff --git a/sql/yagp-hooks-collector--1.0.sql b/sql/yagp-hooks-collector--1.0.sql
deleted file mode 100644
index f9ab15fb400..00000000000
--- a/sql/yagp-hooks-collector--1.0.sql
+++ /dev/null
@@ -1,2 +0,0 @@
--- complain if script is sourced in psql, rather than via CREATE EXTENSION
-\echo Use '''CREATE EXTENSION "yagp-hooks-collector"''' to load this file. \quit
diff --git a/sql/yagp-hooks-collector--unpackaged--1.0.sql b/sql/yagp-hooks-collector--unpackaged--1.0.sql
deleted file mode 100644
index 0441c97bd84..00000000000
--- a/sql/yagp-hooks-collector--unpackaged--1.0.sql
+++ /dev/null
@@ -1,2 +0,0 @@
--- complain if script is sourced in psql, rather than via CREATE EXTENSION
-\echo Use '''CREATE EXTENSION "uuid-cb" FROM unpackaged''' to load this file. \quit
diff --git a/sql/yagp_hooks_collector--1.0.sql b/sql/yagp_hooks_collector--1.0.sql
new file mode 100644
index 00000000000..88bbe4e0dc7
--- /dev/null
+++ b/sql/yagp_hooks_collector--1.0.sql
@@ -0,0 +1,55 @@
+/* yagp_hooks_collector--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION yagp_hooks_collector" to load this file. \quit
+
+CREATE FUNCTION __yagp_stat_messages_reset_f_on_master()
+RETURNS void
+AS 'MODULE_PATHNAME', 'yagp_stat_messages_reset'
+LANGUAGE C EXECUTE ON MASTER;
+
+CREATE FUNCTION __yagp_stat_messages_reset_f_on_segments()
+RETURNS void
+AS 'MODULE_PATHNAME', 'yagp_stat_messages_reset'
+LANGUAGE C EXECUTE ON ALL SEGMENTS;
+
+CREATE FUNCTION yagp_stat_messages_reset()
+RETURNS void
+AS 
+$$
+  SELECT __yagp_stat_messages_reset_f_on_master();
+  SELECT __yagp_stat_messages_reset_f_on_segments();
+$$
+LANGUAGE SQL EXECUTE ON MASTER;
+
+CREATE FUNCTION __yagp_stat_messages_f_on_master()
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'yagp_stat_messages'
+LANGUAGE C STRICT VOLATILE EXECUTE ON MASTER;
+
+CREATE FUNCTION __yagp_stat_messages_f_on_segments()
+RETURNS SETOF record
+AS 'MODULE_PATHNAME', 'yagp_stat_messages'
+LANGUAGE C STRICT VOLATILE EXECUTE ON ALL SEGMENTS;
+
+CREATE VIEW yagp_stat_messages AS
+  SELECT C.*
+       FROM __yagp_stat_messages_f_on_master() as C (
+    segid int,
+    total_messages bigint,
+    send_failures bigint,
+    connection_failures bigint,
+    other_errors bigint,
+    max_message_size int
+       )
+  UNION ALL
+  SELECT C.*
+       FROM __yagp_stat_messages_f_on_segments() as C (
+    segid int,
+    total_messages bigint,
+    send_failures bigint,
+    connection_failures bigint,
+    other_errors bigint,
+    max_message_size int
+       )
+ORDER BY segid;
diff --git a/src/UDSConnector.cpp b/src/UDSConnector.cpp
index 339a5d4f374..b9088205250 100644
--- a/src/UDSConnector.cpp
+++ b/src/UDSConnector.cpp
@@ -1,5 +1,6 @@
 #include "UDSConnector.h"
 #include "Config.h"
+#include "YagpStat.h"
 
 #include <string>
 #include <unistd.h>
@@ -15,9 +16,7 @@ extern "C" {
 #include "cdb/cdbvars.h"
 }
 
-UDSConnector::UDSConnector() : uds_path("unix://" + Config::uds_path()) {
-  GOOGLE_PROTOBUF_VERIFY_VERSION;
-}
+UDSConnector::UDSConnector() { GOOGLE_PROTOBUF_VERIFY_VERSION; }
 
 static void inline log_tracing_failure(const yagpcc::SetQueryReq &req,
                                        const std::string &event) {
@@ -31,7 +30,7 @@ bool UDSConnector::report_query(const yagpcc::SetQueryReq &req,
                                 const std::string &event) {
   sockaddr_un address;
   address.sun_family = AF_UNIX;
-  strcpy(address.sun_path, uds_path.c_str());
+  strcpy(address.sun_path, Config::uds_path().c_str());
   bool success = true;
   auto sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
   if (sockfd != -1) {
@@ -58,12 +57,16 @@ bool UDSConnector::report_query(const yagpcc::SetQueryReq &req,
         if (sent < 0) {
           log_tracing_failure(req, event);
           success = false;
+          YagpStat::report_bad_send(total_size);
+        } else {
+          YagpStat::report_send(total_size);
         }
         pfree(buf);
       } else {
         // log the error and go on
         log_tracing_failure(req, event);
         success = false;
+        YagpStat::report_bad_connection();
       }
     } else {
       // That's a very important error that should never happen, so make it
@@ -72,12 +75,14 @@ bool UDSConnector::report_query(const yagpcc::SetQueryReq &req,
               (errmsg("Unable to create non-blocking socket connection %s",
                       strerror(errno))));
       success = false;
+      YagpStat::report_error();
     }
     close(sockfd);
   } else {
     // log the error and go on
     log_tracing_failure(req, event);
     success = false;
+    YagpStat::report_error();
   }
   return success;
 }
\ No newline at end of file
diff --git a/src/UDSConnector.h b/src/UDSConnector.h
index 574653023e6..42e0aa20968 100644
--- a/src/UDSConnector.h
+++ b/src/UDSConnector.h
@@ -7,7 +7,4 @@ class UDSConnector {
 public:
   UDSConnector();
   bool report_query(const yagpcc::SetQueryReq &req, const std::string &event);
-
-private:
-  const std::string uds_path;
 };
\ No newline at end of file
diff --git a/src/YagpStat.cpp b/src/YagpStat.cpp
new file mode 100644
index 00000000000..879cde85212
--- /dev/null
+++ b/src/YagpStat.cpp
@@ -0,0 +1,91 @@
+#include "YagpStat.h"
+
+#include <algorithm>
+
+extern "C" {
+#include "postgres.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+}
+
+namespace {
+struct ProtectedData {
+  slock_t mutex;
+  YagpStat::Data data;
+};
+shmem_startup_hook_type prev_shmem_startup_hook = NULL;
+ProtectedData *data = nullptr;
+
+void yagp_shmem_startup() {
+  if (prev_shmem_startup_hook)
+    prev_shmem_startup_hook();
+  LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+  bool found;
+  data = reinterpret_cast<ProtectedData *>(
+      ShmemInitStruct("yagp_stat_messages", sizeof(ProtectedData), &found));
+  if (!found) {
+    SpinLockInit(&data->mutex);
+    data->data = YagpStat::Data();
+  }
+  LWLockRelease(AddinShmemInitLock);
+}
+
+class LockGuard {
+public:
+  LockGuard(slock_t *mutex) : mutex_(mutex) { SpinLockAcquire(mutex_); }
+  ~LockGuard() { SpinLockRelease(mutex_); }
+
+private:
+  slock_t *mutex_;
+};
+} // namespace
+
+void YagpStat::init() {
+  if (!process_shared_preload_libraries_in_progress)
+    return;
+  RequestAddinShmemSpace(sizeof(ProtectedData));
+  prev_shmem_startup_hook = shmem_startup_hook;
+  shmem_startup_hook = yagp_shmem_startup;
+}
+
+void YagpStat::deinit() { shmem_startup_hook = prev_shmem_startup_hook; }
+
+void YagpStat::reset() {
+  LockGuard lg(&data->mutex);
+  data->data = YagpStat::Data();
+}
+
+void YagpStat::report_send(int32_t msg_size) {
+  LockGuard lg(&data->mutex);
+  data->data.total++;
+  data->data.max_message_size = std::max(msg_size, data->data.max_message_size);
+}
+
+void YagpStat::report_bad_connection() {
+  LockGuard lg(&data->mutex);
+  data->data.total++;
+  data->data.failed_connects++;
+}
+
+void YagpStat::report_bad_send(int32_t msg_size) {
+  LockGuard lg(&data->mutex);
+  data->data.total++;
+  data->data.failed_sends++;
+  data->data.max_message_size = std::max(msg_size, data->data.max_message_size);
+}
+
+void YagpStat::report_error() {
+  LockGuard lg(&data->mutex);
+  data->data.total++;
+  data->data.failed_other++;
+}
+
+YagpStat::Data YagpStat::get_stats() {
+  LockGuard lg(&data->mutex);
+  return data->data;
+}
+
+bool YagpStat::loaded() { return data != nullptr; }
diff --git a/src/YagpStat.h b/src/YagpStat.h
new file mode 100644
index 00000000000..110b1fdcbb1
--- /dev/null
+++ b/src/YagpStat.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <cstdint>
+
+class YagpStat {
+public:
+  struct Data {
+    int64_t total, failed_sends, failed_connects, failed_other;
+    int32_t max_message_size;
+  };
+
+  static void init();
+  static void deinit();
+  static void reset();
+  static void report_send(int32_t msg_size);
+  static void report_bad_connection();
+  static void report_bad_send(int32_t msg_size);
+  static void report_error();
+  static Data get_stats();
+  static bool loaded();
+};
\ No newline at end of file
diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp
index 66ba6547ce2..37f80385a6b 100644
--- a/src/hook_wrappers.cpp
+++ b/src/hook_wrappers.cpp
@@ -1,16 +1,17 @@
 extern "C" {
 #include "postgres.h"
+#include "funcapi.h"
 #include "executor/executor.h"
 #include "utils/elog.h"
+#include "utils/builtins.h"
 #include "utils/metrics_utils.h"
-
 #include "cdb/cdbexplain.h"
 #include "cdb/cdbvars.h"
-
 #include "tcop/utility.h"
 }
 
 #include "Config.h"
+#include "YagpStat.h"
 #include "EventSender.h"
 #include "hook_wrappers.h"
 #include "stat_statements_parser/pg_stat_statements_ya_parser.h"
@@ -39,6 +40,7 @@ static inline EventSender *get_sender() {
 
 void hooks_init() {
   Config::init();
+  YagpStat::init();
   previous_ExecutorStart_hook = ExecutorStart_hook;
   ExecutorStart_hook = ya_ExecutorStart_hook;
   previous_ExecutorRun_hook = ExecutorRun_hook;
@@ -62,6 +64,7 @@ void hooks_deinit() {
   if (sender) {
     delete sender;
   }
+  YagpStat::deinit();
 }
 
 void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) {
@@ -150,4 +153,49 @@ void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg) {
   if (previous_query_info_collect_hook) {
     (*previous_query_info_collect_hook)(status, arg);
   }
+}
+
+static void check_stats_loaded() {
+  if (!YagpStat::loaded()) {
+    ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                    errmsg("yagp_hooks_collector must be loaded via "
+                           "shared_preload_libraries")));
+  }
+}
+
+void yagp_functions_reset() {
+  check_stats_loaded();
+  YagpStat::reset();
+}
+
+Datum yagp_functions_get(FunctionCallInfo fcinfo) {
+  const int ATTNUM = 6;
+  check_stats_loaded();
+  auto stats = YagpStat::get_stats();
+  TupleDesc tupdesc = CreateTemplateTupleDesc(ATTNUM, false);
+  TupleDescInitEntry(tupdesc, (AttrNumber)1, "segid", INT4OID, -1 /* typmod */,
+                     0 /* attdim */);
+  TupleDescInitEntry(tupdesc, (AttrNumber)2, "total_messages", INT8OID,
+                     -1 /* typmod */, 0 /* attdim */);
+  TupleDescInitEntry(tupdesc, (AttrNumber)3, "send_failures", INT8OID,
+                     -1 /* typmod */, 0 /* attdim */);
+  TupleDescInitEntry(tupdesc, (AttrNumber)4, "connection_failures", INT8OID,
+                     -1 /* typmod */, 0 /* attdim */);
+  TupleDescInitEntry(tupdesc, (AttrNumber)5, "other_errors", INT8OID,
+                     -1 /* typmod */, 0 /* attdim */);
+  TupleDescInitEntry(tupdesc, (AttrNumber)6, "max_message_size", INT4OID,
+                     -1 /* typmod */, 0 /* attdim */);
+  tupdesc = BlessTupleDesc(tupdesc);
+  Datum values[ATTNUM];
+  bool nulls[ATTNUM];
+  MemSet(nulls, 0, sizeof(nulls));
+  values[0] = Int32GetDatum(GpIdentity.segindex);
+  values[1] = Int64GetDatum(stats.total);
+  values[2] = Int64GetDatum(stats.failed_sends);
+  values[3] = Int64GetDatum(stats.failed_connects);
+  values[4] = Int64GetDatum(stats.failed_other);
+  values[5] = Int32GetDatum(stats.max_message_size);
+  HeapTuple tuple = heap_form_tuple(tupdesc, values, nulls);
+  Datum result = HeapTupleGetDatum(tuple);
+  PG_RETURN_DATUM(result);
 }
\ No newline at end of file
diff --git a/src/hook_wrappers.h b/src/hook_wrappers.h
index 815fcb7cd51..c158f42cf1d 100644
--- a/src/hook_wrappers.h
+++ b/src/hook_wrappers.h
@@ -6,6 +6,8 @@ extern "C" {
 
 extern void hooks_init();
 extern void hooks_deinit();
+extern void yagp_functions_reset();
+extern Datum yagp_functions_get(FunctionCallInfo fcinfo);
 
 #ifdef __cplusplus
 }
diff --git a/src/yagp_hooks_collector.c b/src/yagp_hooks_collector.c
index 69475ea5079..2a9e7328e6d 100644
--- a/src/yagp_hooks_collector.c
+++ b/src/yagp_hooks_collector.c
@@ -1,6 +1,6 @@
 #include "postgres.h"
 #include "cdb/cdbvars.h"
-#include "fmgr.h"
+#include "utils/builtins.h"
 
 #include "hook_wrappers.h"
 
@@ -8,6 +8,8 @@ PG_MODULE_MAGIC;
 
 void _PG_init(void);
 void _PG_fini(void);
+PG_FUNCTION_INFO_V1(yagp_stat_messages_reset);
+PG_FUNCTION_INFO_V1(yagp_stat_messages);
 
 void _PG_init(void) {
   if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) {
@@ -20,3 +22,12 @@ void _PG_fini(void) {
     hooks_deinit();
   }
 }
+
+Datum yagp_stat_messages_reset(PG_FUNCTION_ARGS) {
+  yagp_functions_reset();
+  PG_RETURN_VOID();
+}
+
+Datum yagp_stat_messages(PG_FUNCTION_ARGS) {
+  return yagp_functions_get(fcinfo);
+}
\ No newline at end of file
diff --git a/yagp-hooks-collector.control b/yagp_hooks_collector.control
similarity index 61%
rename from yagp-hooks-collector.control
rename to yagp_hooks_collector.control
index 82c189a88fc..b5539dd6462 100644
--- a/yagp-hooks-collector.control
+++ b/yagp_hooks_collector.control
@@ -1,5 +1,5 @@
-# yagp-hooks-collector extension
+# yagp_hooks_collector extension
 comment = 'Intercept query and plan execution hooks and report them to Yandex GPCC agents'
 default_version = '1.0'
-module_pathname = '$libdir/yagp-hooks-collector'
+module_pathname = '$libdir/yagp_hooks_collector'
 superuser = true


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to