This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit 1c1bf580b06764892cb30df704f8c0c4afecb19b
Author: Maxim Smyatkin <[email protected]>
AuthorDate: Tue Jun 13 16:51:40 2023 +0300

    [yagp_hooks_collector] Fix EventSender and GrpcConnector in forked processes
    
    Delay initialization of static singletons and GRPC connections to
    actual query handling time rather than _PG_init, since both are
    incompatible with fork().
---
 debian/control        |  4 ++--
 src/EventSender.cpp   | 10 ++--------
 src/EventSender.h     |  6 ++----
 src/GrpcConnector.cpp | 33 ++++++++++++++++++++++-----------
 src/hook_wrappers.cpp | 33 +++++++++++++++++++++++----------
 5 files changed, 51 insertions(+), 35 deletions(-)

diff --git a/debian/control b/debian/control
index c740a8590ca..07176e94be5 100644
--- a/debian/control
+++ b/debian/control
@@ -2,10 +2,10 @@ Source: greenplum-6-yagpcc-hooks
 Section: misc
 Priority: optional
 Maintainer: Maxim Smyatkin <[email protected]>
-Build-Depends: make, gcc, g++, debhelper (>=9), greenplum-db-6 (>=6.19.3), protobuf-compiler, protobuf-compiler-grpc, libgrpc++1, libgrpc++-dev
+Build-Depends: make, gcc, g++, debhelper (>=9), greenplum-db-6 (>=6.19.3), ya-grpc (=1.46-57-50820-02384e3918-yandex)
 Standards-Version: 3.9.8
 
 Package: greenplum-6-yagpcc-hooks
 Architecture: any
-Depends: ${misc:Depends}, ${shlibs:Depends}, greenplum-db-6 (>=6.19.3)
+Depends: ${misc:Depends}, ${shlibs:Depends}, greenplum-db-6 (>=6.19.3), ya-grpc (=1.46-57-50820-02384e3918-yandex)
 Description: Greenplum extension to send query execution metrics to yandex command center agent
diff --git a/src/EventSender.cpp b/src/EventSender.cpp
index b1f85cf9f1e..ec966e8686c 100644
--- a/src/EventSender.cpp
+++ b/src/EventSender.cpp
@@ -329,12 +329,6 @@ void EventSender::send_query_info(yagpcc::SetQueryReq *req,
   }
 }
 
-EventSender *EventSender::instance() {
-  static EventSender sender;
-  return &sender;
-}
+EventSender::EventSender() { connector = std::make_unique<GrpcConnector>(); }
 
-EventSender::EventSender() {
-  Config::init();
-  connector = std::make_unique<GrpcConnector>();
-}
\ No newline at end of file
+EventSender::~EventSender() { connector.release(); }
\ No newline at end of file
diff --git a/src/EventSender.h b/src/EventSender.h
index 9e2ef992f81..92e6937a690 100644
--- a/src/EventSender.h
+++ b/src/EventSender.h
@@ -17,15 +17,13 @@ public:
   void query_metrics_collect(QueryMetricsStatus status, void *arg);
   void incr_depth() { nesting_level++; }
   void decr_depth() { nesting_level--; }
-  static EventSender *instance();
+  EventSender();
+  ~EventSender();
 
 private:
   void collect_query_submit(QueryDesc *query_desc);
   void collect_query_done(QueryDesc *query_desc, const std::string &status);
-
-  EventSender();
   void send_query_info(yagpcc::SetQueryReq *req, const std::string &event);
   std::unique_ptr<GrpcConnector> connector;
-
   int nesting_level = 0;
 };
\ No newline at end of file
diff --git a/src/GrpcConnector.cpp b/src/GrpcConnector.cpp
index 276c9ceb8a8..966bfb4a780 100644
--- a/src/GrpcConnector.cpp
+++ b/src/GrpcConnector.cpp
@@ -10,14 +10,17 @@
 #include <string>
 #include <thread>
 
-extern "C" {
+extern "C"
+{
 #include "postgres.h"
 #include "cdb/cdbvars.h"
 }
 
-class GrpcConnector::Impl {
+class GrpcConnector::Impl
+{
 public:
-  Impl() : SOCKET_FILE("unix://" + Config::uds_path()) {
+  Impl() : SOCKET_FILE("unix://" + Config::uds_path())
+  {
     GOOGLE_PROTOBUF_VERIFY_VERSION;
     channel =
         grpc::CreateChannel(SOCKET_FILE, grpc::InsecureChannelCredentials());
@@ -27,15 +30,18 @@ public:
     reconnect_thread = std::thread(&Impl::reconnect, this);
   }
 
-  ~Impl() {
+  ~Impl()
+  {
     done = true;
     cv.notify_one();
     reconnect_thread.join();
   }
 
-  yagpcc::MetricResponse set_metric_query(yagpcc::SetQueryReq req) {
+  yagpcc::MetricResponse set_metric_query(yagpcc::SetQueryReq req)
+  {
     yagpcc::MetricResponse response;
-    if (!connected) {
+    if (!connected)
+    {
       response.set_error_code(yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR);
       response.set_error_text(
           "Not tracing this query connection to agent has been lost");
@@ -47,7 +53,8 @@ public:
         std::chrono::system_clock::now() + std::chrono::milliseconds(timeout);
     context.set_deadline(deadline);
     grpc::Status status = (stub->SetMetricQuery)(&context, req, &response);
-    if (!status.ok()) {
+    if (!status.ok())
+    {
       response.set_error_text("Connection lost: " + status.error_message() +
                               "; " + status.error_details());
       response.set_error_code(yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR);
@@ -68,13 +75,16 @@ private:
   std::mutex mtx;
   bool done;
 
-  void reconnect() {
-    while (!done) {
+  void reconnect()
+  {
+    while (!done)
+    {
       {
         std::unique_lock<std::mutex> lock(mtx);
         cv.wait(lock);
       }
-      while (!connected && !done) {
+      while (!connected && !done)
+      {
         auto deadline =
             std::chrono::system_clock::now() + std::chrono::milliseconds(100);
         connected = channel->WaitForConnected(deadline);
@@ -88,6 +98,7 @@ GrpcConnector::GrpcConnector() { impl = new Impl(); }
 GrpcConnector::~GrpcConnector() { delete impl; }
 
 yagpcc::MetricResponse
-GrpcConnector::set_metric_query(yagpcc::SetQueryReq req) {
+GrpcConnector::set_metric_query(yagpcc::SetQueryReq req)
+{
   return impl->set_metric_query(req);
 }
\ No newline at end of file
diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp
index a904dc9bafd..66ba6547ce2 100644
--- a/src/hook_wrappers.cpp
+++ b/src/hook_wrappers.cpp
@@ -28,7 +28,17 @@ static void ya_ExecutorFinish_hook(QueryDesc *query_desc);
 static void ya_ExecutorEnd_hook(QueryDesc *query_desc);
 static void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg);
 
+static EventSender *sender = nullptr;
+
+static inline EventSender *get_sender() {
+  if (!sender) {
+    sender = new EventSender();
+  }
+  return sender;
+}
+
 void hooks_init() {
+  Config::init();
   previous_ExecutorStart_hook = ExecutorStart_hook;
   ExecutorStart_hook = ya_ExecutorStart_hook;
   previous_ExecutorRun_hook = ExecutorRun_hook;
@@ -49,11 +59,14 @@ void hooks_deinit() {
   ExecutorEnd_hook = previous_ExecutorEnd_hook;
   query_info_collect_hook = previous_query_info_collect_hook;
   stat_statements_parser_deinit();
+  if (sender) {
+    delete sender;
+  }
 }
 
 void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) {
   PG_TRY();
-  { EventSender::instance()->executor_before_start(query_desc, eflags); }
+  { get_sender()->executor_before_start(query_desc, eflags); }
   PG_CATCH();
   {
     ereport(WARNING,
@@ -66,7 +79,7 @@ void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) {
     standard_ExecutorStart(query_desc, eflags);
   }
   PG_TRY();
-  { EventSender::instance()->executor_after_start(query_desc, eflags); }
+  { get_sender()->executor_after_start(query_desc, eflags); }
   PG_CATCH();
   {
     ereport(WARNING,
@@ -77,36 +90,36 @@ void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) {
 
 void ya_ExecutorRun_hook(QueryDesc *query_desc, ScanDirection direction,
                          long count) {
-  EventSender::instance()->incr_depth();
+  get_sender()->incr_depth();
   PG_TRY();
   {
     if (previous_ExecutorRun_hook)
       previous_ExecutorRun_hook(query_desc, direction, count);
     else
       standard_ExecutorRun(query_desc, direction, count);
-    EventSender::instance()->decr_depth();
+    get_sender()->decr_depth();
   }
   PG_CATCH();
   {
-    EventSender::instance()->decr_depth();
+    get_sender()->decr_depth();
     PG_RE_THROW();
   }
   PG_END_TRY();
 }
 
 void ya_ExecutorFinish_hook(QueryDesc *query_desc) {
-  EventSender::instance()->incr_depth();
+  get_sender()->incr_depth();
   PG_TRY();
   {
     if (previous_ExecutorFinish_hook)
       previous_ExecutorFinish_hook(query_desc);
     else
       standard_ExecutorFinish(query_desc);
-    EventSender::instance()->decr_depth();
+    get_sender()->decr_depth();
   }
   PG_CATCH();
   {
-    EventSender::instance()->decr_depth();
+    get_sender()->decr_depth();
     PG_RE_THROW();
   }
   PG_END_TRY();
@@ -114,7 +127,7 @@ void ya_ExecutorFinish_hook(QueryDesc *query_desc) {
 
 void ya_ExecutorEnd_hook(QueryDesc *query_desc) {
   PG_TRY();
-  { EventSender::instance()->executor_end(query_desc); }
+  { get_sender()->executor_end(query_desc); }
   PG_CATCH();
   { ereport(WARNING, (errmsg("EventSender failed in ya_ExecutorEnd_hook"))); }
   PG_END_TRY();
@@ -127,7 +140,7 @@ void ya_ExecutorEnd_hook(QueryDesc *query_desc) {
 
 void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg) {
   PG_TRY();
-  { EventSender::instance()->query_metrics_collect(status, arg); }
+  { get_sender()->query_metrics_collect(status, arg); }
   PG_CATCH();
   {
     ereport(WARNING,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to