This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit 2bab980321fbdd16967829a1d157867671198324
Author: Maxim Smyatkin <[email protected]>
AuthorDate: Thu Nov 7 13:09:44 2024 +0300

    [yagp_hooks_collector] Split EventSender into submodules
    
    Factor out ProtoUtils, ProcStats, and PgUtils from EventSender.
---
 src/EventSender.cpp | 275 +---------------------------------------------------
 src/PgUtils.cpp     |  94 ++++++++++++++++++
 src/PgUtils.h       |  16 +++
 src/ProtoUtils.cpp  | 185 +++++++++++++++++++++++++++++++++++
 src/ProtoUtils.h    |  16 +++
 5 files changed, 315 insertions(+), 271 deletions(-)

diff --git a/src/EventSender.cpp b/src/EventSender.cpp
index 7d2d5a1a2c2..cdb21ef7aa6 100644
--- a/src/EventSender.cpp
+++ b/src/EventSender.cpp
@@ -1,287 +1,21 @@
 #include "Config.h"
-#include "ProcStats.h"
 #include "UDSConnector.h"
-#include <ctime>
 
-#define typeid __typeid
-#define operator __operator
 extern "C" {
 #include "postgres.h"
 
 #include "access/hash.h"
-#include "access/xact.h"
-#include "commands/dbcommands.h"
-#include "commands/explain.h"
-#include "commands/resgroupcmds.h"
 #include "executor/executor.h"
 #include "utils/elog.h"
-#include "utils/workfile_mgr.h"
 
 #include "cdb/cdbdisp.h"
 #include "cdb/cdbexplain.h"
-#include "cdb/cdbinterconnect.h"
 #include "cdb/cdbvars.h"
-
-#include "stat_statements_parser/pg_stat_statements_ya_parser.h"
-#include "tcop/utility.h"
 }
-#undef typeid
-#undef operator
 
 #include "EventSender.h"
-
-namespace {
-
-std::string *get_user_name() {
-  const char *username = GetConfigOption("session_authorization", false, 
false);
-  // username is not to be freed
-  return username ? new std::string(username) : nullptr;
-}
-
-std::string *get_db_name() {
-  char *dbname = get_database_name(MyDatabaseId);
-  std::string *result = nullptr;
-  if (dbname) {
-    result = new std::string(dbname);
-    pfree(dbname);
-  }
-  return result;
-}
-
-std::string *get_rg_name() {
-  auto groupId = ResGroupGetGroupIdBySessionId(MySessionState->sessionId);
-  if (!OidIsValid(groupId))
-    return nullptr;
-  char *rgname = GetResGroupNameForId(groupId);
-  if (rgname == nullptr)
-    return nullptr;
-  return new std::string(rgname);
-}
-
-/**
- * Things get tricky with nested queries.
- * a) A nested query on master is a real query optimized and executed from
- * master. An example would be `select some_insert_function();`, where
- * some_insert_function does something like `insert into tbl values (1)`. 
Master
- * will create two statements. Outer select statement and inner insert 
statement
- * with nesting level 1.
- * For segments both statements are top-level statements with nesting level 0.
- * b) A nested query on segment is something executed as sub-statement on
- * segment. An example would be `select a from tbl where is_good_value(b);`. In
- * this case master will issue one top-level statement, but segments will 
change
- * contexts for UDF execution and execute  is_good_value(b) once for each tuple
- * as a nested query. Creating massive load on gpcc agent.
- *
- * Hence, here is a decision:
- * 1) ignore all queries that are nested on segments
- * 2) record (if enabled) all queries that are nested on master
- * NODE: The truth is, we can't really ignore nested master queries, because
- * segment sees those as top-level.
- */
-
-inline bool is_top_level_query(QueryDesc *query_desc, int nesting_level) {
-  return (query_desc->gpmon_pkt &&
-          query_desc->gpmon_pkt->u.qexec.key.tmid == 0) ||
-         nesting_level == 0;
-}
-
-inline bool nesting_is_valid(QueryDesc *query_desc, int nesting_level) {
-  return (Gp_session_role == GP_ROLE_DISPATCH &&
-          Config::report_nested_queries()) ||
-         is_top_level_query(query_desc, nesting_level);
-}
-
-bool need_report_nested_query() {
-  return Config::report_nested_queries() && Gp_session_role == 
GP_ROLE_DISPATCH;
-}
-
-inline bool filter_query(QueryDesc *query_desc) {
-  return gp_command_count == 0 || query_desc->sourceText == nullptr ||
-         !Config::enable_collector() || Config::filter_user(get_user_name());
-}
-
-inline bool need_collect(QueryDesc *query_desc, int nesting_level) {
-  return !filter_query(query_desc) &&
-         nesting_is_valid(query_desc, nesting_level);
-}
-
-google::protobuf::Timestamp current_ts() {
-  google::protobuf::Timestamp current_ts;
-  struct timeval tv;
-  gettimeofday(&tv, nullptr);
-  current_ts.set_seconds(tv.tv_sec);
-  current_ts.set_nanos(static_cast<int32_t>(tv.tv_usec * 1000));
-  return current_ts;
-}
-
-void set_query_key(yagpcc::QueryKey *key, QueryDesc *query_desc) {
-  key->set_ccnt(gp_command_count);
-  key->set_ssid(gp_session_id);
-  int32 tmid = 0;
-  gpmon_gettmid(&tmid);
-  key->set_tmid(tmid);
-}
-
-void set_segment_key(yagpcc::SegmentKey *key, QueryDesc *query_desc) {
-  key->set_dbid(GpIdentity.dbid);
-  key->set_segindex(GpIdentity.segindex);
-}
-
-ExplainState get_explain_state(QueryDesc *query_desc, bool costs) {
-  ExplainState es;
-  ExplainInitState(&es);
-  es.costs = costs;
-  es.verbose = true;
-  es.format = EXPLAIN_FORMAT_TEXT;
-  ExplainBeginOutput(&es);
-  ExplainPrintPlan(&es, query_desc);
-  ExplainEndOutput(&es);
-  return es;
-}
-
-inline std::string char_to_trimmed_str(const char *str, size_t len) {
-  return std::string(str, std::min(len, Config::max_text_size()));
-}
-
-void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
-  if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->plannedstmt) {
-    auto qi = req->mutable_query_info();
-    qi->set_generator(query_desc->plannedstmt->planGen == PLANGEN_OPTIMIZER
-                          ? yagpcc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER
-                          : yagpcc::PlanGenerator::PLAN_GENERATOR_PLANNER);
-    MemoryContext oldcxt =
-        MemoryContextSwitchTo(query_desc->estate->es_query_cxt);
-    auto es = get_explain_state(query_desc, true);
-    MemoryContextSwitchTo(oldcxt);
-    *qi->mutable_plan_text() = char_to_trimmed_str(es.str->data, es.str->len);
-    StringInfo norm_plan = gen_normplan(es.str->data);
-    *qi->mutable_template_plan_text() =
-        char_to_trimmed_str(norm_plan->data, norm_plan->len);
-    qi->set_plan_id(hash_any((unsigned char *)norm_plan->data, 
norm_plan->len));
-    qi->set_query_id(query_desc->plannedstmt->queryId);
-    pfree(es.str->data);
-    pfree(norm_plan->data);
-  }
-}
-
-void set_query_text(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
-  if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->sourceText) {
-    auto qi = req->mutable_query_info();
-    *qi->mutable_query_text() = char_to_trimmed_str(
-        query_desc->sourceText, strlen(query_desc->sourceText));
-    char *norm_query = gen_normquery(query_desc->sourceText);
-    *qi->mutable_template_query_text() =
-        char_to_trimmed_str(norm_query, strlen(norm_query));
-  }
-}
-
-void clear_big_fields(yagpcc::SetQueryReq *req) {
-  if (Gp_session_role == GP_ROLE_DISPATCH) {
-    auto qi = req->mutable_query_info();
-    qi->clear_plan_text();
-    qi->clear_template_plan_text();
-    qi->clear_query_text();
-    qi->clear_template_query_text();
-  }
-}
-
-void set_query_info(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
-  if (Gp_session_role == GP_ROLE_DISPATCH) {
-    auto qi = req->mutable_query_info();
-    qi->set_allocated_username(get_user_name());
-    qi->set_allocated_databasename(get_db_name());
-    qi->set_allocated_rsgname(get_rg_name());
-  }
-}
-
-void set_qi_nesting_level(yagpcc::SetQueryReq *req, int nesting_level) {
-  auto aqi = req->mutable_add_info();
-  aqi->set_nested_level(nesting_level);
-}
-
-void set_qi_slice_id(yagpcc::SetQueryReq *req) {
-  auto aqi = req->mutable_add_info();
-  aqi->set_slice_id(currentSliceId);
-}
-
-void set_qi_error_message(yagpcc::SetQueryReq *req) {
-  auto aqi = req->mutable_add_info();
-  auto error = elog_message();
-  *aqi->mutable_error_message() = char_to_trimmed_str(error, strlen(error));
-}
-
-void set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics,
-                                QueryDesc *query_desc, int nested_calls,
-                                double nested_time) {
-  auto instrument = query_desc->planstate->instrument;
-  if (instrument) {
-    metrics->set_ntuples(instrument->ntuples);
-    metrics->set_nloops(instrument->nloops);
-    metrics->set_tuplecount(instrument->tuplecount);
-    metrics->set_firsttuple(instrument->firsttuple);
-    metrics->set_startup(instrument->startup);
-    metrics->set_total(instrument->total);
-    auto &buffusage = instrument->bufusage;
-    metrics->set_shared_blks_hit(buffusage.shared_blks_hit);
-    metrics->set_shared_blks_read(buffusage.shared_blks_read);
-    metrics->set_shared_blks_dirtied(buffusage.shared_blks_dirtied);
-    metrics->set_shared_blks_written(buffusage.shared_blks_written);
-    metrics->set_local_blks_hit(buffusage.local_blks_hit);
-    metrics->set_local_blks_read(buffusage.local_blks_read);
-    metrics->set_local_blks_dirtied(buffusage.local_blks_dirtied);
-    metrics->set_local_blks_written(buffusage.local_blks_written);
-    metrics->set_temp_blks_read(buffusage.temp_blks_read);
-    metrics->set_temp_blks_written(buffusage.temp_blks_written);
-    metrics->set_blk_read_time(INSTR_TIME_GET_DOUBLE(buffusage.blk_read_time));
-    metrics->set_blk_write_time(
-        INSTR_TIME_GET_DOUBLE(buffusage.blk_write_time));
-  }
-  if (query_desc->estate && query_desc->estate->motionlayer_context) {
-    MotionLayerState *mlstate =
-        (MotionLayerState *)query_desc->estate->motionlayer_context;
-    metrics->mutable_sent()->set_total_bytes(mlstate->stat_total_bytes_sent);
-    metrics->mutable_sent()->set_tuple_bytes(mlstate->stat_tuple_bytes_sent);
-    metrics->mutable_sent()->set_chunks(mlstate->stat_total_chunks_sent);
-    metrics->mutable_received()->set_total_bytes(
-        mlstate->stat_total_bytes_recvd);
-    metrics->mutable_received()->set_tuple_bytes(
-        mlstate->stat_tuple_bytes_recvd);
-    metrics->mutable_received()->set_chunks(mlstate->stat_total_chunks_recvd);
-  }
-  metrics->set_inherited_calls(nested_calls);
-  metrics->set_inherited_time(nested_time);
-}
-
-void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc,
-                    int nested_calls, double nested_time) {
-  if (query_desc->planstate && query_desc->planstate->instrument) {
-    set_metric_instrumentation(metrics->mutable_instrumentation(), query_desc,
-                               nested_calls, nested_time);
-  }
-  fill_self_stats(metrics->mutable_systemstat());
-  metrics->mutable_systemstat()->set_runningtimeseconds(
-      time(NULL) - metrics->mutable_systemstat()->runningtimeseconds());
-  metrics->mutable_spill()->set_filecount(
-      WorkfileTotalFilesCreated() - metrics->mutable_spill()->filecount());
-  metrics->mutable_spill()->set_totalbytes(
-      WorkfileTotalBytesWritten() - metrics->mutable_spill()->totalbytes());
-}
-
-yagpcc::SetQueryReq create_query_req(QueryDesc *query_desc,
-                                     yagpcc::QueryStatus status) {
-  yagpcc::SetQueryReq req;
-  req.set_query_status(status);
-  *req.mutable_datetime() = current_ts();
-  set_query_key(req.mutable_query_key(), query_desc);
-  set_segment_key(req.mutable_segment_key(), query_desc);
-  return req;
-}
-
-double protots_to_double(const google::protobuf::Timestamp &ts) {
-  return double(ts.seconds()) + double(ts.nanos()) / 1000000000.0;
-}
-
-} // namespace
+#include "PgUtils.h"
+#include "ProtoUtils.h"
 
 void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) {
   if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) {
@@ -404,10 +138,9 @@ void EventSender::collect_query_submit(QueryDesc 
*query_desc) {
     auto *query = get_query_message(query_desc);
     query->state = QueryState::SUBMIT;
     auto query_msg = query->message;
-    *query_msg =
-        create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_SUBMIT);
+    *query_msg = create_query_req(yagpcc::QueryStatus::QUERY_STATUS_SUBMIT);
     *query_msg->mutable_submit_time() = current_ts();
-    set_query_info(query_msg, query_desc);
+    set_query_info(query_msg);
     set_qi_nesting_level(query_msg, query_desc->gpmon_pkt->u.qexec.key.tmid);
     set_qi_slice_id(query_msg);
     set_query_text(query_msg, query_desc);
diff --git a/src/PgUtils.cpp b/src/PgUtils.cpp
new file mode 100644
index 00000000000..528426e6c64
--- /dev/null
+++ b/src/PgUtils.cpp
@@ -0,0 +1,94 @@
+#include "PgUtils.h"
+#include "Config.h"
+
+extern "C" {
+#include "utils/guc.h"
+#include "commands/dbcommands.h"
+#include "commands/resgroupcmds.h"
+#include "cdb/cdbvars.h"
+}
+
+std::string *get_user_name() {
+  const char *username = GetConfigOption("session_authorization", false, 
false);
+  // username is not to be freed
+  return username ? new std::string(username) : nullptr;
+}
+
+std::string *get_db_name() {
+  char *dbname = get_database_name(MyDatabaseId);
+  std::string *result = nullptr;
+  if (dbname) {
+    result = new std::string(dbname);
+    pfree(dbname);
+  }
+  return result;
+}
+
+std::string *get_rg_name() {
+  auto groupId = ResGroupGetGroupIdBySessionId(MySessionState->sessionId);
+  if (!OidIsValid(groupId))
+    return nullptr;
+  char *rgname = GetResGroupNameForId(groupId);
+  if (rgname == nullptr)
+    return nullptr;
+  return new std::string(rgname);
+}
+
+/**
+ * Things get tricky with nested queries.
+ * a) A nested query on master is a real query optimized and executed from
+ * master. An example would be `select some_insert_function();`, where
+ * some_insert_function does something like `insert into tbl values (1)`. 
Master
+ * will create two statements. Outer select statement and inner insert 
statement
+ * with nesting level 1.
+ * For segments both statements are top-level statements with nesting level 0.
+ * b) A nested query on segment is something executed as sub-statement on
+ * segment. An example would be `select a from tbl where is_good_value(b);`. In
+ * this case master will issue one top-level statement, but segments will 
change
+ * contexts for UDF execution and execute  is_good_value(b) once for each tuple
+ * as a nested query. Creating massive load on gpcc agent.
+ *
+ * Hence, here is a decision:
+ * 1) ignore all queries that are nested on segments
+ * 2) record (if enabled) all queries that are nested on master
+ * NODE: The truth is, we can't really ignore nested master queries, because
+ * segment sees those as top-level.
+ */
+
+bool is_top_level_query(QueryDesc *query_desc, int nesting_level) {
+  return (query_desc->gpmon_pkt &&
+          query_desc->gpmon_pkt->u.qexec.key.tmid == 0) ||
+         nesting_level == 0;
+}
+
+bool nesting_is_valid(QueryDesc *query_desc, int nesting_level) {
+  return (Gp_session_role == GP_ROLE_DISPATCH &&
+          Config::report_nested_queries()) ||
+         is_top_level_query(query_desc, nesting_level);
+}
+
+bool need_report_nested_query() {
+  return Config::report_nested_queries() && Gp_session_role == 
GP_ROLE_DISPATCH;
+}
+
+bool filter_query(QueryDesc *query_desc) {
+  return gp_command_count == 0 || query_desc->sourceText == nullptr ||
+         !Config::enable_collector() || Config::filter_user(get_user_name());
+}
+
+bool need_collect(QueryDesc *query_desc, int nesting_level) {
+  return !filter_query(query_desc) &&
+         nesting_is_valid(query_desc, nesting_level);
+}
+
+ExplainState get_explain_state(QueryDesc *query_desc, bool costs) {
+  ExplainState es;
+  ExplainInitState(&es);
+  es.costs = costs;
+  es.verbose = true;
+  es.format = EXPLAIN_FORMAT_TEXT;
+  ExplainBeginOutput(&es);
+  ExplainPrintPlan(&es, query_desc);
+  ExplainEndOutput(&es);
+  return es;
+}
diff --git a/src/PgUtils.h b/src/PgUtils.h
new file mode 100644
index 00000000000..85b1eb833cd
--- /dev/null
+++ b/src/PgUtils.h
@@ -0,0 +1,16 @@
+extern "C" {
+#include "postgres.h"
+#include "commands/explain.h"
+}
+
+#include <string>
+
+std::string *get_user_name();
+std::string *get_db_name();
+std::string *get_rg_name();
+bool is_top_level_query(QueryDesc *query_desc, int nesting_level);
+bool nesting_is_valid(QueryDesc *query_desc, int nesting_level);
+bool need_report_nested_query();
+bool filter_query(QueryDesc *query_desc);
+bool need_collect(QueryDesc *query_desc, int nesting_level);
+ExplainState get_explain_state(QueryDesc *query_desc, bool costs);
diff --git a/src/ProtoUtils.cpp b/src/ProtoUtils.cpp
new file mode 100644
index 00000000000..e1be25b8b1e
--- /dev/null
+++ b/src/ProtoUtils.cpp
@@ -0,0 +1,185 @@
+#include "ProtoUtils.h"
+#include "PgUtils.h"
+#include "ProcStats.h"
+#include "Config.h"
+
+#define typeid __typeid
+#define operator __operator
+extern "C" {
+#include "postgres.h"
+#include "access/hash.h"
+#include "cdb/cdbinterconnect.h"
+#include "cdb/cdbvars.h"
+#include "gpmon/gpmon.h"
+#include "utils/workfile_mgr.h"
+
+#include "stat_statements_parser/pg_stat_statements_ya_parser.h"
+}
+#undef typeid
+#undef operator
+
+#include <ctime>
+#include <string>
+
+google::protobuf::Timestamp current_ts() {
+  google::protobuf::Timestamp current_ts;
+  struct timeval tv;
+  gettimeofday(&tv, nullptr);
+  current_ts.set_seconds(tv.tv_sec);
+  current_ts.set_nanos(static_cast<int32_t>(tv.tv_usec * 1000));
+  return current_ts;
+}
+
+void set_query_key(yagpcc::QueryKey *key) {
+  key->set_ccnt(gp_command_count);
+  key->set_ssid(gp_session_id);
+  int32 tmid = 0;
+  gpmon_gettmid(&tmid);
+  key->set_tmid(tmid);
+}
+
+void set_segment_key(yagpcc::SegmentKey *key) {
+  key->set_dbid(GpIdentity.dbid);
+  key->set_segindex(GpIdentity.segindex);
+}
+
+inline std::string char_to_trimmed_str(const char *str, size_t len) {
+  return std::string(str, std::min(len, Config::max_text_size()));
+}
+
+void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
+  if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->plannedstmt) {
+    auto qi = req->mutable_query_info();
+    qi->set_generator(query_desc->plannedstmt->planGen == PLANGEN_OPTIMIZER
+                          ? yagpcc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER
+                          : yagpcc::PlanGenerator::PLAN_GENERATOR_PLANNER);
+    MemoryContext oldcxt =
+        MemoryContextSwitchTo(query_desc->estate->es_query_cxt);
+    auto es = get_explain_state(query_desc, true);
+    MemoryContextSwitchTo(oldcxt);
+    *qi->mutable_plan_text() = char_to_trimmed_str(es.str->data, es.str->len);
+    StringInfo norm_plan = gen_normplan(es.str->data);
+    *qi->mutable_template_plan_text() =
+        char_to_trimmed_str(norm_plan->data, norm_plan->len);
+    qi->set_plan_id(hash_any((unsigned char *)norm_plan->data, 
norm_plan->len));
+    qi->set_query_id(query_desc->plannedstmt->queryId);
+    pfree(es.str->data);
+    pfree(norm_plan->data);
+  }
+}
+
+void set_query_text(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
+  if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->sourceText) {
+    auto qi = req->mutable_query_info();
+    *qi->mutable_query_text() = char_to_trimmed_str(
+        query_desc->sourceText, strlen(query_desc->sourceText));
+    char *norm_query = gen_normquery(query_desc->sourceText);
+    *qi->mutable_template_query_text() =
+        char_to_trimmed_str(norm_query, strlen(norm_query));
+  }
+}
+
+void clear_big_fields(yagpcc::SetQueryReq *req) {
+  if (Gp_session_role == GP_ROLE_DISPATCH) {
+    auto qi = req->mutable_query_info();
+    qi->clear_plan_text();
+    qi->clear_template_plan_text();
+    qi->clear_query_text();
+    qi->clear_template_query_text();
+  }
+}
+
+void set_query_info(yagpcc::SetQueryReq *req) {
+  if (Gp_session_role == GP_ROLE_DISPATCH) {
+    auto qi = req->mutable_query_info();
+    qi->set_allocated_username(get_user_name());
+    qi->set_allocated_databasename(get_db_name());
+    qi->set_allocated_rsgname(get_rg_name());
+  }
+}
+
+void set_qi_nesting_level(yagpcc::SetQueryReq *req, int nesting_level) {
+  auto aqi = req->mutable_add_info();
+  aqi->set_nested_level(nesting_level);
+}
+
+void set_qi_slice_id(yagpcc::SetQueryReq *req) {
+  auto aqi = req->mutable_add_info();
+  aqi->set_slice_id(currentSliceId);
+}
+
+void set_qi_error_message(yagpcc::SetQueryReq *req) {
+  auto aqi = req->mutable_add_info();
+  auto error = elog_message();
+  *aqi->mutable_error_message() = char_to_trimmed_str(error, strlen(error));
+}
+
+void set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics,
+                                QueryDesc *query_desc, int nested_calls,
+                                double nested_time) {
+  auto instrument = query_desc->planstate->instrument;
+  if (instrument) {
+    metrics->set_ntuples(instrument->ntuples);
+    metrics->set_nloops(instrument->nloops);
+    metrics->set_tuplecount(instrument->tuplecount);
+    metrics->set_firsttuple(instrument->firsttuple);
+    metrics->set_startup(instrument->startup);
+    metrics->set_total(instrument->total);
+    auto &buffusage = instrument->bufusage;
+    metrics->set_shared_blks_hit(buffusage.shared_blks_hit);
+    metrics->set_shared_blks_read(buffusage.shared_blks_read);
+    metrics->set_shared_blks_dirtied(buffusage.shared_blks_dirtied);
+    metrics->set_shared_blks_written(buffusage.shared_blks_written);
+    metrics->set_local_blks_hit(buffusage.local_blks_hit);
+    metrics->set_local_blks_read(buffusage.local_blks_read);
+    metrics->set_local_blks_dirtied(buffusage.local_blks_dirtied);
+    metrics->set_local_blks_written(buffusage.local_blks_written);
+    metrics->set_temp_blks_read(buffusage.temp_blks_read);
+    metrics->set_temp_blks_written(buffusage.temp_blks_written);
+    metrics->set_blk_read_time(INSTR_TIME_GET_DOUBLE(buffusage.blk_read_time));
+    metrics->set_blk_write_time(
+        INSTR_TIME_GET_DOUBLE(buffusage.blk_write_time));
+  }
+  if (query_desc->estate && query_desc->estate->motionlayer_context) {
+    MotionLayerState *mlstate =
+        (MotionLayerState *)query_desc->estate->motionlayer_context;
+    metrics->mutable_sent()->set_total_bytes(mlstate->stat_total_bytes_sent);
+    metrics->mutable_sent()->set_tuple_bytes(mlstate->stat_tuple_bytes_sent);
+    metrics->mutable_sent()->set_chunks(mlstate->stat_total_chunks_sent);
+    metrics->mutable_received()->set_total_bytes(
+        mlstate->stat_total_bytes_recvd);
+    metrics->mutable_received()->set_tuple_bytes(
+        mlstate->stat_tuple_bytes_recvd);
+    metrics->mutable_received()->set_chunks(mlstate->stat_total_chunks_recvd);
+  }
+  metrics->set_inherited_calls(nested_calls);
+  metrics->set_inherited_time(nested_time);
+}
+
+void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc,
+                    int nested_calls, double nested_time) {
+  if (query_desc->planstate && query_desc->planstate->instrument) {
+    set_metric_instrumentation(metrics->mutable_instrumentation(), query_desc,
+                               nested_calls, nested_time);
+  }
+  fill_self_stats(metrics->mutable_systemstat());
+  metrics->mutable_systemstat()->set_runningtimeseconds(
+      time(NULL) - metrics->mutable_systemstat()->runningtimeseconds());
+  metrics->mutable_spill()->set_filecount(
+      WorkfileTotalFilesCreated() - metrics->mutable_spill()->filecount());
+  metrics->mutable_spill()->set_totalbytes(
+      WorkfileTotalBytesWritten() - metrics->mutable_spill()->totalbytes());
+}
+
+yagpcc::SetQueryReq create_query_req(yagpcc::QueryStatus status) {
+  yagpcc::SetQueryReq req;
+  req.set_query_status(status);
+  *req.mutable_datetime() = current_ts();
+  set_query_key(req.mutable_query_key());
+  set_segment_key(req.mutable_segment_key());
+  return req;
+}
+
+double protots_to_double(const google::protobuf::Timestamp &ts) {
+  return double(ts.seconds()) + double(ts.nanos()) / 1000000000.0;
+}
\ No newline at end of file
diff --git a/src/ProtoUtils.h b/src/ProtoUtils.h
new file mode 100644
index 00000000000..38aa75611b2
--- /dev/null
+++ b/src/ProtoUtils.h
@@ -0,0 +1,16 @@
+#include "protos/yagpcc_set_service.pb.h"
+
+struct QueryDesc;
+
+google::protobuf::Timestamp current_ts();
+void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc);
+void set_query_text(yagpcc::SetQueryReq *req, QueryDesc *query_desc);
+void clear_big_fields(yagpcc::SetQueryReq *req);
+void set_query_info(yagpcc::SetQueryReq *req);
+void set_qi_nesting_level(yagpcc::SetQueryReq *req, int nesting_level);
+void set_qi_slice_id(yagpcc::SetQueryReq *req);
+void set_qi_error_message(yagpcc::SetQueryReq *req);
+void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc,
+                    int nested_calls, double nested_time);
+yagpcc::SetQueryReq create_query_req(yagpcc::QueryStatus status);
+double protots_to_double(const google::protobuf::Timestamp &ts);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to