This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit c09ea5d21a11604993fc3a6bb2ece50b1a007698
Author: Maxim Smyatkin <[email protected]>
AuthorDate: Mon Oct 2 12:54:32 2023 +0300

    [yagp_hooks_collector] Replace GRPC transport with protobuf-over-UDS
    
    Remove GRPC dependency.  Serialize metrics as protobuf messages and
    deliver them over a Unix domain socket.  Replace server-side message
    queue with incremental per-query message building.  Add clang-format
    configuration.  Use deprecated protobuf API for bionic compatibility.
---
 .clang-format                   |   2 +
 protos/yagpcc_set_service.proto |  23 ++-----
 src/EventSender.cpp             | 115 +++++++++++++++++++++-------------
 src/EventSender.h               |  10 ++-
 src/GrpcConnector.cpp           | 133 ----------------------------------------
 src/GrpcConnector.h             |  15 -----
 src/UDSConnector.cpp            |  83 +++++++++++++++++++++++++
 src/UDSConnector.h              |  13 ++++
 8 files changed, 183 insertions(+), 211 deletions(-)

diff --git a/.clang-format b/.clang-format
new file mode 100644
index 00000000000..99130575c9a
--- /dev/null
+++ b/.clang-format
@@ -0,0 +1,2 @@
+BasedOnStyle: LLVM
+SortIncludes: false
diff --git a/protos/yagpcc_set_service.proto b/protos/yagpcc_set_service.proto
index 93c2f5a01d1..e8fc7aaa99d 100644
--- a/protos/yagpcc_set_service.proto
+++ b/protos/yagpcc_set_service.proto
@@ -9,23 +9,6 @@ package yagpcc;
 option java_outer_classname = "SegmentYAGPCCAS";
 option go_package = "a.yandex-team.ru/cloud/mdb/yagpcc/api/proto/agent_segment;greenplum";
 
-service SetQueryInfo {
-    rpc SetMetricPlanNode (SetPlanNodeReq) returns (MetricResponse) {}
-
-    rpc SetMetricQuery (SetQueryReq) returns (MetricResponse) {}
-}
-
-message MetricResponse {
-    MetricResponseStatusCode error_code = 1;
-    string error_text = 2;
-}
-
-enum MetricResponseStatusCode {
-    METRIC_RESPONSE_STATUS_CODE_UNSPECIFIED = 0;
-    METRIC_RESPONSE_STATUS_CODE_SUCCESS = 1;
-    METRIC_RESPONSE_STATUS_CODE_ERROR = 2;
-}
-
 message SetQueryReq {
     QueryStatus           query_status = 1;
     google.protobuf.Timestamp datetime = 2;
@@ -34,6 +17,9 @@ message SetQueryReq {
     QueryInfo               query_info = 5;
     GPMetrics            query_metrics = 6;
     repeated MetricPlan      plan_tree = 7;
+    google.protobuf.Timestamp submit_time = 8;
+    google.protobuf.Timestamp start_time  = 9;
+    google.protobuf.Timestamp end_time    = 10;
 }
 
 message SetPlanNodeReq {
@@ -43,4 +29,7 @@ message SetPlanNodeReq {
     SegmentKey             segment_key = 4;
     GPMetrics             node_metrics = 5;
     MetricPlan               plan_node = 6;
+    google.protobuf.Timestamp submit_time = 7;
+    google.protobuf.Timestamp start_time  = 8;
+    google.protobuf.Timestamp end_time    = 9;
 }
diff --git a/src/EventSender.cpp b/src/EventSender.cpp
index 9146078fd0e..834553a6187 100644
--- a/src/EventSender.cpp
+++ b/src/EventSender.cpp
@@ -1,6 +1,6 @@
 #include "Config.h"
-#include "GrpcConnector.h"
 #include "ProcStats.h"
+#include "UDSConnector.h"
 #include <chrono>
 #include <ctime>
 
@@ -15,7 +15,6 @@ extern "C" {
 #include "commands/resgroupcmds.h"
 #include "executor/executor.h"
 #include "utils/elog.h"
-#include "utils/metrics_utils.h"
 #include "utils/workfile_mgr.h"
 
 #include "cdb/cdbdisp.h"
@@ -102,33 +101,46 @@ void set_plan_text(std::string *plan_text, QueryDesc *query_desc) {
   *plan_text = std::string(es.str->data, es.str->len);
 }
 
-void set_query_plan(yagpcc::QueryInfo *qi, QueryDesc *query_desc) {
-  qi->set_generator(query_desc->plannedstmt->planGen == PLANGEN_OPTIMIZER
-                        ? yagpcc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER
-                        : yagpcc::PlanGenerator::PLAN_GENERATOR_PLANNER);
-  set_plan_text(qi->mutable_plan_text(), query_desc);
-  StringInfo norm_plan = gen_normplan(qi->plan_text().c_str());
-  *qi->mutable_template_plan_text() = std::string(norm_plan->data);
-  qi->set_plan_id(hash_any((unsigned char *)norm_plan->data, norm_plan->len));
+// Dispatcher only: record plan generator, plan text/template and ids.
+void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
+  if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->plannedstmt) {
+    auto qi = req->mutable_query_info();
+    qi->set_generator(query_desc->plannedstmt->planGen == PLANGEN_OPTIMIZER
+                          ? yagpcc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER
+                          : yagpcc::PlanGenerator::PLAN_GENERATOR_PLANNER);
+    set_plan_text(qi->mutable_plan_text(), query_desc);
+    StringInfo norm_plan = gen_normplan(qi->plan_text().c_str());
+    *qi->mutable_template_plan_text() = std::string(norm_plan->data);
+    qi->set_plan_id(hash_any((unsigned char *)norm_plan->data, norm_plan->len));
+    // TODO: for now assume query_id == plan_id, which is wrong. Reason: ORCA
+    // loses the pg_stat_statements queryid during planning, see
+    // https://github.com/greenplum-db/gpdb/pull/15385. Fix it upstream,
+    // cherry-pick and bump gp, then restore:
+    // qi->set_query_id(query_desc->plannedstmt->queryId);
+    qi->set_query_id(qi->plan_id());
+  }
 }
 
-void set_query_text(yagpcc::QueryInfo *qi, QueryDesc *query_desc) {
-  *qi->mutable_query_text() = query_desc->sourceText;
-  char *norm_query = gen_normquery(query_desc->sourceText);
-  *qi->mutable_template_query_text() = std::string(norm_query);
+void set_query_text(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
+  if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->sourceText) {
+    auto qi = req->mutable_query_info();
+    *qi->mutable_query_text() = query_desc->sourceText;
+    char *norm_query = gen_normquery(query_desc->sourceText);
+    *qi->mutable_template_query_text() = std::string(norm_query);
+  }
 }
 
-void set_query_info(yagpcc::SetQueryReq *req, QueryDesc *query_desc,
-                    bool with_text, bool with_plan) {
+void clear_big_fields(yagpcc::SetQueryReq *req) {
+  if (Gp_session_role == GP_ROLE_DISPATCH) {
+    auto qi = req->mutable_query_info();
+    qi->clear_plan_text();
+    qi->clear_query_text();
+  }
+}
+
+void set_query_info(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
   if (Gp_session_role == GP_ROLE_DISPATCH) {
     auto qi = req->mutable_query_info();
-    if (query_desc->sourceText && with_text) {
-      set_query_text(qi, query_desc);
-    }
-    if (query_desc->plannedstmt && with_plan) {
-      set_query_plan(qi, query_desc);
-      qi->set_query_id(query_desc->plannedstmt->queryId);
-    }
     qi->set_allocated_username(get_user_name());
     qi->set_allocated_databasename(get_db_name());
     qi->set_allocated_rsgname(get_rg_name());
@@ -245,6 +257,10 @@ void EventSender::executor_before_start(QueryDesc *query_desc,
   if (!need_collect()) {
     return;
   }
+  if (query_msg->has_query_key()) {
+    connector->report_query(*query_msg, "previous query");
+    query_msg->Clear();
+  }
   query_start_time = std::chrono::high_resolution_clock::now();
   WorkfileResetBackendStats();
   if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze()) {
@@ -268,10 +284,12 @@ void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) {
   }
   if ((Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) &&
       need_collect()) {
-    auto req =
-        create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_START);
-    set_query_info(&req, query_desc, false, true);
-    connector->set_metric_query(req, "started");
+    query_msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START);
+    *query_msg->mutable_start_time() = current_ts();
+    set_query_plan(query_msg, query_desc);
+    if (connector->report_query(*query_msg, "started")) {
+      clear_big_fields(query_msg);
+    }
   }
 }
 
@@ -284,21 +302,21 @@ void EventSender::executor_end(QueryDesc *query_desc) {
     return;
   }
   /* TODO: when querying via CURSOR this call freezes. Need to investigate.
-     To reproduce - uncomment it and run installchecks. It will freeze around join test.
-     Needs investigation
-    
+     To reproduce - uncomment it and run installchecks. It will freeze around
+  join test. Needs investigation
+
     if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze() &&
       Config::enable_cdbstats() && query_desc->estate->dispatcherState &&
       query_desc->estate->dispatcherState->primaryResults) {
     cdbdisp_checkDispatchResult(query_desc->estate->dispatcherState,
                                 DISPATCH_WAIT_NONE);
   }*/
-  auto req =
-      create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_END);
-  // NOTE: there are no cummulative spillinfo stats AFAIU, so no need to
-  // gather it here. It only makes sense when doing regular stat checks.
-  set_gp_metrics(req.mutable_query_metrics(), query_desc);
-  connector->set_metric_query(req, "ended");
+  query_msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_END);
+  *query_msg->mutable_end_time() = current_ts();
+  set_gp_metrics(query_msg->mutable_query_metrics(), query_desc);
+  if (connector->report_query(*query_msg, "ended")) {
+    query_msg->Clear();
+  }
 }
 
 void EventSender::collect_query_submit(QueryDesc *query_desc) {
@@ -306,10 +324,14 @@ void EventSender::collect_query_submit(QueryDesc *query_desc) {
     return;
   }
   if (need_collect()) {
-    auto req =
+    *query_msg =
         create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_SUBMIT);
-    set_query_info(&req, query_desc, true, false);
-    connector->set_metric_query(req, "submit");
+    *query_msg->mutable_submit_time() = current_ts();
+    set_query_info(query_msg, query_desc);
+    set_query_text(query_msg, query_desc);
+    if (connector->report_query(*query_msg, "submit")) {
+      clear_big_fields(query_msg);
+    }
   }
 }
 
@@ -319,20 +341,25 @@ void EventSender::collect_query_done(QueryDesc *query_desc,
     return;
   }
   if (need_collect()) {
-    auto req =
-        create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_DONE);
-    connector->set_metric_query(req, status);
+    query_msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE);
+    if (connector->report_query(*query_msg, status)) {
+      clear_big_fields(query_msg);
+    }
   }
 }
 
 EventSender::EventSender() {
   if (Config::enable_collector() && !Config::filter_user(get_user_name())) {
+    query_msg = new yagpcc::SetQueryReq();
     try {
-      connector = new GrpcConnector();
+      connector = new UDSConnector();
     } catch (const std::exception &e) {
       ereport(INFO, (errmsg("Unable to start query tracing %s", e.what())));
     }
   }
 }
 
-EventSender::~EventSender() { delete connector; }
\ No newline at end of file
+EventSender::~EventSender() {
+  delete query_msg;
+  delete connector;
+}
\ No newline at end of file
diff --git a/src/EventSender.h b/src/EventSender.h
index 2af8b7ffa03..161bf6ce037 100644
--- a/src/EventSender.h
+++ b/src/EventSender.h
@@ -1,9 +1,14 @@
 #pragma once
 
 #include <memory>
+#include <queue>
 #include <string>
 
-class GrpcConnector;
+extern "C" {
+#include "utils/metrics_utils.h"
+}
+
+class UDSConnector;
 struct QueryDesc;
 namespace yagpcc {
 class SetQueryReq;
@@ -23,6 +28,7 @@ public:
 private:
   void collect_query_submit(QueryDesc *query_desc);
   void collect_query_done(QueryDesc *query_desc, const std::string &status);
-  GrpcConnector *connector = nullptr;
+  UDSConnector *connector = nullptr;
   int nesting_level = 0;
+  yagpcc::SetQueryReq *query_msg = nullptr; // allocated only when collecting
 };
\ No newline at end of file
diff --git a/src/GrpcConnector.cpp b/src/GrpcConnector.cpp
deleted file mode 100644
index 73c1944fa04..00000000000
--- a/src/GrpcConnector.cpp
+++ /dev/null
@@ -1,133 +0,0 @@
-#include "GrpcConnector.h"
-#include "Config.h"
-#include "yagpcc_set_service.grpc.pb.h"
-
-#include <atomic>
-#include <condition_variable>
-#include <grpc++/channel.h>
-#include <grpc++/grpc++.h>
-#include <mutex>
-#include <pthread.h>
-#include <signal.h>
-#include <string>
-#include <thread>
-
-extern "C" {
-#include "postgres.h"
-#include "cdb/cdbvars.h"
-}
-
-/*
- * Set up the thread signal mask, we don't want to run our signal handlers
- * in downloading and uploading threads.
- */
-static void MaskThreadSignals() {
-  sigset_t sigs;
-
-  if (pthread_equal(main_tid, pthread_self())) {
-    ereport(ERROR, (errmsg("thread_mask is called from main thread!")));
-    return;
-  }
-
-  sigemptyset(&sigs);
-
-  /* make our thread to ignore these signals (which should allow that they be
-   * delivered to the main thread) */
-  sigaddset(&sigs, SIGHUP);
-  sigaddset(&sigs, SIGINT);
-  sigaddset(&sigs, SIGTERM);
-  sigaddset(&sigs, SIGALRM);
-  sigaddset(&sigs, SIGUSR1);
-  sigaddset(&sigs, SIGUSR2);
-
-  pthread_sigmask(SIG_BLOCK, &sigs, NULL);
-}
-
-class GrpcConnector::Impl {
-public:
-  Impl() : SOCKET_FILE("unix://" + Config::uds_path()) {
-    GOOGLE_PROTOBUF_VERIFY_VERSION;
-    channel =
-        grpc::CreateChannel(SOCKET_FILE, grpc::InsecureChannelCredentials());
-    stub = yagpcc::SetQueryInfo::NewStub(channel);
-    connected = true;
-    reconnected = false;
-    done = false;
-    reconnect_thread = std::thread(&Impl::reconnect, this);
-  }
-
-  ~Impl() {
-    done = true;
-    cv.notify_one();
-    reconnect_thread.join();
-  }
-
-  yagpcc::MetricResponse set_metric_query(const yagpcc::SetQueryReq &req,
-                                          const std::string &event) {
-    yagpcc::MetricResponse response;
-    if (!connected) {
-      response.set_error_code(yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR);
-      response.set_error_text(
-          "Not tracing this query because grpc connection has been lost");
-      return response;
-    } else if (reconnected) {
-      reconnected = false;
-      ereport(LOG, (errmsg("GRPC connection is restored")));
-    }
-    grpc::ClientContext context;
-    int timeout = Gp_role == GP_ROLE_DISPATCH ? 500 : 250;
-    auto deadline =
-        std::chrono::system_clock::now() + std::chrono::milliseconds(timeout);
-    context.set_deadline(deadline);
-    grpc::Status status = (stub->SetMetricQuery)(&context, req, &response);
-    if (!status.ok()) {
-      response.set_error_text("GRPC error: " + status.error_message() + "; " +
-                              status.error_details());
-      response.set_error_code(yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR);
-      ereport(LOG, (errmsg("Query {%d-%d-%d} %s tracing failed with error %s",
-                           req.query_key().tmid(), req.query_key().ssid(),
-                           req.query_key().ccnt(), event.c_str(),
-                           response.error_text().c_str())));
-      connected = false;
-      reconnected = false;
-      cv.notify_one();
-    }
-
-    return response;
-  }
-
-private:
-  const std::string SOCKET_FILE;
-  std::unique_ptr<yagpcc::SetQueryInfo::Stub> stub;
-  std::shared_ptr<grpc::Channel> channel;
-  std::atomic_bool connected, reconnected, done;
-  std::thread reconnect_thread;
-  std::condition_variable cv;
-  std::mutex mtx;
-
-  void reconnect() {
-    MaskThreadSignals();
-    while (!done) {
-      {
-        std::unique_lock<std::mutex> lock(mtx);
-        cv.wait(lock);
-      }
-      while (!connected && !done) {
-        auto deadline =
-            std::chrono::system_clock::now() + std::chrono::milliseconds(100);
-        connected = channel->WaitForConnected(deadline);
-        reconnected = connected.load();
-      }
-    }
-  }
-};
-
-GrpcConnector::GrpcConnector() { impl = new Impl(); }
-
-GrpcConnector::~GrpcConnector() { delete impl; }
-
-yagpcc::MetricResponse
-GrpcConnector::set_metric_query(const yagpcc::SetQueryReq &req,
-                                const std::string &event) {
-  return impl->set_metric_query(req, event);
-}
\ No newline at end of file
diff --git a/src/GrpcConnector.h b/src/GrpcConnector.h
deleted file mode 100644
index 6571c626dfd..00000000000
--- a/src/GrpcConnector.h
+++ /dev/null
@@ -1,15 +0,0 @@
-#pragma once
-
-#include "protos/yagpcc_set_service.pb.h"
-
-class GrpcConnector {
-public:
-  GrpcConnector();
-  ~GrpcConnector();
-  yagpcc::MetricResponse set_metric_query(const yagpcc::SetQueryReq &req,
-                                          const std::string &event);
-
-private:
-  class Impl;
-  Impl *impl;
-};
\ No newline at end of file
diff --git a/src/UDSConnector.cpp b/src/UDSConnector.cpp
new file mode 100644
index 00000000000..339a5d4f374
--- /dev/null
+++ b/src/UDSConnector.cpp
@@ -0,0 +1,83 @@
+#include "UDSConnector.h"
+#include "Config.h"
+
+#include <string>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/types.h>
+#include <sys/fcntl.h>
+#include <chrono>
+#include <thread>
+
+extern "C" {
+#include "postgres.h"
+#include "cdb/cdbvars.h"
+}
+
+// Keep the configured socket path as-is: sun_path needs a plain file path.
+UDSConnector::UDSConnector() : uds_path(Config::uds_path()) {
+  GOOGLE_PROTOBUF_VERIFY_VERSION;
+}
+static void inline log_tracing_failure(const yagpcc::SetQueryReq &req,
+                                       const std::string &event) {
+  ereport(LOG,
+          (errmsg("Query {%d-%d-%d} %s tracing failed with error %s",
+                  req.query_key().tmid(), req.query_key().ssid(),
+                  req.query_key().ccnt(), event.c_str(), strerror(errno))));
+}
+
+// Sends req as one length-prefixed frame; returns false on any failure.
+bool UDSConnector::report_query(const yagpcc::SetQueryReq &req,
+                                const std::string &event) {
+  sockaddr_un address{};
+  address.sun_family = AF_UNIX;
+  // sun_path is a fixed-size array: reject a path that does not fit together
+  // with its terminating NUL instead of overflowing the buffer in strcpy.
+  if (uds_path.size() >= sizeof(address.sun_path)) {
+    ereport(WARNING, (errmsg("UDS path is too long for socket buffer")));
+    return false;
+  }
+  strcpy(address.sun_path, uds_path.c_str());
+  auto sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (sockfd == -1) {
+    // log the error and go on
+    log_tracing_failure(req, event);
+    return false;
+  }
+  bool success = false;
+  if (fcntl(sockfd, F_SETFL, O_NONBLOCK) == -1) {
+    // That's a very important error that should never happen, so make it
+    // visible to an end-user and admins.
+    ereport(WARNING,
+            (errmsg("Unable to create non-blocking socket connection %s",
+                    strerror(errno))));
+  } else if (connect(sockfd, (sockaddr *)&address, sizeof(address)) == -1) {
+    log_tracing_failure(req, event);
+  } else {
+    // Frame: a host-endian uint32_t length prefix, then the message bytes.
+    auto data_size = req.ByteSize();
+    auto total_size = data_size + sizeof(uint32_t);
+    uint8_t *buf = (uint8_t *)palloc(total_size);
+    *(uint32_t *)buf = data_size;
+    req.SerializeWithCachedSizesToArray(buf + sizeof(uint32_t));
+    int64_t sent = 0, sent_total = 0;
+    do {
+      sent = send(sockfd, buf + sent_total, total_size - sent_total,
+                  MSG_DONTWAIT);
+      sent_total += sent;
+    } while (
+        sent > 0 && size_t(sent_total) != total_size &&
+        // Small throttling hack: if a message does not fit a single packet,
+        // nap before sending the next chunk. Otherwise, a MSG_DONTWAIT send
+        // might overflow the UDS.
+        (std::this_thread::sleep_for(std::chrono::milliseconds(1)), true));
+    success = sent >= 0;
+    if (!success) {
+      log_tracing_failure(req, event);
+    }
+    pfree(buf);
+  }
+  close(sockfd);
+  return success;
+}
\ No newline at end of file
diff --git a/src/UDSConnector.h b/src/UDSConnector.h
new file mode 100644
index 00000000000..574653023e6
--- /dev/null
+++ b/src/UDSConnector.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include "protos/yagpcc_set_service.pb.h"
+#include <queue>
+
+class UDSConnector {
+public:
+  UDSConnector();
+  bool report_query(const yagpcc::SetQueryReq &req, const std::string &event);
+
+private:
+  const std::string uds_path;
+};
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to