This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit 46dca0331cd5118209de14dcb081584b592c2fff
Author: NJrslv <[email protected]>
AuthorDate: Mon Jul 14 16:14:49 2025 +0300

    [yagp_hooks_collector] Add utility statement tracking and metrics 
documentation
    
    Hook into ProcessUtility to emit submit and done events for DDL.
    Add metrics documentation (metric.md).  Change namespace to avoid
    GPOS conflicts.  Report incomplete queries at extension shutdown.
    Clean up stray files.
---
 metric.md                   | 125 ++++++++++++++
 src/Config.cpp              |  12 +-
 src/EventSender.cpp         | 407 ++++++++++++++++++++++++--------------------
 src/EventSender.h           |  94 ++++++++--
 src/PgUtils.cpp             |  10 +-
 src/ProtoUtils.cpp          |  22 +--
 src/UDSConnector.cpp        |   4 +-
 src/hook_wrappers.cpp       |   2 +-
 src/memory/gpdbwrappers.cpp | 163 +++++++++++++-----
 src/memory/gpdbwrappers.h   |  85 +--------
 10 files changed, 572 insertions(+), 352 deletions(-)

diff --git a/metric.md b/metric.md
new file mode 100644
index 00000000000..2d198391a67
--- /dev/null
+++ b/metric.md
@@ -0,0 +1,125 @@
+## YAGP Hooks Collector Metrics
+
+### States  
+A Postgres process goes through 4 executor functions to execute a query:  
+1) `ExecutorStart()` - resource allocation for the query.  
+2) `ExecutorRun()` - query execution.  
+3) `ExecutorFinish()` - cleanup.  
+4) `ExecutorEnd()` - cleanup.  
+
+yagp-hooks-collector sends messages with 4 states, from _Dispatcher_ and/or 
_Execute_ processes: `submit`, `start`, `end`, `done`, in this order:  
+```
+submit -> ExecutorStart() -> start -> ExecutorRun() -> ExecutorFinish() -> end 
-> ExecutorEnd() -> done
+```
+
+### Key Points  
+- Some queries may skip the _end_ state, then the _end_ statistics is sent 
during _done_.  
+- If a query finishes with an error (`METRICS_QUERY_ERROR`), or is cancelled 
(`METRICS_QUERY_CANCELLED`), statistics is sent at _done_.  
+- Some statistics is calculated as the difference between the current global 
metric and the previous. The initial snapshot is taken at submit, and at 
_end_/_done_ the diff is calculated.  
+- Nested queries on _Dispatcher_ become top-level on _Execute_.  
+- Each process (_Dispatcher_/_Execute_) sends its own statistics.  
+
+### Notations  
+- **S** = Submit event.  
+- **T** = Start event.  
+- **E** = End event.  
+- **D** = Done event.  
+- **DIFF** = current_value - submit_value (submit event).  
+- **ABS** = Absolute value, or where diff is not applicable, the value taken.  
+- **Local*** - Statistics that starts counting from zero for each new query. A 
nested query is also considered new.  
+
+### Statistics Table
+
+| Proto Field                  | Type   | When    | DIFF/ABS | Local* | Scope  
 | Dispatcher | Execute | Units   | Notes                                       
        |
+| :--------------------------- | :----- | :------ | :------- | ------ | 
:------ | :--------: | :-----: | :------ | 
:-------------------------------------------------- |
+| **SystemStat**               |        |         |          |        |        
 |            |         |         |                                             
        |
+| `runningTimeSeconds`         | double | E, D    | DIFF     | -      | Node   
 |     +      |    +    | seconds | Wall clock time                             
        |
+| `userTimeSeconds`            | double | E, D    | DIFF     | -      | Node   
 |     +      |    +    | seconds | /proc/pid/stat utime                        
        |
+| `kernelTimeSeconds`          | double | E, D    | DIFF     | -      | Node   
 |     +      |    +    | seconds | /proc/pid/stat stime                        
        |
+| `vsize`                      | uint64 | E, D    | ABS      | -      | Node   
 |     +      |    +    | pages   | /proc/pid/stat vsize                        
        |
+| `rss`                        | uint64 | E, D    | ABS      | -      | Node   
 |     +      |    +    | pages   | /proc/pid/stat rss                          
        |
+| `VmSizeKb`                   | uint64 | E, D    | ABS      | -      | Node   
 |     +      |    +    | KB      | /proc/pid/status VmSize                     
        |
+| `VmPeakKb`                   | uint64 | E, D    | ABS      | -      | Node   
 |     +      |    +    | KB      | /proc/pid/status VmPeak                     
        |
+| `rchar`                      | uint64 | E, D    | DIFF     | -      | Node   
 |     +      |    +    | bytes   | /proc/pid/io rchar                          
        |
+| `wchar`                      | uint64 | E, D    | DIFF     | -      | Node   
 |     +      |    +    | bytes   | /proc/pid/io wchar                          
        |
+| `syscr`                      | uint64 | E, D    | DIFF     | -      | Node   
 |     +      |    +    | count   | /proc/pid/io syscr                          
        |
+| `syscw`                      | uint64 | E, D    | DIFF     | -      | Node   
 |     +      |    +    | count   | /proc/pid/io syscw                          
        |
+| `read_bytes`                 | uint64 | E, D    | DIFF     | -      | Node   
 |     +      |    +    | bytes   | /proc/pid/io read_bytes                     
        |
+| `write_bytes`                | uint64 | E, D    | DIFF     | -      | Node   
 |     +      |    +    | bytes   | /proc/pid/io write_bytes                    
        |
+| `cancelled_write_bytes`      | uint64 | E, D    | DIFF     | -      | Node   
 |     +      |    +    | bytes   | /proc/pid/io cancelled_write_bytes          
        |
+| **MetricInstrumentation**    |        |         |          |        |        
 |            |         |         |                                             
        |
+| `ntuples`                    | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | tuples  | Accumulated total tuples                    
        |
+| `nloops`                     | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | count   | Number of cycles                            
        |
+| `tuplecount`                 | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | tuples  | Accumulated tuples per cycle                
        |
+| `firsttuple`                 | double | E, D    | ABS      | +      | Node   
 |     +      |    +    | seconds | Time for first tuple of this cycle          
        |
+| `startup`                    | double | E, D    | ABS      | +      | Node   
 |     +      |    +    | seconds | Start time of current iteration             
        |
+| `total`                      | double | E, D    | ABS      | +      | Node   
 |     +      |    +    | seconds | Total time taken                            
        |
+| `shared_blks_hit`            | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | blocks  | Shared buffer blocks found in cache         
        |
+| `shared_blks_read`           | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | blocks  | Shared buffer blocks read from disk         
        |
+| `shared_blks_dirtied`        | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | blocks  | Shared blocks dirtied                       
        |
+| `shared_blks_written`        | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | blocks  | Dirty shared buffer blocks written to disk  
        |
+| `local_blks_hit`             | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | blocks  | Local buffer hits                           
        |
+| `local_blks_read`            | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | blocks  | Disk blocks read                            
        |
+| `local_blks_dirtied`         | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | blocks  | Local blocks dirtied                        
        |
+| `local_blks_written`         | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | blocks  | Local blocks written to disk                
        |
+| `temp_blks_read`             | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | blocks  | Temp file blocks read                       
        |
+| `temp_blks_written`          | uint64 | E, D    | ABS      | +      | Node   
 |     +      |    +    | blocks  | Temp file blocks written                    
        |
+| `blk_read_time`              | double | E, D    | ABS      | +      | Node   
 |     +      |    +    | seconds | Time reading data blocks                    
        |
+| `blk_write_time`             | double | E, D    | ABS      | +      | Node   
 |     +      |    +    | seconds | Time writing data blocks                    
        |
+| `inherited_calls`            | uint64 | E, D    | ABS      | -      | Node   
 |     +      |    +    | count   | Nested query count (YAGPCC-specific)        
        |
+| `inherited_time`             | double | E, D    | ABS      | -      | Node   
 |     +      |    +    | seconds | Nested query time (YAGPCC-specific)         
        |
+| **NetworkStat (sent)**       |        |         |          |        |        
 |            |         |         |                                             
        |
+| `sent.total_bytes`           | uint32 | D       | ABS      | -      | Node   
 |     +      |    +    | bytes   | Bytes sent, including headers               
        |
+| `sent.tuple_bytes`           | uint32 | D       | ABS      | -      | Node   
 |     +      |    +    | bytes   | Bytes of pure tuple-data sent               
        |
+| `sent.chunks`                | uint32 | D       | ABS      | -      | Node   
 |     +      |    +    | count   | Tuple-chunks sent                           
        |
+| **NetworkStat (received)**   |        |         |          |        |        
 |            |         |         |                                             
        |
+| `received.total_bytes`       | uint32 | D       | ABS      | -      | Node   
 |     +      |    +    | bytes   | Bytes of pure tuple-data received           
        |
+| `received.tuple_bytes`       | uint32 | D       | ABS      | -      | Node   
 |     +      |    +    | bytes   | Bytes of pure tuple-data received           
        |
+| `received.chunks`            | uint32 | D       | ABS      | -      | Node   
 |     +      |    +    | count   | Tuple-chunks received                       
        |
+| **InterconnectStat**         |        |         |          |        |        
 |            |         |         |                                             
        |
+| `total_recv_queue_size`      | uint64 | D       | DIFF     | -      | Node   
 |     +      |    +    | bytes   | Receive queue size sum                      
        |
+| `recv_queue_size_counting_t` | uint64 | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | Counting times when computing 
total_recv_queue_size |
+| `total_capacity`             | uint64 | D       | DIFF     | -      | Node   
 |     +      |    +    | bytes   | the capacity sum for sent packets           
        |
+| `capacity_counting_time`     | uint64 | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | counting times used to compute 
total_capacity       |
+| `total_buffers`              | uint64 | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | Available buffers                           
        |
+| `buffer_counting_time`       | uint64 | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | counting times when compute total_buffers   
        |
+| `active_connections_num`     | uint64 | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | Active connections                          
        |
+| `retransmits`                | int64  | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | Packet retransmits                          
        |
+| `startup_cached_pkt_num`     | int64  | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | Startup cached packets                      
        |
+| `mismatch_num`               | int64  | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | Mismatched packets received                 
        |
+| `crc_errors`                 | int64  | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | CRC errors                                  
        |
+| `snd_pkt_num`                | int64  | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | Packets sent                                
        |
+| `recv_pkt_num`               | int64  | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | Packets received                            
        |
+| `disordered_pkt_num`         | int64  | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | Out-of-order packets                        
        |
+| `duplicated_pkt_num`         | int64  | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | Duplicate packets                           
        |
+| `recv_ack_num`               | int64  | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | ACKs received                               
        |
+| `status_query_msg_num`       | int64  | D       | DIFF     | -      | Node   
 |     +      |    +    | count   | Status query messages sent                  
        |
+| **SpillInfo**                |        |         |          |        |        
 |            |         |         |                                             
        |
+| `fileCount`                  | int32  | E, D    | DIFF     | -      | Node   
 |     +      |    +    | count   | Spill (temp) files created                  
        |
+| `totalBytes`                 | int64  | E, D    | DIFF     | -      | Node   
 |     +      |    +    | bytes   | Spill bytes written                         
        |
+| **QueryInfo**                |        |         |          |        |        
 |            |         |         |                                             
        |
+| `generator`                  | enum   | T, E, D | ABS      | -      | 
Cluster |     +      |    -    | enum    | Planner/Optimizer                    
               |
+| `query_id`                   | uint64 | T, E, D | ABS      | -      | 
Cluster |     +      |    -    | id      | Query ID                             
               |
+| `plan_id`                    | uint64 | T, E, D | ABS      | -      | 
Cluster |     +      |    -    | id      | Hash of normalized plan              
               |
+| `query_text`                 | string | S       | ABS      | -      | 
Cluster |     +      |    -    | text    | Query text                           
               |
+| `plan_text`                  | string | T       | ABS      | -      | 
Cluster |     +      |    -    | text    | EXPLAIN text                         
               |
+| `template_query_text`        | string | S       | ABS      | -      | 
Cluster |     +      |    -    | text    | Normalized query text                
               |
+| `template_plan_text`         | string | T       | ABS      | -      | 
Cluster |     +      |    -    | text    | Normalized plan text                 
               |
+| `userName`                   | string | All     | ABS      | -      | 
Cluster |     +      |    -    | text    | Session user                         
               |
+| `databaseName`               | string | All     | ABS      | -      | 
Cluster |     +      |    -    | text    | Database name                        
               |
+| `rsgname`                    | string | All     | ABS      | -      | 
Cluster |     +      |    -    | text    | Resource group name                  
               |
+| `analyze_text`               | string | D       | ABS      | -      | 
Cluster |     +      |    -    | text    | EXPLAIN ANALYZE JSON                 
               |
+| **AdditionalQueryInfo**      |        |         |          |        |        
 |            |         |         |                                             
        |
+| `nested_level`               | int64  | All     | ABS      | -      | Node   
 |     +      |    +    | count   | Current nesting level                       
        |
+| `error_message`              | string | D       | ABS      | -      | Node   
 |     +      |    +    | text    | Error message                               
        |
+| `slice_id`                   | int64  | All     | ABS      | -      | Node   
 |     +      |    +    | id      | Slice ID                                    
        |
+| **QueryKey**                 |        |         |          |        |        
 |            |         |         |                                             
        |
+| `tmid`                       | int32  | All     | ABS      | -      | Node   
 |     +      |    +    | id      | Time ID                                     
        |
+| `ssid`                       | int32  | All     | ABS      | -      | Node   
 |     +      |    +    | id      | Session ID                                  
        |
+| `ccnt`                       | int32  | All     | ABS      | -      | Node   
 |     +      |    +    | count   | Command counter                             
        |
+| **SegmentKey**               |        |         |          |        |        
 |            |         |         |                                             
        |
+| `dbid`                       | int32  | All     | ABS      | -      | Node   
 |     +      |    +    | id      | Database ID                                 
        |
+| `segment_index`              | int32  | All     | ABS      | -      | Node   
 |     +      |    +    | id      | Segment index (-1=coordinator)              
        |
+
+---
+
diff --git a/src/Config.cpp b/src/Config.cpp
index a1289a48891..aef09fc7d73 100644
--- a/src/Config.cpp
+++ b/src/Config.cpp
@@ -29,15 +29,15 @@ static void update_ignored_users(const char 
*new_guc_ignored_users) {
       std::make_unique<std::unordered_set<std::string>>();
   if (new_guc_ignored_users != nullptr && new_guc_ignored_users[0] != '\0') {
     /* Need a modifiable copy of string */
-    char *rawstring = gpdb::pstrdup(new_guc_ignored_users);
+    char *rawstring = ya_gpdb::pstrdup(new_guc_ignored_users);
     List *elemlist;
     ListCell *l;
 
     /* Parse string into list of identifiers */
-    if (!gpdb::split_identifier_string(rawstring, ',', &elemlist)) {
+    if (!ya_gpdb::split_identifier_string(rawstring, ',', &elemlist)) {
       /* syntax error in list */
-      gpdb::pfree(rawstring);
-      gpdb::list_free(elemlist);
+      ya_gpdb::pfree(rawstring);
+      ya_gpdb::list_free(elemlist);
       ereport(
           LOG,
           (errcode(ERRCODE_SYNTAX_ERROR),
@@ -48,8 +48,8 @@ static void update_ignored_users(const char 
*new_guc_ignored_users) {
     foreach (l, elemlist) {
       new_ignored_users_set->insert((char *)lfirst(l));
     }
-    gpdb::pfree(rawstring);
-    gpdb::list_free(elemlist);
+    ya_gpdb::pfree(rawstring);
+    ya_gpdb::list_free(elemlist);
   }
   ignored_users_set = std::move(new_ignored_users_set);
 }
diff --git a/src/EventSender.cpp b/src/EventSender.cpp
index 8711c4cbd4f..133d409b574 100644
--- a/src/EventSender.cpp
+++ b/src/EventSender.cpp
@@ -8,6 +8,7 @@ extern "C" {
 
 #include "executor/executor.h"
 #include "utils/elog.h"
+#include "utils/guc.h"
 
 #include "cdb/cdbexplain.h"
 #include "cdb/cdbvars.h"
@@ -27,6 +28,7 @@ void EventSender::query_metrics_collect(QueryMetricsStatus 
status, void *arg) {
   if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) {
     return;
   }
+  auto *query_desc = reinterpret_cast<QueryDesc *>(arg);
   switch (status) {
   case METRICS_PLAN_NODE_INITIALIZE:
   case METRICS_PLAN_NODE_EXECUTING:
@@ -34,8 +36,7 @@ void EventSender::query_metrics_collect(QueryMetricsStatus 
status, void *arg) {
     // TODO
     break;
   case METRICS_QUERY_SUBMIT:
-    // don't collect anything here. We will fake this call in ExecutorStart as
-    // it really makes no difference. Just complicates things
+    collect_query_submit(query_desc);
     break;
   case METRICS_QUERY_START:
     // no-op: executor_after_start is enough
@@ -49,7 +50,7 @@ void EventSender::query_metrics_collect(QueryMetricsStatus 
status, void *arg) {
   case METRICS_QUERY_ERROR:
   case METRICS_QUERY_CANCELED:
   case METRICS_INNER_QUERY_DONE:
-    collect_query_done(reinterpret_cast<QueryDesc *>(arg), status);
+    collect_query_done(query_desc, status);
     break;
   default:
     ereport(FATAL, (errmsg("Unknown query status: %d", status)));
@@ -60,15 +61,15 @@ void EventSender::executor_before_start(QueryDesc 
*query_desc, int eflags) {
   if (!connector) {
     return;
   }
-  if (is_top_level_query(query_desc, nesting_level)) {
-    nested_timing = 0;
-    nested_calls = 0;
+  if (filter_query(query_desc)) {
+    return;
+  }
+  if (!qdesc_submitted(query_desc)) {
+    collect_query_submit(query_desc);
   }
-  Config::sync();
   if (!need_collect(query_desc, nesting_level)) {
     return;
   }
-  collect_query_submit(query_desc);
   if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze() &&
       (eflags & EXEC_FLAG_EXPLAIN_ONLY) == 0) {
     query_desc->instrument_options |= INSTRUMENT_BUFFERS;
@@ -80,167 +81,194 @@ void EventSender::executor_before_start(QueryDesc 
*query_desc, int eflags) {
         instr_time starttime;
         INSTR_TIME_SET_CURRENT(starttime);
         query_desc->showstatctx =
-            gpdb::cdbexplain_showExecStatsBegin(query_desc, starttime);
+            ya_gpdb::cdbexplain_showExecStatsBegin(query_desc, starttime);
       }
     }
   }
 }
 
 void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) 
{
-  if (!connector) {
+  if (!connector || !need_collect(query_desc, nesting_level)) {
     return;
   }
-  if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) {
-    if (!filter_query(query_desc)) {
-      auto *query = get_query_message(query_desc);
-      auto query_msg = query->message;
-      *query_msg->mutable_start_time() = current_ts();
-      if (!nesting_is_valid(query_desc, nesting_level)) {
-        return;
-      }
-      update_query_state(query_desc, query, QueryState::START);
-      set_query_plan(query_msg, query_desc);
-      if (need_collect_analyze()) {
-        // Set up to track total elapsed time during query run.
-        // Make sure the space is allocated in the per-query
-        // context so it will go away at executor_end.
-        if (query_desc->totaltime == NULL) {
-          MemoryContext oldcxt =
-              gpdb::mem_ctx_switch_to(query_desc->estate->es_query_cxt);
-          query_desc->totaltime = gpdb::instr_alloc(1, INSTRUMENT_ALL);
-          gpdb::mem_ctx_switch_to(oldcxt);
-        }
-      }
-      yagpcc::GPMetrics stats;
-      std::swap(stats, *query_msg->mutable_query_metrics());
-      if (connector->report_query(*query_msg, "started")) {
-        clear_big_fields(query_msg);
-      }
-      std::swap(stats, *query_msg->mutable_query_metrics());
+  if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) {
+    return;
+  }
+  auto &query = get_query(query_desc);
+  auto query_msg = query.message.get();
+  *query_msg->mutable_start_time() = current_ts();
+  update_query_state(query, QueryState::START);
+  set_query_plan(query_msg, query_desc);
+  if (need_collect_analyze()) {
+    // Set up to track total elapsed time during query run.
+    // Make sure the space is allocated in the per-query
+    // context so it will go away at executor_end.
+    if (query_desc->totaltime == NULL) {
+      MemoryContext oldcxt =
+          ya_gpdb::mem_ctx_switch_to(query_desc->estate->es_query_cxt);
+      query_desc->totaltime = ya_gpdb::instr_alloc(1, INSTRUMENT_ALL);
+      ya_gpdb::mem_ctx_switch_to(oldcxt);
     }
   }
+  yagpcc::GPMetrics stats;
+  std::swap(stats, *query_msg->mutable_query_metrics());
+  if (connector->report_query(*query_msg, "started")) {
+    clear_big_fields(query_msg);
+  }
+  std::swap(stats, *query_msg->mutable_query_metrics());
 }
 
 void EventSender::executor_end(QueryDesc *query_desc) {
-  if (!connector ||
-      (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE)) {
+  if (!connector || !need_collect(query_desc, nesting_level)) {
     return;
   }
-  if (!filter_query(query_desc)) {
-    auto *query = get_query_message(query_desc);
-    auto query_msg = query->message;
-    *query_msg->mutable_end_time() = current_ts();
-    if (nesting_is_valid(query_desc, nesting_level)) {
-      if (query->state == UNKNOWN &&
-          // Yet another greenplum weirdness: thats actually a nested query
-          // which is being committed/rollbacked. Treat it accordingly.
-          !need_report_nested_query()) {
-        return;
-      }
-      update_query_state(query_desc, query, QueryState::END);
-      if (is_top_level_query(query_desc, nesting_level)) {
-        set_gp_metrics(query_msg->mutable_query_metrics(), query_desc,
-                       nested_calls, nested_timing);
-      } else {
-        set_gp_metrics(query_msg->mutable_query_metrics(), query_desc, 0, 0);
-      }
-      if (connector->report_query(*query_msg, "ended")) {
-        clear_big_fields(query_msg);
-      }
-    }
+  if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) {
+    return;
+  }
+  auto &query = get_query(query_desc);
+  auto *query_msg = query.message.get();
+  *query_msg->mutable_end_time() = current_ts();
+  update_query_state(query, QueryState::END);
+  if (is_top_level_query(query_desc, nesting_level)) {
+    set_gp_metrics(query_msg->mutable_query_metrics(), query_desc, 
nested_calls,
+                   nested_timing);
+  } else {
+    set_gp_metrics(query_msg->mutable_query_metrics(), query_desc, 0, 0);
+  }
+  if (connector->report_query(*query_msg, "ended")) {
+    clear_big_fields(query_msg);
   }
 }
 
 void EventSender::collect_query_submit(QueryDesc *query_desc) {
-  if (connector && need_collect(query_desc, nesting_level)) {
-    auto *query = get_query_message(query_desc);
-    query->state = QueryState::SUBMIT;
-    auto query_msg = query->message;
-    *query_msg = create_query_req(yagpcc::QueryStatus::QUERY_STATUS_SUBMIT);
-    *query_msg->mutable_submit_time() = current_ts();
-    set_query_info(query_msg);
-    set_qi_nesting_level(query_msg, query_desc->gpmon_pkt->u.qexec.key.tmid);
-    set_qi_slice_id(query_msg);
-    set_query_text(query_msg, query_desc);
-    if (connector->report_query(*query_msg, "submit")) {
-      clear_big_fields(query_msg);
-    }
-    // take initial metrics snapshot so that we can safely take diff afterwards
-    // in END or DONE events.
-    set_gp_metrics(query_msg->mutable_query_metrics(), query_desc, 0, 0);
+  if (!connector) {
+    return;
+  }
+  Config::sync();
+  // Register qkey for a nested query we won't report,
+  // so we can detect nesting_level > 0 and skip reporting at end/done.
+  if (!need_report_nested_query() && nesting_level > 0) {
+    QueryKey::register_qkey(query_desc, nesting_level);
+    return;
+  }
+  if (is_top_level_query(query_desc, nesting_level)) {
+    nested_timing = 0;
+    nested_calls = 0;
+  }
+  if (!need_collect(query_desc, nesting_level)) {
+    return;
+  }
+  submit_query(query_desc);
+  auto &query = get_query(query_desc);
+  auto *query_msg = query.message.get();
+  *query_msg = create_query_req(yagpcc::QueryStatus::QUERY_STATUS_SUBMIT);
+  *query_msg->mutable_submit_time() = current_ts();
+  set_query_info(query_msg);
+  set_qi_nesting_level(query_msg, nesting_level);
+  set_qi_slice_id(query_msg);
+  set_query_text(query_msg, query_desc);
+  if (connector->report_query(*query_msg, "submit")) {
+    clear_big_fields(query_msg);
+  }
+  // take initial metrics snapshot so that we can safely take diff afterwards
+  // in END or DONE events.
+  set_gp_metrics(query_msg->mutable_query_metrics(), query_desc, 0, 0);
 #ifdef IC_TEARDOWN_HOOK
-    // same for interconnect statistics
-    ic_metrics_collect();
-    set_ic_stats(query_msg->mutable_query_metrics()->mutable_instrumentation(),
-                 &ic_statistics);
+  // same for interconnect statistics
+  ic_metrics_collect();
+  set_ic_stats(query_msg->mutable_query_metrics()->mutable_instrumentation(),
+               &ic_statistics);
 #endif
+}
+
+void EventSender::report_query_done(QueryDesc *query_desc, QueryItem &query,
+                                    QueryMetricsStatus status) {
+  yagpcc::QueryStatus query_status;
+  std::string msg;
+  switch (status) {
+  case METRICS_QUERY_DONE:
+  case METRICS_INNER_QUERY_DONE:
+    query_status = yagpcc::QueryStatus::QUERY_STATUS_DONE;
+    msg = "done";
+    break;
+  case METRICS_QUERY_ERROR:
+    query_status = yagpcc::QueryStatus::QUERY_STATUS_ERROR;
+    msg = "error";
+    break;
+  case METRICS_QUERY_CANCELING:
+    // at the moment we don't track this event, but I`ll leave this code
+    // here just in case
+    Assert(false);
+    query_status = yagpcc::QueryStatus::QUERY_STATUS_CANCELLING;
+    msg = "cancelling";
+    break;
+  case METRICS_QUERY_CANCELED:
+    query_status = yagpcc::QueryStatus::QUERY_STATUS_CANCELED;
+    msg = "cancelled";
+    break;
+  default:
+    ereport(FATAL,
+            (errmsg("Unexpected query status in query_done hook: %d", 
status)));
   }
+  auto prev_state = query.state;
+  update_query_state(query, QueryState::DONE,
+                     query_status == yagpcc::QueryStatus::QUERY_STATUS_DONE);
+  auto query_msg = query.message.get();
+  query_msg->set_query_status(query_status);
+  if (status == METRICS_QUERY_ERROR) {
+    set_qi_error_message(query_msg);
+  }
+  if (prev_state == START) {
+    // We've missed ExecutorEnd call due to query cancel or error. It's
+    // fine, but now we need to collect and report execution stats
+    *query_msg->mutable_end_time() = current_ts();
+    set_gp_metrics(query_msg->mutable_query_metrics(), query_desc, 
nested_calls,
+                   nested_timing);
+  }
+#ifdef IC_TEARDOWN_HOOK
+  ic_metrics_collect();
+  set_ic_stats(query_msg->mutable_query_metrics()->mutable_instrumentation(),
+               &ic_statistics);
+#endif
+  connector->report_query(*query_msg, msg);
 }
 
 void EventSender::collect_query_done(QueryDesc *query_desc,
                                      QueryMetricsStatus status) {
-  if (connector && !filter_query(query_desc)) {
-    auto *query = get_query_message(query_desc);
-    if (query->state != UNKNOWN || need_report_nested_query()) {
-      if (nesting_is_valid(query_desc, nesting_level)) {
-        yagpcc::QueryStatus query_status;
-        std::string msg;
-        switch (status) {
-        case METRICS_QUERY_DONE:
-        case METRICS_INNER_QUERY_DONE:
-          query_status = yagpcc::QueryStatus::QUERY_STATUS_DONE;
-          msg = "done";
-          break;
-        case METRICS_QUERY_ERROR:
-          query_status = yagpcc::QueryStatus::QUERY_STATUS_ERROR;
-          msg = "error";
-          break;
-        case METRICS_QUERY_CANCELING:
-          // at the moment we don't track this event, but I`ll leave this code
-          // here just in case
-          Assert(false);
-          query_status = yagpcc::QueryStatus::QUERY_STATUS_CANCELLING;
-          msg = "cancelling";
-          break;
-        case METRICS_QUERY_CANCELED:
-          query_status = yagpcc::QueryStatus::QUERY_STATUS_CANCELED;
-          msg = "cancelled";
-          break;
-        default:
-          ereport(FATAL,
-                  (errmsg("Unexpected query status in query_done hook: %d",
-                          status)));
-        }
-        auto prev_state = query->state;
-        update_query_state(query_desc, query, QueryState::DONE,
-                           query_status ==
-                               yagpcc::QueryStatus::QUERY_STATUS_DONE);
-        auto query_msg = query->message;
-        query_msg->set_query_status(query_status);
-        if (status == METRICS_QUERY_ERROR) {
-          set_qi_error_message(query_msg);
-        }
-        if (prev_state == START) {
-          // We've missed ExecutorEnd call due to query cancel or error. It's
-          // fine, but now we need to collect and report execution stats
-          *query_msg->mutable_end_time() = current_ts();
-          set_gp_metrics(query_msg->mutable_query_metrics(), query_desc,
-                         nested_calls, nested_timing);
-        }
-#ifdef IC_TEARDOWN_HOOK
-        ic_metrics_collect();
-        set_ic_stats(
-            query_msg->mutable_query_metrics()->mutable_instrumentation(),
-            &ic_statistics);
-#endif
-        connector->report_query(*query_msg, msg);
-      }
-      update_nested_counters(query_desc);
+  if (!connector || !need_collect(query_desc, nesting_level)) {
+    return;
+  }
+
+  // Skip sending done message if query errored before submit.
+  if (!qdesc_submitted(query_desc)) {
+    if (status != METRICS_QUERY_ERROR) {
+      ereport(WARNING, (errmsg("YAGPCC trying to process DONE hook for "
+                               "unsubmitted and unerrored query")));
+      ereport(DEBUG3,
+              (errmsg("YAGPCC query sourceText: %s", query_desc->sourceText)));
     }
-    query_msgs.erase({query_desc->gpmon_pkt->u.qexec.key.ccnt,
-                      query_desc->gpmon_pkt->u.qexec.key.tmid});
-    gpdb::pfree(query_desc->gpmon_pkt);
+    return;
+  }
+
+  if (queries.empty()) {
+    ereport(WARNING, (errmsg("YAGPCC cannot find query to process DONE 
hook")));
+    ereport(DEBUG3,
+            (errmsg("YAGPCC query sourceText: %s", query_desc->sourceText)));
+    return;
   }
+  auto &query = get_query(query_desc);
+
+  bool report = need_report_nested_query() ||
+                is_top_level_query(query_desc, nesting_level);
+  if (report)
+    report_query_done(query_desc, query, status);
+
+  if (need_report_nested_query())
+    update_nested_counters(query_desc);
+
+  queries.erase(QueryKey::from_qdesc(query_desc));
+  pfree(query_desc->yagp_query_key);
+  query_desc->yagp_query_key = NULL;
 }
 
 void EventSender::ic_metrics_collect() {
@@ -283,20 +311,15 @@ void EventSender::analyze_stats_collect(QueryDesc 
*query_desc) {
   if (!need_collect(query_desc, nesting_level)) {
     return;
   }
-  auto query = get_query_message(query_desc);
-  auto query_msg = query->message;
+  auto &query = get_query(query_desc);
+  auto *query_msg = query.message.get();
   *query_msg->mutable_end_time() = current_ts();
-  // Yet another greenplum weirdness: thats actually a nested query
-  // which is being committed/rollbacked. Treat it accordingly.
-  if (query->state == UNKNOWN && !need_report_nested_query()) {
-    return;
-  }
   if (!query_desc->totaltime || !need_collect_analyze()) {
     return;
   }
   // Make sure stats accumulation is done.
   // (Note: it's okay if several levels of hook all do this.)
-  gpdb::instr_end_loop(query_desc->totaltime);
+  ya_gpdb::instr_end_loop(query_desc->totaltime);
 
   double ms = query_desc->totaltime->total * 1000.0;
   if (ms >= Config::min_analyze_time()) {
@@ -318,26 +341,26 @@ EventSender::EventSender() {
 }
 
 EventSender::~EventSender() {
-  delete connector;
-  for (auto iter = query_msgs.begin(); iter != query_msgs.end(); ++iter) {
-    delete iter->second.message;
+  for (const auto &[qkey, _] : queries) {
+    ereport(LOG,
+            (errmsg("YAGPCC query with missing done event: "
+                    "tmid=%d ssid=%d ccnt=%d nlvl=%d",
+                    qkey.tmid, qkey.ssid, qkey.ccnt, qkey.nesting_level)));
   }
+  delete connector;
 }
 
 // That's basically a very simplistic state machine to fix or highlight any 
bugs
 // coming from GP
-void EventSender::update_query_state(QueryDesc *query_desc, QueryItem *query,
-                                     QueryState new_state, bool success) {
-  if (query->state == UNKNOWN) {
-    collect_query_submit(query_desc);
-  }
+void EventSender::update_query_state(QueryItem &query, QueryState new_state,
+                                     bool success) {
   switch (new_state) {
   case QueryState::SUBMIT:
     Assert(false);
     break;
   case QueryState::START:
-    if (query->state == QueryState::SUBMIT) {
-      
query->message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START);
+    if (query.state == QueryState::SUBMIT) {
+      query.message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START);
     } else {
       Assert(false);
     }
@@ -346,40 +369,52 @@ void EventSender::update_query_state(QueryDesc 
*query_desc, QueryItem *query,
     // Example of below assert triggering: CURSOR closes before ever being
     // executed Assert(query->state == QueryState::START ||
     // IsAbortInProgress());
-    query->message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_END);
+    query.message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_END);
     break;
   case QueryState::DONE:
-    Assert(query->state == QueryState::END || !success);
-    query->message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE);
+    Assert(query.state == QueryState::END || !success);
+    query.message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE);
     break;
   default:
     Assert(false);
   }
-  query->state = new_state;
+  query.state = new_state;
 }
 
-EventSender::QueryItem *EventSender::get_query_message(QueryDesc *query_desc) {
-  if (query_desc->gpmon_pkt == nullptr ||
-      query_msgs.find({query_desc->gpmon_pkt->u.qexec.key.ccnt,
-                       query_desc->gpmon_pkt->u.qexec.key.tmid}) ==
-          query_msgs.end()) {
-    query_desc->gpmon_pkt =
-        (gpmon_packet_t *)gpdb::palloc0(sizeof(gpmon_packet_t));
-    query_desc->gpmon_pkt->u.qexec.key.ccnt = gp_command_count;
-    query_desc->gpmon_pkt->u.qexec.key.tmid = nesting_level;
-    query_msgs.insert({{gp_command_count, nesting_level},
-                       QueryItem(UNKNOWN, new yagpcc::SetQueryReq())});
-  }
-  return &query_msgs.at({query_desc->gpmon_pkt->u.qexec.key.ccnt,
-                         query_desc->gpmon_pkt->u.qexec.key.tmid});
+EventSender::QueryItem &EventSender::get_query(QueryDesc *query_desc) {
+  if (!qdesc_submitted(query_desc)) {
+    ereport(WARNING,
+            (errmsg("YAGPCC attempting to get query that was not submitted")));
+    ereport(DEBUG3,
+            (errmsg("YAGPCC query sourceText: %s", query_desc->sourceText)));
+    throw std::runtime_error("Attempting to get query that was not submitted");
+  }
+  return queries.find(QueryKey::from_qdesc(query_desc))->second;
+}
+
+void EventSender::submit_query(QueryDesc *query_desc) {
+  if (query_desc->yagp_query_key) {
+    ereport(WARNING,
+            (errmsg("YAGPCC trying to submit already submitted query")));
+    ereport(DEBUG3,
+            (errmsg("YAGPCC query sourceText: %s", query_desc->sourceText)));
+  }
+  QueryKey::register_qkey(query_desc, nesting_level);
+  auto key = QueryKey::from_qdesc(query_desc);
+  auto [_, inserted] = queries.emplace(key, QueryItem(QueryState::SUBMIT));
+  if (!inserted) {
+    ereport(WARNING, (errmsg("YAGPCC duplicate query submit detected")));
+    ereport(DEBUG3,
+            (errmsg("YAGPCC query sourceText: %s", query_desc->sourceText)));
+  }
 }
 
 void EventSender::update_nested_counters(QueryDesc *query_desc) {
   if (!is_top_level_query(query_desc, nesting_level)) {
-    auto query_msg = get_query_message(query_desc);
+    auto &query = get_query(query_desc);
     nested_calls++;
-    double end_time = protots_to_double(query_msg->message->end_time());
-    double start_time = protots_to_double(query_msg->message->start_time());
+    double end_time = protots_to_double(query.message->end_time());
+    double start_time = protots_to_double(query.message->start_time());
     if (end_time >= start_time) {
       nested_timing += end_time - start_time;
     } else {
@@ -391,6 +426,12 @@ void EventSender::update_nested_counters(QueryDesc 
*query_desc) {
   }
 }
 
-EventSender::QueryItem::QueryItem(EventSender::QueryState st,
-                                  yagpcc::SetQueryReq *msg)
-    : state(st), message(msg) {}
+bool EventSender::qdesc_submitted(QueryDesc *query_desc) {
+  if (query_desc->yagp_query_key == NULL) {
+    return false;
+  }
+  return queries.find(QueryKey::from_qdesc(query_desc)) != queries.end();
+}
+
+EventSender::QueryItem::QueryItem(QueryState st)
+    : message(std::make_unique<yagpcc::SetQueryReq>()), state(st) {}
diff --git a/src/EventSender.h b/src/EventSender.h
index f3dd1d2a528..4071d580ff9 100644
--- a/src/EventSender.h
+++ b/src/EventSender.h
@@ -1,6 +1,8 @@
 #pragma once
 
+#include <memory>
 #include <unordered_map>
+#include <tuple>
 
 #define typeid __typeid
 extern "C" {
@@ -11,12 +13,75 @@ extern "C" {
 }
 #undef typeid
 
+#include "memory/gpdbwrappers.h"
+
 class UDSConnector;
 struct QueryDesc;
 namespace yagpcc {
 class SetQueryReq;
 }
 
+#include <cstdint>
+
+struct QueryKey {
+  int tmid;
+  int ssid;
+  int ccnt;
+  int nesting_level;
+  uintptr_t query_desc_addr;
+
+  bool operator==(const QueryKey &other) const {
+    return std::tie(tmid, ssid, ccnt, nesting_level, query_desc_addr) ==
+           std::tie(other.tmid, other.ssid, other.ccnt, other.nesting_level,
+                    other.query_desc_addr);
+  }
+
+  static void register_qkey(QueryDesc *query_desc, size_t nesting_level) {
+    query_desc->yagp_query_key =
+        (YagpQueryKey *)ya_gpdb::palloc0(sizeof(YagpQueryKey));
+    int32 tmid;
+    gpmon_gettmid(&tmid);
+    query_desc->yagp_query_key->tmid = tmid;
+    query_desc->yagp_query_key->ssid = gp_session_id;
+    query_desc->yagp_query_key->ccnt = gp_command_count;
+    query_desc->yagp_query_key->nesting_level = nesting_level;
+    query_desc->yagp_query_key->query_desc_addr = (uintptr_t)query_desc;
+  }
+
+  static QueryKey from_qdesc(QueryDesc *query_desc) {
+    return {
+        .tmid = query_desc->yagp_query_key->tmid,
+        .ssid = query_desc->yagp_query_key->ssid,
+        .ccnt = query_desc->yagp_query_key->ccnt,
+        .nesting_level = query_desc->yagp_query_key->nesting_level,
+        .query_desc_addr = query_desc->yagp_query_key->query_desc_addr,
+    };
+  }
+};
+
+// 
https://www.boost.org/doc/libs/1_35_0/doc/html/boost/hash_combine_id241013.html
+template <class T> inline void hash_combine(std::size_t &seed, const T &v) {
+  std::hash<T> hasher;
+  seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+}
+
+namespace std {
+template <> struct hash<QueryKey> {
+  size_t operator()(const QueryKey &k) const noexcept {
+    size_t seed = hash<uint32_t>{}(k.tmid);
+    hash_combine(seed, k.ssid);
+    hash_combine(seed, k.ccnt);
+    hash_combine(seed, k.nesting_level);
+    uintptr_t addr = k.query_desc_addr;
+    if constexpr (SIZE_MAX < UINTPTR_MAX) {
+      addr %= SIZE_MAX;
+    }
+    hash_combine(seed, addr);
+    return seed;
+  }
+};
+} // namespace std
+
 class EventSender {
 public:
   void executor_before_start(QueryDesc *query_desc, int eflags);
@@ -31,30 +96,25 @@ public:
   ~EventSender();
 
 private:
-  enum QueryState { UNKNOWN, SUBMIT, START, END, DONE };
+  enum QueryState { SUBMIT, START, END, DONE };
 
   struct QueryItem {
-    QueryState state = QueryState::UNKNOWN;
-    yagpcc::SetQueryReq *message = nullptr;
+    std::unique_ptr<yagpcc::SetQueryReq> message;
+    QueryState state;
 
-    QueryItem(QueryState st, yagpcc::SetQueryReq *msg);
-  };
-
-  struct pair_hash {
-    std::size_t operator()(const std::pair<int, int> &p) const {
-      auto h1 = std::hash<int>{}(p.first);
-      auto h2 = std::hash<int>{}(p.second);
-      return h1 ^ h2;
-    }
+    explicit QueryItem(QueryState st);
   };
 
-  void update_query_state(QueryDesc *query_desc, QueryItem *query,
-                          QueryState new_state, bool success = true);
-  QueryItem *get_query_message(QueryDesc *query_desc);
+  void update_query_state(QueryItem &query, QueryState new_state,
+                          bool success = true);
+  QueryItem &get_query(QueryDesc *query_desc);
+  void submit_query(QueryDesc *query_desc);
   void collect_query_submit(QueryDesc *query_desc);
+  void report_query_done(QueryDesc *query_desc, QueryItem &query,
+                         QueryMetricsStatus status);
   void collect_query_done(QueryDesc *query_desc, QueryMetricsStatus status);
-  void cleanup_messages();
   void update_nested_counters(QueryDesc *query_desc);
+  bool qdesc_submitted(QueryDesc *query_desc);
 
   UDSConnector *connector = nullptr;
   int nesting_level = 0;
@@ -63,5 +123,5 @@ private:
 #ifdef IC_TEARDOWN_HOOK
   ICStatistics ic_statistics;
 #endif
-  std::unordered_map<std::pair<int, int>, QueryItem, pair_hash> query_msgs;
+  std::unordered_map<QueryKey, QueryItem> queries;
 };
\ No newline at end of file
diff --git a/src/PgUtils.cpp b/src/PgUtils.cpp
index f36cd030a39..929f0cf2681 100644
--- a/src/PgUtils.cpp
+++ b/src/PgUtils.cpp
@@ -60,14 +60,14 @@ std::string get_rg_name() {
  */
 
 bool is_top_level_query(QueryDesc *query_desc, int nesting_level) {
-  return (query_desc->gpmon_pkt &&
-          query_desc->gpmon_pkt->u.qexec.key.tmid == 0) ||
-         nesting_level == 0;
+  if (query_desc->yagp_query_key == NULL) {
+    return nesting_level == 0;
+  }
+  return query_desc->yagp_query_key->nesting_level == 0;
 }
 
 bool nesting_is_valid(QueryDesc *query_desc, int nesting_level) {
-  return (Gp_session_role == GP_ROLE_DISPATCH &&
-          Config::report_nested_queries()) ||
+  return need_report_nested_query() ||
          is_top_level_query(query_desc, nesting_level);
 }
 
diff --git a/src/ProtoUtils.cpp b/src/ProtoUtils.cpp
index 6dc39278bcd..4655433c806 100644
--- a/src/ProtoUtils.cpp
+++ b/src/ProtoUtils.cpp
@@ -58,21 +58,21 @@ void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc 
*query_desc) {
                           ? yagpcc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER
                           : yagpcc::PlanGenerator::PLAN_GENERATOR_PLANNER);
     MemoryContext oldcxt =
-        gpdb::mem_ctx_switch_to(query_desc->estate->es_query_cxt);
-    ExplainState es = gpdb::get_explain_state(query_desc, true);
+        ya_gpdb::mem_ctx_switch_to(query_desc->estate->es_query_cxt);
+    ExplainState es = ya_gpdb::get_explain_state(query_desc, true);
     if (es.str) {
       *qi->mutable_plan_text() = char_to_trimmed_str(es.str->data, es.str->len,
                                                      Config::max_plan_size());
-      StringInfo norm_plan = gpdb::gen_normplan(es.str->data);
+      StringInfo norm_plan = ya_gpdb::gen_normplan(es.str->data);
       *qi->mutable_template_plan_text() = char_to_trimmed_str(
           norm_plan->data, norm_plan->len, Config::max_plan_size());
       qi->set_plan_id(
           hash_any((unsigned char *)norm_plan->data, norm_plan->len));
       qi->set_query_id(query_desc->plannedstmt->queryId);
-      gpdb::pfree(es.str->data);
-      gpdb::pfree(norm_plan->data);
+      ya_gpdb::pfree(es.str->data);
+      ya_gpdb::pfree(norm_plan->data);
     }
-    gpdb::mem_ctx_switch_to(oldcxt);
+    ya_gpdb::mem_ctx_switch_to(oldcxt);
   }
 }
 
@@ -82,7 +82,7 @@ void set_query_text(yagpcc::SetQueryReq *req, QueryDesc 
*query_desc) {
     *qi->mutable_query_text() = char_to_trimmed_str(
         query_desc->sourceText, strlen(query_desc->sourceText),
         Config::max_text_size());
-    char *norm_query = gpdb::gen_normquery(query_desc->sourceText);
+    char *norm_query = ya_gpdb::gen_normquery(query_desc->sourceText);
     *qi->mutable_template_query_text() = char_to_trimmed_str(
         norm_query, strlen(norm_query), Config::max_text_size());
   }
@@ -234,10 +234,10 @@ void set_analyze_plan_text_json(QueryDesc *query_desc,
     return;
   }
   MemoryContext oldcxt =
-      gpdb::mem_ctx_switch_to(query_desc->estate->es_query_cxt);
-  ExplainState es = gpdb::get_analyze_state_json(
+      ya_gpdb::mem_ctx_switch_to(query_desc->estate->es_query_cxt);
+  ExplainState es = ya_gpdb::get_analyze_state_json(
       query_desc, query_desc->instrument_options && Config::enable_analyze());
-  gpdb::mem_ctx_switch_to(oldcxt);
+  ya_gpdb::mem_ctx_switch_to(oldcxt);
   if (es.str) {
     // Remove last line break.
     if (es.str->len > 0 && es.str->data[es.str->len - 1] == '\n') {
@@ -251,6 +251,6 @@ void set_analyze_plan_text_json(QueryDesc *query_desc,
     auto trimmed_analyze =
         char_to_trimmed_str(es.str->data, es.str->len, 
Config::max_plan_size());
     req->mutable_query_info()->set_analyze_text(trimmed_analyze);
-    gpdb::pfree(es.str->data);
+    ya_gpdb::pfree(es.str->data);
   }
 }
\ No newline at end of file
diff --git a/src/UDSConnector.cpp b/src/UDSConnector.cpp
index b5b70836db4..f8c4586126d 100644
--- a/src/UDSConnector.cpp
+++ b/src/UDSConnector.cpp
@@ -44,7 +44,7 @@ bool UDSConnector::report_query(const yagpcc::SetQueryReq 
&req,
       if (connect(sockfd, (sockaddr *)&address, sizeof(address)) != -1) {
         auto data_size = req.ByteSize();
         auto total_size = data_size + sizeof(uint32_t);
-        uint8_t *buf = (uint8_t *)gpdb::palloc(total_size);
+        uint8_t *buf = (uint8_t *)ya_gpdb::palloc(total_size);
         uint32_t *size_payload = (uint32_t *)buf;
         *size_payload = data_size;
         req.SerializeWithCachedSizesToArray(buf + sizeof(uint32_t));
@@ -67,7 +67,7 @@ bool UDSConnector::report_query(const yagpcc::SetQueryReq 
&req,
         } else {
           YagpStat::report_send(total_size);
         }
-        gpdb::pfree(buf);
+        ya_gpdb::pfree(buf);
       } else {
         // log the error and go on
         log_tracing_failure(req, event);
diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp
index 25a85f086d1..d76b7c64e10 100644
--- a/src/hook_wrappers.cpp
+++ b/src/hook_wrappers.cpp
@@ -229,7 +229,7 @@ Datum yagp_functions_get(FunctionCallInfo fcinfo) {
   values[3] = Int64GetDatum(stats.failed_connects);
   values[4] = Int64GetDatum(stats.failed_other);
   values[5] = Int32GetDatum(stats.max_message_size);
-  HeapTuple tuple = gpdb::heap_form_tuple(tupdesc, values, nulls);
+  HeapTuple tuple = ya_gpdb::heap_form_tuple(tupdesc, values, nulls);
   Datum result = HeapTupleGetDatum(tuple);
   PG_RETURN_DATUM(result);
 }
\ No newline at end of file
diff --git a/src/memory/gpdbwrappers.cpp b/src/memory/gpdbwrappers.cpp
index 1fba702a9f5..9d579a91a30 100644
--- a/src/memory/gpdbwrappers.cpp
+++ b/src/memory/gpdbwrappers.cpp
@@ -16,27 +16,104 @@ extern "C" {
 #include "stat_statements_parser/pg_stat_statements_ya_parser.h"
 }
 
-void *gpdb::palloc(Size size) { return detail::wrap_throw(::palloc, size); }
+namespace {
 
-void *gpdb::palloc0(Size size) { return detail::wrap_throw(::palloc0, size); }
+template <bool Throws, typename Func, typename... Args>
+auto wrap(Func &&func, Args &&...args) noexcept(!Throws)
+    -> decltype(func(std::forward<Args>(args)...)) {
 
-char *gpdb::pstrdup(const char *str) {
-  return detail::wrap_throw(::pstrdup, str);
+  using RetType = decltype(func(std::forward<Args>(args)...));
+
+  // Empty struct for void return type.
+  struct VoidResult {};
+  using ResultHolder = std::conditional_t<std::is_void_v<RetType>, VoidResult,
+                                          std::optional<RetType>>;
+
+  bool success;
+  ErrorData *edata;
+  ResultHolder result_holder;
+
+  PG_TRY();
+  {
+    if constexpr (!std::is_void_v<RetType>) {
+      result_holder.emplace(func(std::forward<Args>(args)...));
+    } else {
+      func(std::forward<Args>(args)...);
+    }
+    edata = NULL;
+    success = true;
+  }
+  PG_CATCH();
+  {
+    MemoryContext oldctx = MemoryContextSwitchTo(TopMemoryContext);
+    edata = CopyErrorData();
+    MemoryContextSwitchTo(oldctx);
+    FlushErrorState();
+    success = false;
+  }
+  PG_END_TRY();
+
+  if (!success) {
+    std::string err;
+    if (edata && edata->message) {
+      err = std::string(edata->message);
+    } else {
+      err = "Unknown error occurred";
+    }
+
+    if (edata) {
+      FreeErrorData(edata);
+    }
+
+    if constexpr (Throws) {
+      throw std::runtime_error(err);
+    }
+
+    if constexpr (!std::is_void_v<RetType>) {
+      return RetType{};
+    } else {
+      return;
+    }
+  }
+
+  if constexpr (!std::is_void_v<RetType>) {
+    return *std::move(result_holder);
+  } else {
+    return;
+  }
+}
+
+template <typename Func, typename... Args>
+auto wrap_throw(Func &&func, Args &&...args)
+    -> decltype(func(std::forward<Args>(args)...)) {
+  return wrap<true>(std::forward<Func>(func), std::forward<Args>(args)...);
 }
 
-char *gpdb::get_database_name(Oid dbid) noexcept {
-  return detail::wrap_noexcept(::get_database_name, dbid);
+template <typename Func, typename... Args>
+auto wrap_noexcept(Func &&func, Args &&...args) noexcept
+    -> decltype(func(std::forward<Args>(args)...)) {
+  return wrap<false>(std::forward<Func>(func), std::forward<Args>(args)...);
+}
+} // namespace
+
+void *ya_gpdb::palloc(Size size) { return wrap_throw(::palloc, size); }
+
+void *ya_gpdb::palloc0(Size size) { return wrap_throw(::palloc0, size); }
+
+char *ya_gpdb::pstrdup(const char *str) { return wrap_throw(::pstrdup, str); }
+
+char *ya_gpdb::get_database_name(Oid dbid) noexcept {
+  return wrap_noexcept(::get_database_name, dbid);
 }
 
-bool gpdb::split_identifier_string(char *rawstring, char separator,
-                                   List **namelist) noexcept {
-  return detail::wrap_noexcept(SplitIdentifierString, rawstring, separator,
-                               namelist);
+bool ya_gpdb::split_identifier_string(char *rawstring, char separator,
+                                      List **namelist) noexcept {
+  return wrap_noexcept(SplitIdentifierString, rawstring, separator, namelist);
 }
 
-ExplainState gpdb::get_explain_state(QueryDesc *query_desc,
-                                     bool costs) noexcept {
-  return detail::wrap_noexcept([&]() {
+ExplainState ya_gpdb::get_explain_state(QueryDesc *query_desc,
+                                        bool costs) noexcept {
+  return wrap_noexcept([&]() {
     ExplainState es;
     ExplainInitState(&es);
     es.costs = costs;
@@ -49,9 +126,9 @@ ExplainState gpdb::get_explain_state(QueryDesc *query_desc,
   });
 }
 
-ExplainState gpdb::get_analyze_state_json(QueryDesc *query_desc,
-                                          bool analyze) noexcept {
-  return detail::wrap_noexcept([&]() {
+ExplainState ya_gpdb::get_analyze_state_json(QueryDesc *query_desc,
+                                             bool analyze) noexcept {
+  return wrap_noexcept([&]() {
     ExplainState es;
     ExplainInitState(&es);
     es.analyze = analyze;
@@ -70,79 +147,77 @@ ExplainState gpdb::get_analyze_state_json(QueryDesc 
*query_desc,
   });
 }
 
-Instrumentation *gpdb::instr_alloc(size_t n, int instrument_options) {
-  return detail::wrap_throw(InstrAlloc, n, instrument_options);
+Instrumentation *ya_gpdb::instr_alloc(size_t n, int instrument_options) {
+  return wrap_throw(InstrAlloc, n, instrument_options);
 }
 
-HeapTuple gpdb::heap_form_tuple(TupleDesc tupleDescriptor, Datum *values,
-                                bool *isnull) {
+HeapTuple ya_gpdb::heap_form_tuple(TupleDesc tupleDescriptor, Datum *values,
+                                   bool *isnull) {
   if (!tupleDescriptor || !values || !isnull)
     throw std::runtime_error(
         "Invalid input parameters for heap tuple formation");
 
-  return detail::wrap_throw(::heap_form_tuple, tupleDescriptor, values, 
isnull);
+  return wrap_throw(::heap_form_tuple, tupleDescriptor, values, isnull);
 }
 
-void gpdb::pfree(void *pointer) noexcept {
+void ya_gpdb::pfree(void *pointer) noexcept {
   // Note that ::pfree asserts that pointer != NULL.
   if (!pointer)
     return;
 
-  detail::wrap_noexcept(::pfree, pointer);
+  wrap_noexcept(::pfree, pointer);
 }
 
-MemoryContext gpdb::mem_ctx_switch_to(MemoryContext context) noexcept {
+MemoryContext ya_gpdb::mem_ctx_switch_to(MemoryContext context) noexcept {
   return MemoryContextSwitchTo(context);
 }
 
-const char *gpdb::get_config_option(const char *name, bool missing_ok,
-                                    bool restrict_superuser) noexcept {
+const char *ya_gpdb::get_config_option(const char *name, bool missing_ok,
+                                       bool restrict_superuser) noexcept {
   if (!name)
     return nullptr;
 
-  return detail::wrap_noexcept(GetConfigOption, name, missing_ok,
-                               restrict_superuser);
+  return wrap_noexcept(GetConfigOption, name, missing_ok, restrict_superuser);
 }
 
-void gpdb::list_free(List *list) noexcept {
+void ya_gpdb::list_free(List *list) noexcept {
   if (!list)
     return;
 
-  detail::wrap_noexcept(::list_free, list);
+  wrap_noexcept(::list_free, list);
 }
 
 CdbExplain_ShowStatCtx *
-gpdb::cdbexplain_showExecStatsBegin(QueryDesc *query_desc,
-                                    instr_time starttime) {
+ya_gpdb::cdbexplain_showExecStatsBegin(QueryDesc *query_desc,
+                                       instr_time starttime) {
   if (!query_desc)
     throw std::runtime_error("Invalid query descriptor");
 
-  return detail::wrap_throw(::cdbexplain_showExecStatsBegin, query_desc,
-                            starttime);
+  return wrap_throw(::cdbexplain_showExecStatsBegin, query_desc, starttime);
 }
 
-void gpdb::instr_end_loop(Instrumentation *instr) {
+void ya_gpdb::instr_end_loop(Instrumentation *instr) {
   if (!instr)
     throw std::runtime_error("Invalid instrumentation pointer");
 
-  detail::wrap_throw(::InstrEndLoop, instr);
+  wrap_throw(::InstrEndLoop, instr);
 }
 
-char *gpdb::gen_normquery(const char *query) {
-  return detail::wrap_throw(::gen_normquery, query);
+char *ya_gpdb::gen_normquery(const char *query) {
+  return wrap_throw(::gen_normquery, query);
 }
 
-StringInfo gpdb::gen_normplan(const char *exec_plan) {
+StringInfo ya_gpdb::gen_normplan(const char *exec_plan) {
   if (!exec_plan)
     throw std::runtime_error("Invalid execution plan string");
 
-  return detail::wrap_throw(::gen_normplan, exec_plan);
+  return wrap_throw(::gen_normplan, exec_plan);
 }
 
-char *gpdb::get_rg_name_for_id(Oid group_id) {
-  return detail::wrap_throw(GetResGroupNameForId, group_id);
+char *ya_gpdb::get_rg_name_for_id(Oid group_id) {
+  return wrap_throw(GetResGroupNameForId, group_id);
 }
 
-Oid gpdb::get_rg_id_by_session_id(int session_id) {
-  return detail::wrap_throw(ResGroupGetGroupIdBySessionId, session_id);
+Oid ya_gpdb::get_rg_id_by_session_id(int session_id) {
+  return wrap_throw(ResGroupGetGroupIdBySessionId, session_id);
 }
\ No newline at end of file
diff --git a/src/memory/gpdbwrappers.h b/src/memory/gpdbwrappers.h
index 437a5dd5d29..ad7ae96c362 100644
--- a/src/memory/gpdbwrappers.h
+++ b/src/memory/gpdbwrappers.h
@@ -16,88 +16,7 @@ extern "C" {
 #include <utility>
 #include <string>
 
-namespace gpdb {
-namespace detail {
-
-template <bool Throws, typename Func, typename... Args>
-auto wrap(Func &&func, Args &&...args) noexcept(!Throws)
-    -> decltype(func(std::forward<Args>(args)...)) {
-
-  using RetType = decltype(func(std::forward<Args>(args)...));
-
-  // Empty struct for void return type.
-  struct VoidResult {};
-  using ResultHolder = std::conditional_t<std::is_void_v<RetType>, VoidResult,
-                                          std::optional<RetType>>;
-
-  bool success;
-  ErrorData *edata;
-  ResultHolder result_holder;
-
-  PG_TRY();
-  {
-    if constexpr (!std::is_void_v<RetType>) {
-      result_holder.emplace(func(std::forward<Args>(args)...));
-    } else {
-      func(std::forward<Args>(args)...);
-    }
-    edata = NULL;
-    success = true;
-  }
-  PG_CATCH();
-  {
-    MemoryContext oldctx = MemoryContextSwitchTo(TopMemoryContext);
-    edata = CopyErrorData();
-    MemoryContextSwitchTo(oldctx);
-    FlushErrorState();
-    success = false;
-  }
-  PG_END_TRY();
-
-  if (!success) {
-    std::string err;
-    if (edata && edata->message) {
-      err = std::string(edata->message);
-    } else {
-      err = "Unknown error occurred";
-    }
-
-    if (edata) {
-      FreeErrorData(edata);
-    }
-
-    if constexpr (Throws) {
-      throw std::runtime_error(err);
-    }
-
-    if constexpr (!std::is_void_v<RetType>) {
-      return RetType{};
-    } else {
-      return;
-    }
-  }
-
-  if constexpr (!std::is_void_v<RetType>) {
-    return *std::move(result_holder);
-  } else {
-    return;
-  }
-}
-
-template <typename Func, typename... Args>
-auto wrap_throw(Func &&func, Args &&...args)
-    -> decltype(func(std::forward<Args>(args)...)) {
-  return detail::wrap<true>(std::forward<Func>(func),
-                            std::forward<Args>(args)...);
-}
-
-template <typename Func, typename... Args>
-auto wrap_noexcept(Func &&func, Args &&...args) noexcept
-    -> decltype(func(std::forward<Args>(args)...)) {
-  return detail::wrap<false>(std::forward<Func>(func),
-                             std::forward<Args>(args)...);
-}
-} // namespace detail
+namespace ya_gpdb {
 
 // Functions that call palloc().
 // Make sure correct memory context is set.
@@ -128,4 +47,4 @@ const char *get_config_option(const char *name, bool 
missing_ok,
 void list_free(List *list) noexcept;
 Oid get_rg_id_by_session_id(int session_id);
 
-} // namespace gpdb
+} // namespace ya_gpdb


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to