This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit 029897c883255a7f63b6f8c10443341760040fe9
Author: Maxim Smyatkin <[email protected]>
AuthorDate: Thu Mar 23 15:15:26 2023 +0300

    [yagp_hooks_collector] Add extension skeleton with gRPC transport
    
    Add yagp_hooks_collector, a shared-preload module that hooks into
    ExecutorStart and ExecutorFinish to capture query lifecycle events.
    Includes a Makefile with protobuf code generation, gRPC-based delivery,
    QueryInfo generation (query text, plan text, query_id, plan_id and
    session metadata), and basic population of the protobuf messages.
---
 .gitignore                                         |   7 +-
 Makefile                                           |   2 -
 protos/yagpcc_metrics.proto                        | 130 ++++
 protos/yagpcc_plan.proto                           | 570 +++++++++++++++
 protos/yagpcc_set_service.proto                    |  45 ++
 sql/yagp-hooks-collector--1.0.sql                  |   2 +
 sql/yagp-hooks-collector--unpackaged--1.0.sql      |   2 +
 src/EventSender.cpp                                | 189 +++++
 src/EventSender.h                                  |  19 +
 src/GrpcConnector.cpp                              |  55 ++
 src/GrpcConnector.h                                |  15 +
 src/hook_wrappers.cpp                              |  67 ++
 src/hook_wrappers.h                                |  12 +
 src/stat_statements_parser/README.MD               |   1 +
 .../pg_stat_statements_ya_parser.c                 | 771 +++++++++++++++++++++
 .../pg_stat_statements_ya_parser.h                 |  15 +
 src/yagp_hooks_collector.c                         |  22 +
 yagp-hooks-collector.control                       |   5 +
 18 files changed, 1926 insertions(+), 3 deletions(-)

diff --git a/.gitignore b/.gitignore
index 5c21989c4ab..29b40ee096c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -73,4 +73,9 @@ lib*.pc
 /compile_commands.json
 /tmp_install/
 /.cache/
-/install/
\ No newline at end of file
+/install/
+*.o
+*.so
+src/protos/
+.vscode
+compile_commands.json
diff --git a/Makefile b/Makefile
index e9ab3fbf2d4..15c5dabb70e 100644
--- a/Makefile
+++ b/Makefile
@@ -3,14 +3,12 @@
 # to build Postgres with a different make, we have this make file
 # that, as a service, will look for a GNU make and invoke it, or show
 # an error message if none could be found.
-
 # If the user were using GNU make now, this file would not get used
 # because GNU make uses a make file named "GNUmakefile" in preference
 # to "Makefile" if it exists. PostgreSQL is shipped with a
 # "GNUmakefile". If the user hasn't run the configure script yet, the
 # GNUmakefile won't exist yet, so we catch that case as well.
 
-
 # AIX make defaults to building *every* target of the first rule.  Start with
 # a single-target, empty rule to make the other targets non-default.
 all:
diff --git a/protos/yagpcc_metrics.proto b/protos/yagpcc_metrics.proto
new file mode 100644
index 00000000000..b7e255484c7
--- /dev/null
+++ b/protos/yagpcc_metrics.proto
@@ -0,0 +1,130 @@
+syntax = "proto3";
+
+package yagpcc;
+option java_outer_classname = "SegmentYAGPCCM";
+option go_package = 
"a.yandex-team.ru/cloud/mdb/yagpcc/api/proto/common;greenplum";
+
+// Lifecycle status of a query, sent in SetQueryReq.query_status.
+// EventSender.cpp sends QUERY_STATUS_START from its ExecutorStart hook and
+// QUERY_STATUS_DONE from its ExecutorFinish hook.
+// NOTE(review): the difference between DONE, QUERY_DONE and END is not
+// documented anywhere in this schema — confirm with the collector agent.
+enum QueryStatus {
+    QUERY_STATUS_UNSPECIFIED = 0;
+    QUERY_STATUS_SUBMIT = 1;
+    QUERY_STATUS_START = 2;
+    QUERY_STATUS_DONE = 3;
+    QUERY_STATUS_QUERY_DONE = 4;
+    QUERY_STATUS_ERROR = 5;
+    QUERY_STATUS_CANCELLING = 6;
+    QUERY_STATUS_CANCELED = 7;
+    QUERY_STATUS_END = 8;
+}
+
+// Execution status of a single plan node, sent in SetPlanNodeReq.node_status.
+enum PlanNodeStatus {
+    PLAN_NODE_STATUS_UNSPECIFIED = 0;
+    PLAN_NODE_STATUS_INITIALIZED = 1;
+    PLAN_NODE_STATUS_EXECUTING = 2;
+    PLAN_NODE_STATUS_FINISHED = 3;
+}
+
+// Description of a query: which component produced its plan, its
+// fingerprints, its text and its session. Filled by set_query_info() in
+// EventSender.cpp.
+// NOTE: queryText/planText/sessionInfo do not follow proto snake_case, but
+// renaming them would change the generated accessors (e.g. mutable_querytext()).
+message QueryInfo {
+    // Planner vs. optimizer, derived from PlannedStmt->planGen.
+    PlanGenerator generator = 1;
+    // Query fingerprint (PlannedStmt->queryId).
+    uint64 query_id = 2;
+    // Plan fingerprint computed by get_plan_id().
+    uint64 plan_id = 3;
+    // Query source text (the same value is also stored in sessionInfo.sql).
+    string queryText = 4;
+    // Text-format EXPLAIN (VERBOSE, with costs) output for the plan.
+    string planText = 5;
+    SessionInfo sessionInfo = 6;
+}
+
+// Which component produced the plan. EventSender.cpp maps
+// planGen == PLANGEN_OPTIMIZER to PLAN_GENERATOR_OPTIMIZER and every other
+// value to PLAN_GENERATOR_PLANNER.
+enum PlanGenerator
+{
+    PLAN_GENERATOR_UNSPECIFIED = 0;
+    PLAN_GENERATOR_PLANNER = 1;         // plan produced by the planner
+    PLAN_GENERATOR_OPTIMIZER = 2;       // plan produced by the optimizer
+}
+
+// Resource and executor metrics for a query (SetQueryReq.query_metrics) or
+// for a single plan node (SetPlanNodeReq.node_metrics).
+message GPMetrics {
+    // OS-level process statistics.
+    SystemStat systemStat = 1;
+    // Executor instrumentation and buffer counters.
+    MetricInstrumentation instrumentation = 2;
+    // Spill file usage.
+    SpillInfo spill = 3;
+}
+
+// Identifies which backend, segment and query a report belongs to.
+// Filled by set_header() in EventSender.cpp.
+message QueryInfoHeader {
+    // OS process id of the reporting backend (MyProcPid).
+    int32 pid = 1;
+    // Segment identity and role of the reporting backend.
+    GpId gpIdentity = 2;
+
+    // A time identifier for a particular query (from gpmon_gettmid()).
+    // All records associated with the query will have the same tmid.
+    int32 tmid = 3;
+    // The session id as shown by gp_session_id.
+    // All records associated with the query will have the same ssid.
+    int32 ssid = 4;
+    // The command number within this session as shown by gp_command_count.
+    // All records associated with the query will have the same ccnt.
+    int32 ccnt = 5;
+    // Slice identifier; 0 means general info for the whole query.
+    int32 sliceid = 6;
+}
+
+// Segment identity of the reporting backend, taken from GpIdentity and the
+// Gp_role / Gp_session_role globals in EventSender.cpp.
+message GpId {
+    // The dbid of this database.
+    int32              dbid = 1;
+    // Content indicator: -1 for the entry database, 0, ..., n-1 for segment
+    // databases; a primary and its mirror have the same segindex.
+    int32              segindex = 2;
+    GpRole gp_role = 3;
+    GpRole gp_session_role = 4;
+}
+
+// Role of the reporting backend; names presumably mirror GPDB's GP_ROLE_*
+// values from cdb/cdbvars.h.
+// NOTE(review): 0 is reserved for GP_ROLE_UNSPECIFIED here, so these numbers
+// need not equal GPDB's; convert by name, not by a numeric cast.
+enum GpRole
+{
+    GP_ROLE_UNSPECIFIED = 0;
+    GP_ROLE_UTILITY = 1;               // Operating as a simple database engine
+    GP_ROLE_DISPATCH = 2;              // Operating as the parallel query dispatcher
+    GP_ROLE_EXECUTE = 3;               // Operating as a parallel query executor
+    GP_ROLE_UNDEFINED = 4;             // Should never see this role in use
+}
+
+// Session attributes of the backend running the query; filled by
+// set_session_info() in EventSender.cpp. Empty when unavailable.
+message SessionInfo {
+    // Query source text.
+    string sql = 1;
+    // Value of the session_authorization GUC.
+    string userName = 2;
+    // Name of the current database.
+    string databaseName = 3;
+    // Resource group assigned to the current user's role.
+    string resourceGroup = 4;
+    // Value of the application_name GUC.
+    string applicationName = 5;
+}
+
+// OS-level resource usage of the reporting process.
+// NOTE(review): field names look like Linux /proc/<pid>/{stat,status,io}
+// counters; units other than those spelled in the names are unconfirmed.
+message SystemStat {
+    // CPU stat
+    double runningTimeSeconds = 1;
+    double userTimeSeconds = 2;
+    double kernelTimeSeconds = 3;
+
+    // Memory stat
+    uint64 vsize = 4;
+    uint64 rss = 5;
+    uint64 VmSizeKb = 6;
+    uint64 VmPeakKb = 7;
+
+    // Storage stat
+    uint64 rchar = 8;
+    uint64 wchar = 9;
+    uint64 syscr = 10;
+    uint64 syscw = 11;
+    uint64 read_bytes = 12;
+    uint64 write_bytes = 13;
+    uint64 cancelled_write_bytes = 14;
+}
+
+// Executor instrumentation and buffer-usage counters for a query or plan node.
+// NOTE(review): field names mirror PostgreSQL's Instrumentation/BufferUsage
+// structures — confirm the mapping at the producer.
+message MetricInstrumentation {
+    uint64  ntuples     = 1;    /* Total tuples produced */
+    uint64  nloops      = 2;    /* # of run cycles for this node */
+    uint64  tuplecount  = 3;    /* Tuples emitted so far this cycle */
+    double  firsttuple  = 4;    /* Time for first tuple of this cycle */
+    double  startup     = 5;    /* Total startup time (in seconds) */
+    double  total       = 6;    /* Total total time (in seconds) */
+    uint64 shared_blks_hit     = 7; /* shared blocks stats */
+    uint64 shared_blks_read    = 8;
+    uint64 shared_blks_dirtied = 9;
+    uint64 shared_blks_written = 10;
+    // Local (backend-private) buffer stats. NOTE(review): in PostgreSQL these
+    // count blocks of temporary tables, not disk reads in general.
+    uint64 local_blks_hit      = 11;
+    uint64 local_blks_read     = 12;
+    uint64 local_blks_dirtied  = 13;
+    uint64 local_blks_written  = 14;
+    // Temp-file block stats. NOTE(review): in PostgreSQL temp_blks_* count
+    // blocks of temporary files (sorts, hashes, ...), not temporary tables.
+    uint64 temp_blks_read      = 15;
+    uint64 temp_blks_written   = 16;
+    double blk_read_time       = 17; /* measured read/write time */
+    double blk_write_time      = 18;
+}
+
+// Spill (work file) usage.
+// NOTE(review): presumably the number of spill files and their total size in
+// bytes — confirm against the producer.
+message SpillInfo {
+    int32 fileCount = 1;
+    int64 totalBytes = 2;
+}
diff --git a/protos/yagpcc_plan.proto b/protos/yagpcc_plan.proto
new file mode 100644
index 00000000000..962fab4bbdd
--- /dev/null
+++ b/protos/yagpcc_plan.proto
@@ -0,0 +1,570 @@
+syntax = "proto3";
+
+package yagpcc;
+option java_outer_classname = "SegmentYAGPCCP";
+option go_package = 
"a.yandex-team.ru/cloud/mdb/yagpcc/api/proto/common;greenplum";
+
+// One node of a query plan (SetQueryReq.plan_tree / SetPlanNodeReq.plan_node).
+message MetricPlan {
+    // Node type of this plan node.
+    GpdbNodeType   type = 1;
+
+    int32   plan_node_id = 2;
+    // Valid only for QueryInfoMetricQuerySubmit.
+    // NOTE(review): that name is not defined in these protos; it presumably
+    // means QUERY_STATUS_SUBMIT reports — confirm.
+    int32   parent_plan_node_id = 3;
+
+    double  startup_cost = 4;  /* cost expended before fetching any tuples */
+    double  total_cost = 5;    /* total cost (assuming all tuples fetched) */
+    double  plan_rows = 6;     /* number of rows plan is expected to emit */
+    int32   plan_width = 7;    /* average row width in bytes */
+
+    int32   arg1 = 8;  // for some nodes, an additional operand type
+    int32   arg2 = 9;  // for some nodes, an additional operand type
+
+    MetricMotionInfo motion_info = 10;
+    MetricRelationInfo relation_info = 11;
+    
+    string scan_index_name = 12;
+    ScanDirection scan_direction = 13;
+    MetricSliceInfo slice_info = 14;
+    string statement = 15;
+}
+
+// Details of a Motion node (MetricPlan.motion_info).
+message MetricMotionInfo {
+    MotionType   type = 1;
+    bool         isBroadcast = 2;
+    // Data distribution of the motion's output.
+    CdbLocusType locusType = 3;
+
+    // NOTE(review): presumably the sending slice and the receiving (parent)
+    // slice — confirm against the producer.
+    int32 sliceId = 4;
+    int32 parentSliceId = 5;
+}
+
+// Relation referenced by a plan node (MetricPlan.relation_info).
+message MetricRelationInfo {
+    // NOTE(review): PostgreSQL Oids are unsigned 32-bit; values above 2^31
+    // will appear negative in this int32 field.
+    int32   oid = 1;
+    string  name = 2;
+    string  schema = 3;
+    string  alias = 4;
+    int32   dynamicScanId = 5;
+}
+
+// Slice and gang that execute a plan node (MetricPlan.slice_info).
+message MetricSliceInfo {
+    int32       slice = 1;
+    // NOTE(review): presumably the number of segments the slice runs on.
+    int32       segments = 2;
+    GangType    gangType = 3;
+    int32       gang = 4;
+}
+
+// Direction of a scan node (MetricPlan.scan_direction).
+// NOTE(review): map from PostgreSQL's ScanDirection by name rather than by a
+// numeric cast — its numbering is not defined here.
+enum ScanDirection
+{
+    SCAN_DIRECTION_UNSPECIFIED = 0;
+    SCAN_DIRECTION_BACKWARD = 1;
+    SCAN_DIRECTION_FORWARD = 2;
+}
+
+/* GangType enumeration is used in several structures related to CDB
+ * slice plan support (here: MetricSliceInfo.gangType).
+ */
+enum GangType
+{
+    GANG_TYPE_UNSPECIFIED = 0;
+    GANG_TYPE_UNALLOCATED = 1;       /* a root slice executed by the qDisp */
+    GANG_TYPE_ENTRYDB_READER = 2;    /* a 1-gang with read access to the entry db */
+    GANG_TYPE_SINGLETON_READER = 3;  /* a 1-gang to read the segment dbs */
+    GANG_TYPE_PRIMARY_READER = 4;    /* a 1-gang or N-gang to read the segment dbs */
+    GANG_TYPE_PRIMARY_WRITER = 5;    /* the N-gang that can update the segment dbs */
+}
+
+
+// Data distribution (locus) of a Motion's output (MetricMotionInfo.locusType).
+enum CdbLocusType
+{
+    CDB_LOCUS_TYPE_UNSPECIFIED = 0;
+    // A single backend process on the entry db: usually the qDisp itself,
+    // but could be a qExec started by the entry postmaster.
+    CDB_LOCUS_TYPE_ENTRY = 1;
+
+    // A single backend process on any db: the qDisp itself, or a qExec
+    // started by a segment postmaster or the entry postmaster.
+    CDB_LOCUS_TYPE_SINGLE_QE = 2;
+
+    // Compatible with any locus (data is self-contained in the query plan or
+    // generally available in any qExec or qDisp).
+    CDB_LOCUS_TYPE_GENERAL = 3;
+
+    // Generally available in any qExec, but not available in qDisp.
+    CDB_LOCUS_TYPE_SEGMENT_GENERAL = 4;
+
+    CDB_LOCUS_TYPE_REPLICATED = 5;    // replicated over all qExecs of an N-gang
+    CDB_LOCUS_TYPE_HASHED = 6;        // hash partitioned over all qExecs of N-gang
+    // Result of a hash-partitioned outer join; NULLs can be anywhere.
+    CDB_LOCUS_TYPE_HASHED_OJ = 7;
+    CDB_LOCUS_TYPE_STREWN = 8;        // partitioned on no known function
+    // Sentinel (= last valid CdbLocusType + 1), not a real locus.
+    CDB_LOCUS_TYPE_END = 9;
+}
+
+// How a Motion node redistributes tuples (MetricMotionInfo.type).
+// NOTE(review): broadcast appears to be reported via
+// MetricMotionInfo.isBroadcast rather than by a value here — confirm.
+enum MotionType
+{
+    MOTION_TYPE_UNSPECIFIED = 0;
+    // Use hashing to select a segindex destination.
+    MOTION_TYPE_HASH = 1;
+    // Send tuples to a fixed set of segindexes.
+    MOTION_TYPE_FIXED = 2;
+    // Send tuples to the segment explicitly specified in their segid column.
+    MOTION_TYPE_EXPLICIT = 3;
+}
+
+// Node type of a MetricPlan node.
+// NOTE(review): the names appear to mirror GPDB's NodeTag enum (nodes/nodes.h)
+// converted to UPPER_SNAKE_CASE, but the numbers here are this schema's own
+// sequential numbering — map NodeTag values by name, not by numeric cast.
+// Unlike the other enums in this package the values carry no type-name
+// prefix; renaming them now would change the generated code.
+// PLAN_END (65) looks like a range sentinel rather than a real node type.
+enum GpdbNodeType {
+       GPDB_NODE_TYPE_UNSPECIFIED = 0;
+       INDEX_INFO = 1;
+       EXPR_CONTEXT = 2;
+       PROJECTION_INFO = 3;
+       JUNK_FILTER = 4;
+       RESULT_REL_INFO = 5;
+       E_STATE = 6;
+       TUPLE_TABLE_SLOT = 7;
+       CDB_PROCESS = 8;
+       SLICE = 9;
+       SLICE_TABLE = 10;
+       CURSOR_POS_INFO = 11;
+       SHARE_NODE_ENTRY = 12;
+       PARTITION_STATE = 13;
+       QUERY_DISPATCH_DESC = 14;
+       OID_ASSIGNMENT = 15;
+       PLAN = 16;
+       SCAN = 17;
+       JOIN = 18;
+       RESULT = 19;
+       MODIFY_TABLE = 20;
+       APPEND = 21;
+       MERGE_APPEND = 22;
+       RECURSIVE_UNION = 23;
+       SEQUENCE = 24;
+       BITMAP_AND = 25;
+       BITMAP_OR = 26;
+       SEQ_SCAN = 27;
+       DYNAMIC_SEQ_SCAN = 28;
+       EXTERNAL_SCAN = 29;
+       INDEX_SCAN = 30;
+       DYNAMIC_INDEX_SCAN = 31;
+       INDEX_ONLY_SCAN = 32;
+       BITMAP_INDEX_SCAN = 33;
+       DYNAMIC_BITMAP_INDEX_SCAN = 34;
+       BITMAP_HEAP_SCAN = 35;
+       DYNAMIC_BITMAP_HEAP_SCAN = 36;
+       TID_SCAN = 37;
+       SUBQUERY_SCAN = 38;
+       FUNCTION_SCAN = 39;
+       TABLE_FUNCTION_SCAN = 40;
+       VALUES_SCAN = 41;
+       CTE_SCAN = 42;
+       WORK_TABLE_SCAN = 43;
+       FOREIGN_SCAN = 44;
+       NEST_LOOP = 45;
+       MERGE_JOIN = 46;
+       HASH_JOIN = 47;
+       MATERIAL = 48;
+       SORT = 49;
+       AGG = 50;
+       WINDOW_AGG = 51;
+       UNIQUE = 52;
+       HASH = 53;
+       SET_OP = 54;
+       LOCK_ROWS = 55;
+       LIMIT = 56;
+       MOTION = 57;
+       SHARE_INPUT_SCAN = 58;
+       REPEAT = 59;
+       DML = 60;
+       SPLIT_UPDATE = 61;
+       ROW_TRIGGER = 62;
+       ASSERT_OP = 63;
+       PARTITION_SELECTOR = 64;
+       PLAN_END = 65;
+       NEST_LOOP_PARAM = 66;
+       PLAN_ROW_MARK = 67;
+       PLAN_INVAL_ITEM = 68;
+       PLAN_STATE = 69;
+       SCAN_STATE = 70;
+       JOIN_STATE = 71;
+       RESULT_STATE = 72;
+       MODIFY_TABLE_STATE = 73;
+       APPEND_STATE = 74;
+       MERGE_APPEND_STATE = 75;
+       RECURSIVE_UNION_STATE = 76;
+       SEQUENCE_STATE = 77;
+       BITMAP_AND_STATE = 78;
+       BITMAP_OR_STATE = 79;
+       SEQ_SCAN_STATE = 80;
+       DYNAMIC_SEQ_SCAN_STATE = 81;
+       EXTERNAL_SCAN_STATE = 82;
+       INDEX_SCAN_STATE = 83;
+       DYNAMIC_INDEX_SCAN_STATE = 84;
+       INDEX_ONLY_SCAN_STATE = 85;
+       BITMAP_INDEX_SCAN_STATE = 86;
+       DYNAMIC_BITMAP_INDEX_SCAN_STATE = 87;
+       BITMAP_HEAP_SCAN_STATE = 88;
+       DYNAMIC_BITMAP_HEAP_SCAN_STATE = 89;
+       TID_SCAN_STATE = 90;
+       SUBQUERY_SCAN_STATE = 91;
+       FUNCTION_SCAN_STATE = 92;
+       TABLE_FUNCTION_STATE = 93;
+       VALUES_SCAN_STATE = 94;
+       CTE_SCAN_STATE = 95;
+       WORK_TABLE_SCAN_STATE = 96;
+       FOREIGN_SCAN_STATE = 97;
+       NEST_LOOP_STATE = 98;
+       MERGE_JOIN_STATE = 99;
+       HASH_JOIN_STATE = 100;
+       MATERIAL_STATE = 101;
+       SORT_STATE = 102;
+       AGG_STATE = 103;
+       WINDOW_AGG_STATE = 104;
+       UNIQUE_STATE = 105;
+       HASH_STATE = 106;
+       SET_OP_STATE = 107;
+       LOCK_ROWS_STATE = 108;
+       LIMIT_STATE = 109;
+       MOTION_STATE = 110;
+       SHARE_INPUT_SCAN_STATE = 111;
+       REPEAT_STATE = 112;
+       DML_STATE = 113;
+       SPLIT_UPDATE_STATE = 114;
+       ROW_TRIGGER_STATE = 115;
+       ASSERT_OP_STATE = 116;
+       PARTITION_SELECTOR_STATE = 117;
+       TUPLE_DESC_NODE = 118;
+       SERIALIZED_PARAM_EXTERN_DATA = 119;
+       ALIAS = 120;
+       RANGE_VAR = 121;
+       EXPR = 122;
+       VAR = 123;
+       CONST = 124;
+       PARAM = 125;
+       AGGREF = 126;
+       WINDOW_FUNC = 127;
+       ARRAY_REF = 128;
+       FUNC_EXPR = 129;
+       NAMED_ARG_EXPR = 130;
+       OP_EXPR = 131;
+       DISTINCT_EXPR = 132;
+       NULL_IF_EXPR = 133;
+       SCALAR_ARRAY_OP_EXPR = 134;
+       BOOL_EXPR = 135;
+       SUB_LINK = 136;
+       SUB_PLAN = 137;
+       ALTERNATIVE_SUB_PLAN = 138;
+       FIELD_SELECT = 139;
+       FIELD_STORE = 140;
+       RELABEL_TYPE = 141;
+       COERCE_VIA_IO = 142;
+       ARRAY_COERCE_EXPR = 143;
+       CONVERT_ROWTYPE_EXPR = 144;
+       COLLATE_EXPR = 145;
+       CASE_EXPR = 146;
+       CASE_WHEN = 147;
+       CASE_TEST_EXPR = 148;
+       ARRAY_EXPR = 149;
+       ROW_EXPR = 150;
+       ROW_COMPARE_EXPR = 151;
+       COALESCE_EXPR = 152;
+       MIN_MAX_EXPR = 153;
+       XML_EXPR = 154;
+       NULL_TEST = 155;
+       BOOLEAN_TEST = 156;
+       COERCE_TO_DOMAIN = 157;
+       COERCE_TO_DOMAIN_VALUES = 158;
+       SET_TO_DEFAULT = 159;
+       CURRENT_OF_EXPR = 160;
+       TARGET_ENTRY = 161;
+       RANGE_TBL_REF = 162;
+       JOIN_EXPR = 163;
+       FROM_EXPR = 164;
+       INTO_CLAUSE = 165;
+       COPY_INTO_CLAUSE = 166;
+       REFRESH_CLAUSE = 167;
+       FLOW = 168;
+       GROUPING = 169;
+       GROUP_ID = 170;
+       DISTRIBUTED_BY = 171;
+       DML_ACTION_EXPR = 172;
+       PART_SELECTED_EXPR = 173;
+       PART_DEFAULT_EXPR = 174;
+       PART_BOUND_EXPR = 175;
+       PART_BOUND_INCLUSION_EXPR = 176;
+       PART_BOUND_OPEN_EXPR = 177;
+       PART_LIST_RULE_EXPR = 178;
+       PART_LIST_NULL_TEST_EXPR = 179;
+       TABLE_OID_INFO = 180;
+       EXPR_STATE = 181;
+       GENERIC_EXPR_STATE = 182;
+       WHOLE_ROW_VAR_EXPR_STATE = 183;
+       AGGREF_EXPR_STATE = 184;
+       WINDOW_FUNC_EXPR_STATE = 185;
+       ARRAY_REF_EXPR_STATE = 186;
+       FUNC_EXPR_STATE = 187;
+       SCALAR_ARRAY_OP_EXPR_STATE = 188;
+       BOOL_EXPR_STATE = 189;
+       SUB_PLAN_STATE = 190;
+       ALTERNATIVE_SUB_PLAN_STATE = 191;
+       FIELD_SELECT_STATE = 192;
+       FIELD_STORE_STATE = 193;
+       COERCE_VIA_IO_STATE = 194;
+       ARRAY_COERCE_EXPR_STATE = 195;
+       CONVERT_ROWTYPE_EXPR_STATE = 196;
+       CASE_EXPR_STATE = 197;
+       CASE_WHEN_STATE = 198;
+       ARRAY_EXPR_STATE = 199;
+       ROW_EXPR_STATE = 200;
+       ROW_COMPARE_EXPR_STATE = 201;
+       COALESCE_EXPR_STATE = 202;
+       MIN_MAX_EXPR_STATE = 203;
+       XML_EXPR_STATE = 204;
+       NULL_TEST_STATE = 205;
+       COERCE_TO_DOMAIN_STATE = 206;
+       DOMAIN_CONSTRAINT_STATE = 207;
+       GROUPING_FUNC_EXPR_STATE = 208;
+       PART_SELECTED_EXPR_STATE = 209;
+       PART_DEFAULT_EXPR_STATE = 210;
+       PART_BOUND_EXPR_STATE = 211;
+       PART_BOUND_INCLUSION_EXPR_STATE = 212;
+       PART_BOUND_OPEN_EXPR_STATE = 213;
+       PART_LIST_RULE_EXPR_STATE = 214;
+       PART_LIST_NULL_TEST_EXPR_STATE = 215;
+       PLANNER_INFO = 216;
+       PLANNER_GLOBAL = 217;
+       REL_OPT_INFO = 218;
+       INDEX_OPT_INFO = 219;
+       PARAM_PATH_INFO = 220;
+       PATH = 221;
+       APPEND_ONLY_PATH = 222;
+       AOCS_PATH = 223;
+       EXTERNAL_PATH = 224;
+       INDEX_PATH = 225;
+       BITMAP_HEAP_PATH = 226;
+       BITMAP_AND_PATH = 227;
+       BITMAP_OR_PATH = 228;
+       NEST_PATH = 229;
+       MERGE_PATH = 230;
+       HASH_PATH = 231;
+       TID_PATH = 232;
+       FOREIGN_PATH = 233;
+       APPEND_PATH = 234;
+       MERGE_APPEND_PATH = 235;
+       RESULT_PATH = 236;
+       MATERIAL_PATH = 237;
+       UNIQUE_PATH = 238;
+       PROJECTION_PATH = 239;
+       EQUIVALENCE_CLASS = 240;
+       EQUIVALENCE_MEMBER = 241;
+       PATH_KEY = 242;
+       RESTRICT_INFO = 243;
+       PLACE_HOLDER_VAR = 244;
+       SPECIAL_JOIN_INFO = 245;
+       LATERAL_JOIN_INFO = 246;
+       APPEND_REL_INFO = 247;
+       PLACE_HOLDER_INFO = 248;
+       MIN_MAX_AGG_INFO = 249;
+       PARTITION = 250;
+       PARTITION_RULE = 251;
+       PARTITION_NODE = 252;
+       PG_PART_RULE = 253;
+       SEGFILE_MAP_NODE = 254;
+       PLANNER_PARAM_ITEM = 255;
+       CDB_MOTION_PATH = 256;
+       PARTITION_SELECTOR_PATH = 257;
+       CDB_REL_COLUMN_INFO = 258;
+       DISTRIBUTION_KEY = 259;
+       MEMORY_CONTEXT = 260;
+       ALLOC_SET_CONTEXT = 261;
+       MEMORY_ACCOUNT = 262;
+       VALUE = 263;
+       INTEGER = 264;
+       FLOAT = 265;
+       STRING = 266;
+       BIT_STRING = 267;
+       NULL_VALUE = 268;
+       LIST = 269;
+       INT_LIST = 270;
+       OID_LIST = 271;
+       QUERY = 272;
+       PLANNED_STMT = 273;
+       INSERT_STMT = 274;
+       DELETE_STMT = 275;
+       UPDATE_STMT = 276;
+       SELECT_STMT = 277;
+       ALTER_TABLE_STMT = 278;
+       ALTER_TABLE_CMD = 279;
+       ALTER_DOMAIN_STMT = 280;
+       SET_OPERATION_STMT = 281;
+       GRANT_STMT = 282;
+       GRANT_ROLE_STMT = 283;
+       ALTER_DEFAULT_PRIVILEGES_STMT = 284;
+       CLOSE_PORTAL_STMT = 285;
+       CLUSTER_STMT = 286;
+       COPY_STMT = 287;
+       CREATE_STMT = 288;
+       SINGLE_ROW_ERROR_DESC = 289;
+       EXT_TABLE_TYPE_DESC = 290;
+       CREATE_EXTERNAL_STMT = 291;
+       DEFINE_STMT = 292;
+       DROP_STMT = 293;
+       TRUNCATE_STMT = 294;
+       COMMENT_STMT = 295;
+       FETCH_STMT = 296;
+       INDEX_STMT = 297;
+       CREATE_FUNCTION_STMT = 298;
+       ALTER_FUNCTION_STMT = 299;
+       DO_STMT = 300;
+       RENAME_STMT = 301;
+       RULE_STMT = 302;
+       NOTIFY_STMT = 303;
+       LISTEN_STMT = 304;
+       UNLISTEN_STMT = 305;
+       TRANSACTION_STMT = 306;
+       VIEW_STMT = 307;
+       LOAD_STMT = 308;
+       CREATE_DOMAIN_STMT = 309;
+       CREATEDB_STMT = 310;
+       DROPDB_STMT = 311;
+       VACUUM_STMT = 312;
+       EXPLAIN_STMT = 313;
+       CREATE_TABLE_AS_STMT = 314;
+       CREATE_SEQ_STMT = 315;
+       ALTER_SEQ_STMT = 316;
+       VARIABLE_SET_STMT = 317;
+       VARIABLE_SHOW_STMT = 318;
+       DISCARD_STMT = 319;
+       CREATE_TRIG_STMT = 320;
+       CREATE_P_LANG_STMT = 321;
+       CREATE_ROLE_STMT = 322;
+       ALTER_ROLE_STMT = 323;
+       DROP_ROLE_STMT = 324;
+       CREATE_QUEUE_STMT = 325;
+       ALTER_QUEUE_STMT = 326;
+       DROP_QUEUE_STMT = 327;
+       CREATE_RESOURCE_GROUP_STMT = 328;
+       DROP_RESOURCE_GROUP_STMT = 329;
+       ALTER_RESOURCE_GROUP_STMT = 330;
+       LOCK_STMT = 331;
+       CONSTRAINTS_SET_STMT = 332;
+       REINDEX_STMT = 333;
+       CHECK_POINT_STMT = 334;
+       CREATE_SCHEMA_STMT = 335;
+       ALTER_DATABASE_STMT = 336;
+       ALTER_DATABASE_SET_STMT = 337;
+       ALTER_ROLE_SET_STMT = 338;
+       CREATE_CONVERSION_STMT = 339;
+       CREATE_CAST_STMT = 340;
+       CREATE_OP_CLASS_STMT = 341;
+       CREATE_OP_FAMILY_STMT = 342;
+       ALTER_OP_FAMILY_STMT = 343;
+       PREPARE_STMT = 344;
+       EXECUTE_STMT = 345;
+       DEALLOCATE_STMT = 346;
+       DECLARE_CURSOR_STMT = 347;
+       CREATE_TABLE_SPACE_STMT = 348;
+       DROP_TABLE_SPACE_STMT = 349;
+       ALTER_OBJECT_SCHEMA_STMT = 350;
+       ALTER_OWNER_STMT = 351;
+       DROP_OWNED_STMT = 352;
+       REASSIGN_OWNED_STMT = 353;
+       COMPOSITE_TYPE_STMT = 354;
+       CREATE_ENUM_STMT = 355;
+       CREATE_RANGE_STMT = 356;
+       ALTER_ENUM_STMT = 357;
+       ALTER_TS_DICTIONARY_STMT = 358;
+       ALTER_TS_CONFIGURATION_STMT = 359;
+       CREATE_FDW_STMT = 360;
+       ALTER_FDW_STMT = 361;
+       CREATE_FOREIGN_SERVER_STMT = 362;
+       ALTER_FOREIGN_SERVER_STMT = 363;
+       CREATE_USER_MAPPING_STMT = 364;
+       ALTER_USER_MAPPING_STMT = 365;
+       DROP_USER_MAPPING_STMT = 366;
+       ALTER_TABLE_SPACE_OPTIONS_STMT = 367;
+       ALTER_TABLE_MOVE_ALL_STMT = 368;
+       SEC_LABEL_STMT = 369;
+       CREATE_FOREIGN_TABLE_STMT = 370;
+       CREATE_EXTENSION_STMT = 371;
+       ALTER_EXTENSION_STMT = 372;
+       ALTER_EXTENSION_CONTENTS_STMT = 373;
+       CREATE_EVENT_TRIG_STMT = 374;
+       ALTER_EVENT_TRIG_STMT = 375;
+       REFRESH_MAT_VIEW_STMT = 376;
+       REPLICA_IDENTITY_STMT = 377;
+       ALTER_SYSTEM_STMT = 378;
+       PARTITION_BY = 379;
+       PARTITION_ELEM = 380;
+       PARTITION_RANGE_ITEM = 381;
+       PARTITION_BOUND_SPEC = 382;
+       PARTITION_SPEC = 383;
+       PARTITION_VALUES_SPEC = 384;
+       ALTER_PARTITION_ID = 385;
+       ALTER_PARTITION_CMD = 386;
+       INHERIT_PARTITION_CMD = 387;
+       CREATE_FILE_SPACE_STMT = 388;
+       FILE_SPACE_ENTRY = 389;
+       DROP_FILE_SPACE_STMT = 390;
+       TABLE_VALUE_EXPR = 391;
+       DENY_LOGIN_INTERVAL = 392;
+       DENY_LOGIN_POINT = 393;
+       ALTER_TYPE_STMT = 394;
+       SET_DISTRIBUTION_CMD = 395;
+       EXPAND_STMT_SPEC = 396;
+       A_EXPR = 397;
+       COLUMN_REF = 398;
+       PARAM_REF = 399;
+       A_CONST = 400;
+       FUNC_CALL = 401;
+       A_STAR = 402;
+       A_INDICES = 403;
+       A_INDIRECTION = 404;
+       A_ARRAY_EXPR = 405;
+       RES_TARGET = 406;
+       TYPE_CAST = 407;
+       COLLATE_CLAUSE = 408;
+       SORT_BY = 409;
+       WINDOW_DEF = 410;
+       RANGE_SUBSELECT = 411;
+       RANGE_FUNCTION = 412;
+       TYPE_NAME = 413;
+       COLUMN_DEF = 414;
+       INDEX_ELEM = 415;
+       CONSTRAINT = 416;
+       DEF_ELEM = 417;
+       RANGE_TBL_ENTRY = 418;
+       RANGE_TBL_FUNCTION = 419;
+       WITH_CHECK_OPTION = 420;
+       GROUPING_CLAUSE = 421;
+       GROUPING_FUNC = 422;
+       SORT_GROUP_CLAUSE = 423;
+       WINDOW_CLAUSE = 424;
+       PRIV_GRANTEE = 425;
+       FUNC_WITH_ARGS = 426;
+       ACCESS_PRIV = 427;
+       CREATE_OP_CLASS_ITEM = 428;
+       TABLE_LIKE_CLAUSE = 429;
+       FUNCTION_PARAMETER = 430;
+       LOCKING_CLAUSE = 431;
+       ROW_MARK_CLAUSE = 432;
+       XML_SERIALIZE = 433;
+       WITH_CLAUSE = 434;
+       COMMON_TABLE_EXPR = 435;
+       COLUMN_REFERENCE_STORAGE_DIRECTIVE = 436;
+       IDENTIFY_SYSTEM_CMD = 437;
+       BASE_BACKUP_CMD = 438;
+       CREATE_REPLICATION_SLOT_CMD = 439;
+       DROP_REPLICATION_SLOT_CMD = 440;
+       START_REPLICATION_CMD = 441;
+       TIME_LINE_HISTORY_CMD = 442;
+       TRIGGER_DATA = 443;
+       EVENT_TRIGGER_DATA = 444;
+       RETURN_SET_INFO = 445;
+       WINDOW_OBJECT_DATA = 446;
+       TID_BITMAP = 447;
+       INLINE_CODE_BLOCK = 448;
+       FDW_ROUTINE = 449;
+       STREAM_BITMAP = 450;
+       FORMATTER_DATA = 451;
+       EXT_PROTOCOL_DATA = 452;
+       EXT_PROTOCOL_VALIDATOR_DATA = 453;
+       SELECTED_PARTS = 454;
+       COOKED_CONSTRAINT = 455;
+       CDB_EXPLAIN_STAT_HDR = 456;
+       GP_POLICY = 457;
+       RETRIEVE_STMT = 458;
+}
diff --git a/protos/yagpcc_set_service.proto b/protos/yagpcc_set_service.proto
new file mode 100644
index 00000000000..0bef72891ee
--- /dev/null
+++ b/protos/yagpcc_set_service.proto
@@ -0,0 +1,45 @@
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+
+import "protos/yagpcc_metrics.proto";
+import "protos/yagpcc_plan.proto";
+
+package yagpcc;
+option java_outer_classname = "SegmentYAGPCCAS";
+option go_package = 
"a.yandex-team.ru/cloud/mdb/yagpcc/api/proto/agent_segment;greenplum";
+
+// Metrics ingestion service. The extension is the client: GrpcConnector.cpp
+// connects to it over unix:///tmp/yagpcc_agent.sock.
+service SetQueryInfo {
+    // Report a plan-node-level event.
+    rpc SetMetricPlanNode (SetPlanNodeReq) returns (MetricResponse) {}
+
+    // Report a query-level event.
+    rpc SetMetricQuery (SetQueryReq) returns (MetricResponse) {}
+}
+
+// Result of a SetMetric* call. GrpcConnector.cpp also builds one locally,
+// with error_code = METRIC_RESPONSE_STATUS_CODE_ERROR, when the RPC fails.
+message MetricResponse {
+    MetricResponseStatusCode error_code = 1;
+    // Human-readable failure description; meaningful only on error.
+    string error_text = 2;
+}
+
+// Outcome of a SetMetric* call (MetricResponse.error_code).
+enum MetricResponseStatusCode {
+    METRIC_RESPONSE_STATUS_CODE_UNSPECIFIED = 0;
+    METRIC_RESPONSE_STATUS_CODE_SUCCESS = 1;
+    METRIC_RESPONSE_STATUS_CODE_ERROR = 2;
+}
+
+// Query-level event: a status change plus the query description, metrics
+// and plan tree.
+message SetQueryReq {
+    QueryStatus query_status = 1;
+    // Time of the event (EventSender.cpp sets it from gettimeofday()).
+    google.protobuf.Timestamp datetime = 2;
+
+    QueryInfoHeader header = 3;
+    QueryInfo query_info = 4;
+    GPMetrics query_metrics = 5;
+    // Nodes of the query's plan.
+    repeated MetricPlan plan_tree = 6;
+}
+
+// Plan-node-level event: a status change of one plan node plus its metrics.
+message SetPlanNodeReq {
+    PlanNodeStatus node_status = 1;
+    // Time of the event.
+    google.protobuf.Timestamp datetime = 2;
+    QueryInfoHeader header = 3;
+    GPMetrics node_metrics = 4;
+    MetricPlan plan_node = 5;
+}
diff --git a/sql/yagp-hooks-collector--1.0.sql 
b/sql/yagp-hooks-collector--1.0.sql
new file mode 100644
index 00000000000..f9ab15fb400
--- /dev/null
+++ b/sql/yagp-hooks-collector--1.0.sql
@@ -0,0 +1,2 @@
+-- Complain if this script is sourced in psql rather than run by CREATE EXTENSION.
+\echo Use '''CREATE EXTENSION "yagp-hooks-collector"''' to load this file. \quit
diff --git a/sql/yagp-hooks-collector--unpackaged--1.0.sql 
b/sql/yagp-hooks-collector--unpackaged--1.0.sql
new file mode 100644
index 00000000000..0441c97bd84
--- /dev/null
+++ b/sql/yagp-hooks-collector--unpackaged--1.0.sql
@@ -0,0 +1,2 @@
+-- Complain if sourced in psql rather than run by CREATE EXTENSION ... FROM unpackaged.
+\echo Use '''CREATE EXTENSION "yagp-hooks-collector" FROM unpackaged''' to load this file. \quit
diff --git a/src/EventSender.cpp b/src/EventSender.cpp
new file mode 100644
index 00000000000..bb4765adeb1
--- /dev/null
+++ b/src/EventSender.cpp
@@ -0,0 +1,189 @@
+#include "EventSender.h"
+#include "GrpcConnector.h"
+#include "protos/yagpcc_set_service.pb.h"
+#include <ctime>
+
+extern "C"
+{
+#include "postgres.h"
+#include "utils/metrics_utils.h"
+#include "utils/elog.h"
+#include "executor/executor.h"
+#include "commands/explain.h"
+#include "commands/dbcommands.h"
+#include "commands/resgroupcmds.h"
+
+#include "cdb/cdbvars.h"
+#include "cdb/cdbexplain.h"
+
+#include "tcop/utility.h"
+#include "pg_stat_statements_ya_parser.h"
+}
+
+namespace
+{
+// The get_*() helpers return a heap-allocated std::string, or nullptr when
+// the value is unavailable. Their results are handed to protobuf
+// set_allocated_*() setters, which take ownership (nullptr clears the field).
+
+// Value of the session_authorization GUC, or nullptr if it is unset.
+std::string* get_user_name()
+{
+    const char *username = GetConfigOption("session_authorization", false, false);
+    return username ? new std::string(username) : nullptr;
+}
+
+// Name of the current database (MyDatabaseId), or nullptr if it cannot be
+// resolved.
+std::string* get_db_name()
+{
+    char *dbname = get_database_name(MyDatabaseId);
+    // pfree() must never be given NULL, so bail out before freeing.
+    if (dbname == nullptr)
+        return nullptr;
+    auto *result = new std::string(dbname);
+    pfree(dbname);
+    return result;
+}
+
+// Name of the resource group assigned to the current user's role, or nullptr
+// when there is no valid user, no valid group, or no name for the group.
+std::string* get_rg_name()
+{
+    auto userId = GetUserId();
+    if (!OidIsValid(userId))
+        return nullptr;
+    auto groupId = GetResGroupIdForRole(userId);
+    if (!OidIsValid(groupId))
+        return nullptr;
+    char *rgname = GetResGroupNameForId(groupId);
+    if (rgname == nullptr)
+        return nullptr;
+    // Copy before pfree(): reading rgname after freeing it would be a
+    // use-after-free.
+    auto *result = new std::string(rgname);
+    pfree(rgname);
+    return result;
+}
+
+// Value of the application_name GUC, or nullptr if it is unset.
+std::string* get_app_name()
+{
+    return application_name ? new std::string(application_name) : nullptr;
+}
+
+// Index of the locally executing slice, or 0 when the executor state has not
+// been created yet.
+int get_cur_slice_id(QueryDesc *desc)
+{
+    if (!desc->estate)
+    {
+        return 0;
+    }
+    return LocallyExecutingSliceIndex(desc->estate);
+}
+
+// Current wall-clock time as a protobuf Timestamp (microsecond resolution).
+google::protobuf::Timestamp current_ts()
+{
+    google::protobuf::Timestamp ts;
+    struct timeval tv;
+    gettimeofday(&tv, nullptr);
+    ts.set_seconds(tv.tv_sec);
+    ts.set_nanos(static_cast<int32_t>(tv.tv_usec * 1000));
+    return ts;
+}
+
+// Map a GPDB role value (Gp_role / Gp_session_role) onto the wire enum by
+// name. The wire enum reserves 0 for GP_ROLE_UNSPECIFIED, so its numbering is
+// not guaranteed to match GPDB's and a static_cast could report the wrong
+// role. Unrecognised values map to GP_ROLE_UNSPECIFIED.
+yagpcc::GpRole to_proto_role(int role)
+{
+    switch (role)
+    {
+    case GP_ROLE_UTILITY:
+        return yagpcc::GP_ROLE_UTILITY;
+    case GP_ROLE_DISPATCH:
+        return yagpcc::GP_ROLE_DISPATCH;
+    case GP_ROLE_EXECUTE:
+        return yagpcc::GP_ROLE_EXECUTE;
+    case GP_ROLE_UNDEFINED:
+        return yagpcc::GP_ROLE_UNDEFINED;
+    default:
+        return yagpcc::GP_ROLE_UNSPECIFIED;
+    }
+}
+
+// Fill the report header: pid, segment identity and roles, session/command
+// counters, current slice and the gpmon time id.
+void set_header(yagpcc::QueryInfoHeader *header, QueryDesc *queryDesc)
+{
+    header->set_pid(MyProcPid);
+    auto gpId = header->mutable_gpidentity();
+    gpId->set_dbid(GpIdentity.dbid);
+    gpId->set_segindex(GpIdentity.segindex);
+    gpId->set_gp_role(to_proto_role(Gp_role));
+    gpId->set_gp_session_role(to_proto_role(Gp_session_role));
+    header->set_ssid(gp_session_id);
+    header->set_ccnt(gp_command_count);
+    header->set_sliceid(get_cur_slice_id(queryDesc));
+    int32 tmid = 0;
+    gpmon_gettmid(&tmid);
+    header->set_tmid(tmid);
+}
+
+// Fill session attributes; each set_allocated_*() takes ownership of the
+// string returned by the matching get_*() helper.
+void set_session_info(yagpcc::SessionInfo *si, QueryDesc *queryDesc)
+{
+    if (queryDesc->sourceText)
+        *si->mutable_sql() = std::string(queryDesc->sourceText);
+    si->set_allocated_applicationname(get_app_name());
+    si->set_allocated_databasename(get_db_name());
+    si->set_allocated_resourcegroup(get_rg_name());
+    si->set_allocated_username(get_user_name());
+}
+
+// Run a text-format, verbose EXPLAIN of queryDesc's plan (costs optional) and
+// return the resulting state; the output text is in es.str.
+ExplainState get_explain_state(QueryDesc *queryDesc, bool costs)
+{
+    ExplainState es;
+    ExplainInitState(&es);
+    es.costs = costs;
+    es.verbose = true;
+    es.format = EXPLAIN_FORMAT_TEXT;
+    ExplainBeginOutput(&es);
+    ExplainPrintPlan(&es, queryDesc);
+    ExplainEndOutput(&es);
+    return es;
+}
+
+// Store the EXPLAIN text (with costs) of queryDesc's plan into *plan_text.
+void set_plan_text(std::string *plan_text, QueryDesc *queryDesc)
+{
+    auto es = get_explain_state(queryDesc, true);
+    plan_text->assign(es.str->data, es.str->len);
+    // The text now lives in *plan_text; release the palloc'd StringInfo
+    // instead of leaving it in the current memory context until reset.
+    pfree(es.str->data);
+    pfree(es.str);
+}
+
+// Fill session info, query text and, when a planned statement is available,
+// the plan generator, plan text, plan id and query id.
+void set_query_info(yagpcc::QueryInfo *qi, QueryDesc *queryDesc)
+{
+    set_session_info(qi->mutable_sessioninfo(), queryDesc);
+    if (queryDesc->sourceText)
+        *qi->mutable_querytext() = queryDesc->sourceText;
+    if (queryDesc->plannedstmt)
+    {
+        qi->set_generator(queryDesc->plannedstmt->planGen == PLANGEN_OPTIMIZER
+                                ? yagpcc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER
+                                : yagpcc::PlanGenerator::PLAN_GENERATOR_PLANNER);
+        set_plan_text(qi->mutable_plantext(), queryDesc);
+        qi->set_plan_id(get_plan_id(queryDesc));
+        qi->set_query_id(queryDesc->plannedstmt->queryId);
+    }
+}
+} // namespace
+
+// Report that execution of queryDesc has started: build a
+// QUERY_STATUS_START SetQueryReq and send it through the connector.
+// Failures are logged at WARNING; nothing is raised to the caller.
+void EventSender::ExecutorStart(QueryDesc *queryDesc, int /* eflags*/)
+{
+    elog(DEBUG1, "Query %s start recording", queryDesc->sourceText);
+
+    yagpcc::SetQueryReq request;
+    request.set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START);
+    *request.mutable_datetime() = current_ts();
+    set_header(request.mutable_header(), queryDesc);
+    set_query_info(request.mutable_query_info(), queryDesc);
+
+    const auto response = connector->set_metric_query(request);
+    const bool failed =
+        response.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR;
+    if (!failed)
+    {
+        elog(DEBUG1, "Query %s start successful", queryDesc->sourceText);
+        return;
+    }
+    elog(WARNING, "Query %s start reporting failed with an error %s",
+         queryDesc->sourceText, response.error_text().c_str());
+}
+
+// Report that execution of queryDesc has finished: build a
+// QUERY_STATUS_DONE SetQueryReq and send it through the connector.
+// Failures are logged at WARNING; nothing is raised to the caller.
+void EventSender::ExecutorFinish(QueryDesc *queryDesc)
+{
+    elog(DEBUG1, "Query %s finish recording", queryDesc->sourceText);
+
+    yagpcc::SetQueryReq request;
+    request.set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE);
+    *request.mutable_datetime() = current_ts();
+    set_header(request.mutable_header(), queryDesc);
+    set_query_info(request.mutable_query_info(), queryDesc);
+
+    const auto response = connector->set_metric_query(request);
+    const bool failed =
+        response.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR;
+    if (!failed)
+    {
+        elog(DEBUG1, "Query %s finish successful", queryDesc->sourceText);
+        return;
+    }
+    elog(WARNING, "Query %s finish reporting failed with an error %s",
+         queryDesc->sourceText, response.error_text().c_str());
+}
+
+// Return the shared EventSender. It is a function-local static, so it is
+// constructed on the first call and the same object is returned afterwards.
+EventSender *EventSender::instance()
+{
+    static EventSender the_sender;
+    EventSender *shared = &the_sender;
+    return shared;
+}
+
+// Create the sender together with its own GrpcConnector.
+EventSender::EventSender() : connector(std::make_unique<GrpcConnector>())
+{
+}
\ No newline at end of file
diff --git a/src/EventSender.h b/src/EventSender.h
new file mode 100644
index 00000000000..70868f6c757
--- /dev/null
+++ b/src/EventSender.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <memory>
+
+class GrpcConnector;
+
+struct QueryDesc;
+
+// Process-wide reporter of executor lifecycle events to the yagpcc agent.
+// Obtain the single instance through EventSender::instance().
+class EventSender
+{
+public:
+    // Report that execution of queryDesc is starting.
+    void ExecutorStart(QueryDesc *queryDesc, int eflags);
+    // Report that execution of queryDesc has finished.
+    void ExecutorFinish(QueryDesc *queryDesc);
+    // Accessor for the shared instance.
+    static EventSender *instance();
+
+    // Singleton: forbid copies. Declaring these also suppresses the implicit
+    // move operations, which would otherwise be public and let a caller move
+    // `connector` out of the shared instance.
+    EventSender(const EventSender &) = delete;
+    EventSender &operator=(const EventSender &) = delete;
+
+private:
+    EventSender();
+    std::unique_ptr<GrpcConnector> connector;
+};
\ No newline at end of file
diff --git a/src/GrpcConnector.cpp b/src/GrpcConnector.cpp
new file mode 100644
index 00000000000..7329f392010
--- /dev/null
+++ b/src/GrpcConnector.cpp
@@ -0,0 +1,55 @@
+#include "GrpcConnector.h"
+#include "yagpcc_set_service.grpc.pb.h"
+
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/channel.h>
+#include <string>
+
+// gRPC-backed implementation hidden behind GrpcConnector's pimpl.
+class GrpcConnector::Impl
+{
+public:
+    Impl()
+    {
+        // Verify the linked protobuf runtime matches the generated headers.
+        GOOGLE_PROTOBUF_VERIFY_VERSION;
+        this->stub = yagpcc::SetQueryInfo::NewStub(grpc::CreateChannel(
+            SOCKET_FILE, grpc::InsecureChannelCredentials()));
+    }
+
+    // Sends one SetQueryReq to the agent. Transport failures are folded into
+    // the returned response (METRIC_RESPONSE_STATUS_CODE_ERROR plus a text
+    // description) so callers have a single error path.
+    yagpcc::MetricResponse set_metric_query(const yagpcc::SetQueryReq &req)
+    {
+        yagpcc::MetricResponse response;
+        grpc::ClientContext context;
+        // Bound the blocking RPC so an unresponsive agent cannot stall the
+        // calling executor hook for long.
+        const auto timeout = std::chrono::milliseconds(50);
+        context.set_deadline(std::chrono::system_clock::now() + timeout);
+
+        grpc::Status status = stub->SetMetricQuery(&context, req, &response);
+
+        if (!status.ok())
+        {
+            // error_details() holds a serialized binary proto, not text, so it
+            // is not appended to the message; report the status code instead.
+            response.set_error_text(
+                "Connection lost: " + status.error_message() +
+                " (grpc status code " +
+                std::to_string(static_cast<int>(status.error_code())) + ")");
+            response.set_error_code(yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR);
+        }
+
+        return response;
+    }
+
+private:
+    const std::string SOCKET_FILE = "unix:///tmp/yagpcc_agent.sock";
+    // Not used at the moment; only SOCKET_FILE is connected to.
+    const std::string TCP_ADDRESS = "127.0.0.1:1432";
+    std::unique_ptr<yagpcc::SetQueryInfo::Stub> stub;
+};
+
+// Creates the gRPC channel and stub eagerly; impl is owned by this object.
+GrpcConnector::GrpcConnector() : impl(new Impl())
+{
+}
+
+// Releases the implementation allocated by the constructor.
+GrpcConnector::~GrpcConnector()
+{
+    Impl *owned = this->impl;
+    delete owned;
+}
+
+// Thin forwarder to the pimpl; all gRPC handling lives in Impl.
+yagpcc::MetricResponse GrpcConnector::set_metric_query(yagpcc::SetQueryReq req)
+{
+    Impl &implementation = *this->impl;
+    return implementation.set_metric_query(req);
+}
\ No newline at end of file
diff --git a/src/GrpcConnector.h b/src/GrpcConnector.h
new file mode 100644
index 00000000000..dc0f21706a3
--- /dev/null
+++ b/src/GrpcConnector.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include "yagpcc_set_service.pb.h"
+
+// Client for the yagpcc agent's SetQueryInfo service. gRPC types are kept
+// out of this header behind a pimpl.
+class GrpcConnector
+{
+public:
+    GrpcConnector();
+    ~GrpcConnector();
+
+    // `impl` is an owning raw pointer, so a copy would delete it twice.
+    GrpcConnector(const GrpcConnector &) = delete;
+    GrpcConnector &operator=(const GrpcConnector &) = delete;
+
+    // Sends req to the agent and returns its response.
+    yagpcc::MetricResponse set_metric_query(yagpcc::SetQueryReq req);
+
+private:
+    class Impl;
+    Impl *impl;
+};
\ No newline at end of file
diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp
new file mode 100644
index 00000000000..9f3200c006f
--- /dev/null
+++ b/src/hook_wrappers.cpp
@@ -0,0 +1,67 @@
+#include "hook_wrappers.h"
+#include "EventSender.h"
+
+#include <exception>
+
+extern "C"
+{
+#include "postgres.h"
+#include "utils/metrics_utils.h"
+#include "utils/elog.h"
+#include "executor/executor.h"
+
+#include "cdb/cdbvars.h"
+#include "cdb/cdbexplain.h"
+
+#include "tcop/utility.h"
+}
+
+#include "stat_statements_parser/pg_stat_statements_ya_parser.h"
+
+// Executor hooks that were installed before ours; saved by hooks_init() and
+// chained to by our wrappers (nullptr means "call the standard routine").
+static ExecutorStart_hook_type previous_ExecutorStart_hook = nullptr;
+static ExecutorFinish_hook_type previous_ExecutorFinish_hook = nullptr;
+
+// Our replacement hooks, defined at the end of this file.
+static void ya_ExecutorStart_hook(QueryDesc *queryDesc, int eflags);
+static void ya_ExecutorFinish_hook(QueryDesc *queryDesc);
+
+#define REPLACE_HOOK(hookName)      \
+    previous_##hookName = hookName; \
+    hookName = ya_##hookName;
+
+// Installs our executor hooks in front of any previously installed ones and
+// registers the post-parse-analyze hook that assigns query IDs.
+void hooks_init()
+{
+    previous_ExecutorStart_hook = ExecutorStart_hook;
+    ExecutorStart_hook = ya_ExecutorStart_hook;
+
+    previous_ExecutorFinish_hook = ExecutorFinish_hook;
+    ExecutorFinish_hook = ya_ExecutorFinish_hook;
+
+    stat_statements_parser_init();
+}
+
+// Undoes hooks_init(): puts back the hooks saved there and removes the
+// post-parse-analyze hook.
+void hooks_deinit()
+{
+    ExecutorStart_hook = previous_ExecutorStart_hook;
+    // Was a self-assignment (ExecutorFinish_hook = ExecutorFinish_hook),
+    // which left our wrapper installed after deinit.
+    ExecutorFinish_hook = previous_ExecutorFinish_hook;
+    stat_statements_parser_deinit();
+}
+
+#define CREATE_HOOK_WRAPPER(hookName, ...)                                 \
+    PG_TRY();                                                              \
+    {                                                                      \
+        EventSender::instance()->hookName(__VA_ARGS__);                    \
+    }                                                                      \
+    PG_CATCH();                                                            \
+    {                                                                      \
+        ereport(WARNING, (errmsg("EventSender failed in %s", #hookName))); \
+        PG_RE_THROW();                                                     \
+    }                                                                      \
+    PG_END_TRY();                                                          \
+    if (previous_##hookName##_hook)                                        \
+        (*previous_##hookName##_hook)(__VA_ARGS__);                        \
+    else                                                                   \
+        standard_##hookName(__VA_ARGS__);
+
+// ExecutorStart_hook: report the start event, then run the chained or
+// standard ExecutorStart.
+static void ya_ExecutorStart_hook(QueryDesc *queryDesc, int eflags)
+{
+    CREATE_HOOK_WRAPPER(ExecutorStart, queryDesc, eflags);
+}
+
+// ExecutorFinish_hook: report the finish event, then run the chained or
+// standard ExecutorFinish.
+static void ya_ExecutorFinish_hook(QueryDesc *queryDesc)
+{
+    CREATE_HOOK_WRAPPER(ExecutorFinish, queryDesc);
+}
\ No newline at end of file
diff --git a/src/hook_wrappers.h b/src/hook_wrappers.h
new file mode 100644
index 00000000000..815fcb7cd51
--- /dev/null
+++ b/src/hook_wrappers.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Install the module's hooks; called from _PG_init. */
+extern void hooks_init(void);
+/* Counterpart of hooks_init(); called from _PG_fini. */
+extern void hooks_deinit(void);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/src/stat_statements_parser/README.MD b/src/stat_statements_parser/README.MD
new file mode 100644
index 00000000000..291e31a3099
--- /dev/null
+++ b/src/stat_statements_parser/README.MD
@@ -0,0 +1 @@
+This directory contains a slightly modified subset of the pg_stat_statements sources from PostgreSQL 9.4, used to generate query and plan IDs.
diff --git a/src/stat_statements_parser/pg_stat_statements_ya_parser.c b/src/stat_statements_parser/pg_stat_statements_ya_parser.c
new file mode 100644
index 00000000000..f14742337bd
--- /dev/null
+++ b/src/stat_statements_parser/pg_stat_statements_ya_parser.c
@@ -0,0 +1,771 @@
+#include "postgres.h"
+
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "access/hash.h"
+#include "executor/instrument.h"
+#include "executor/execdesc.h"
+#include "funcapi.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "parser/analyze.h"
+#include "parser/parsetree.h"
+#include "parser/scanner.h"
+#include "parser/gram.h"
+#include "pgstat.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/spin.h"
+#include "tcop/utility.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+
+#include "pg_stat_statements_ya_parser.h"
+
+/* post_parse_analyze_hook installed before ours; chained to and restored. */
+static post_parse_analyze_hook_type prev_post_parse_analyze_hook = NULL;
+
+#define JUMBLE_SIZE 1024 /* query serialization buffer size, in bytes */
+
+/*
+ * Struct for tracking locations/lengths of constants during normalization.
+ * In this module the entries are only filled in by RecordConstLocation();
+ * no normalized query text is produced from them.
+ */
+typedef struct pgssLocationLen
+{
+       int location; /* start offset in query text */
+       int length;       /* length in bytes, or -1 to ignore */
+} pgssLocationLen;
+
+/*
+ * Working state for computing a query jumble and producing a normalized
+ * query string. In this module an instance lives on the stack of a single
+ * pgss_post_parse_analyze() call.
+ */
+typedef struct pgssJumbleState
+{
+       /* Jumble of current query tree (JUMBLE_SIZE bytes) */
+       unsigned char *jumble;
+
+       /* Number of bytes used in jumble[] */
+       Size jumble_len;
+
+       /* Array of locations of constants that should be removed */
+       pgssLocationLen *clocations;
+
+       /* Allocated length of clocations array */
+       int clocations_buf_size;
+
+       /* Current number of valid entries in clocations array */
+       int clocations_count;
+
+       /*
+        * highest Param id we've seen, in order to start normalization correctly.
+        * NOTE(review): not read anywhere in this file.
+        */
+       int highest_extern_param_id;
+} pgssJumbleState;
+
+static void AppendJumble(pgssJumbleState *jstate,
+                                                const unsigned char *item, Size size);
+static void JumbleQuery(pgssJumbleState *jstate, Query *query);
+static void JumbleRangeTable(pgssJumbleState *jstate, List *rtable);
+static void JumbleExpr(pgssJumbleState *jstate, Node *node);
+static void RecordConstLocation(pgssJumbleState *jstate, int location);
+
+static StringInfo gen_normplan(const char *execution_plan);
+
+static bool need_replace(int token);
+
+/*
+ * The post-parse-analyze hook is only referenced from this file (it is not
+ * declared in any header), so give it internal linkage instead of exporting
+ * a generic "pgss_*" symbol from the shared library. The later definition
+ * without "static" inherits this linkage.
+ */
+static void pgss_post_parse_analyze(ParseState *pstate, Query *query);
+
+/* Put our post-parse-analyze hook in front of whatever was installed. */
+void stat_statements_parser_init(void)
+{
+       post_parse_analyze_hook_type previous = post_parse_analyze_hook;
+
+       prev_post_parse_analyze_hook = previous;
+       post_parse_analyze_hook = pgss_post_parse_analyze;
+}
+
+/* Give the hook slot back to the previously installed hook (may be NULL). */
+void stat_statements_parser_deinit(void)
+{
+       post_parse_analyze_hook_type previous = prev_post_parse_analyze_hook;
+
+       post_parse_analyze_hook = previous;
+}
+
+/*
+ * AppendJumble: append `size` bytes starting at `item` to the query jumble.
+ *
+ * The jumble buffer holds at most JUMBLE_SIZE bytes. Whenever it is full, its
+ * contents are folded into a 32-bit hash, and the buffer restarts holding
+ * only that hash. Everything appended so far is thus summarized by the hash.
+ */
+static void
+AppendJumble(pgssJumbleState *jstate, const unsigned char *item, Size size)
+{
+       unsigned char *buf = jstate->jumble;
+       Size used = jstate->jumble_len;
+       const unsigned char *src = item;
+       Size remaining = size;
+
+       for (; remaining > 0;)
+       {
+               Size chunk;
+
+               if (used >= JUMBLE_SIZE)
+               {
+                       uint32 folded = hash_any(buf, JUMBLE_SIZE);
+
+                       memcpy(buf, &folded, sizeof(folded));
+                       used = sizeof(folded);
+               }
+
+               /* copy as much as fits in the space left in the buffer */
+               chunk = JUMBLE_SIZE - used;
+               if (remaining < chunk)
+                       chunk = remaining;
+
+               memcpy(buf + used, src, chunk);
+               used += chunk;
+               src += chunk;
+               remaining -= chunk;
+       }
+
+       jstate->jumble_len = used;
+}
+
+/*
+ * Wrappers around AppendJumble to encapsulate details of serialization
+ * of individual local variable elements.
+ *
+ * APP_JUMB appends the raw sizeof(item) bytes of an lvalue; APP_JUMB_STRING
+ * appends a NUL-terminated string including its terminator. Both expect a
+ * variable named jstate to be in scope at the point of use.
+ */
+#define APP_JUMB(item) \
+       AppendJumble(jstate, (const unsigned char *)&(item), sizeof(item))
+#define APP_JUMB_STRING(str) \
+       AppendJumble(jstate, (const unsigned char *)(str), strlen(str) + 1)
+
+/*
+ * JumbleQuery: Selectively serialize the query tree, appending significant
+ * data to the "query jumble" while ignoring nonsignificant data.
+ *
+ * Rule of thumb for what to include is that we should ignore anything not
+ * semantically significant (such as alias names) as well as anything that can
+ * be deduced from child nodes (else we'd just be double-hashing that piece
+ * of information).
+ *
+ * The jumble is order-sensitive: the sequence of calls below defines the
+ * resulting queryId, so reordering them changes every ID.
+ */
+void JumbleQuery(pgssJumbleState *jstate, Query *query)
+{
+       Assert(IsA(query, Query));
+       Assert(query->utilityStmt == NULL);
+
+       APP_JUMB(query->commandType);
+       /* resultRelation is usually predictable from commandType */
+       JumbleExpr(jstate, (Node *)query->cteList);
+       JumbleRangeTable(jstate, query->rtable);
+       JumbleExpr(jstate, (Node *)query->jointree);
+       JumbleExpr(jstate, (Node *)query->targetList);
+       JumbleExpr(jstate, (Node *)query->returningList);
+       JumbleExpr(jstate, (Node *)query->groupClause);
+       JumbleExpr(jstate, query->havingQual);
+       JumbleExpr(jstate, (Node *)query->windowClause);
+       JumbleExpr(jstate, (Node *)query->distinctClause);
+       JumbleExpr(jstate, (Node *)query->sortClause);
+       JumbleExpr(jstate, query->limitOffset);
+       JumbleExpr(jstate, query->limitCount);
+       /* we ignore rowMarks */
+       JumbleExpr(jstate, query->setOperations);
+}
+
+/*
+ * Jumble a range table: for each RTE append its kind, followed by the fields
+ * that identify what it references (relation OID, subquery, join type,
+ * function list, VALUES lists, or CTE name and level).
+ */
+static void
+JumbleRangeTable(pgssJumbleState *jstate, List *rtable)
+{
+       ListCell *lc;
+
+       foreach (lc, rtable)
+       {
+               RangeTblEntry *rte = (RangeTblEntry *)lfirst(lc);
+
+               Assert(IsA(rte, RangeTblEntry));
+               APP_JUMB(rte->rtekind);
+               switch (rte->rtekind)
+               {
+               case RTE_RELATION:
+                       APP_JUMB(rte->relid);
+                       break;
+               case RTE_SUBQUERY:
+                       JumbleQuery(jstate, rte->subquery);
+                       break;
+               case RTE_JOIN:
+                       APP_JUMB(rte->jointype);
+                       break;
+               case RTE_FUNCTION:
+                       JumbleExpr(jstate, (Node *)rte->functions);
+                       break;
+               case RTE_VALUES:
+                       JumbleExpr(jstate, (Node *)rte->values_lists);
+                       break;
+               case RTE_CTE:
+
+                       /*
+                        * Depending on the CTE name here isn't ideal, but it's the
+                        * only info we have to identify the referenced WITH item.
+                        */
+                       APP_JUMB_STRING(rte->ctename);
+                       APP_JUMB(rte->ctelevelsup);
+                       break;
+               default:
+
+                       /*
+                        * Only a warning, as in JumbleExpr's default branch: this runs
+                        * from the post-parse-analysis hook for every non-utility
+                        * query, so an ERROR here would abort the user's query just
+                        * because one RTE kind is unknown. The kind itself has already
+                        * been jumbled above.
+                        */
+                       elog(WARNING, "unrecognized RTE kind: %d", (int)rte->rtekind);
+                       break;
+               }
+       }
+}
+
+/*
+ * Jumble an expression tree
+ *
+ * In general this function should handle all the same node types that
+ * expression_tree_walker() does, and therefore it's coded to be as parallel
+ * to that function as possible.  However, since we are only invoked on
+ * queries immediately post-parse-analysis, we need not handle node types
+ * that only appear in planning.
+ *
+ * Note: the reason we don't simply use expression_tree_walker() is that the
+ * point of that function is to support tree walkers that don't care about
+ * most tree node types, but here we care about all types.  We should complain
+ * about any unrecognized node type.
+ *
+ * For every node the NodeTag is appended first, then the node's significant
+ * scalar fields, then its children in a fixed order; changing that order
+ * changes every resulting queryId. Constants contribute only their type (not
+ * their value), and their parse location is recorded through
+ * RecordConstLocation(). A NULL node contributes nothing.
+ */
+static void
+JumbleExpr(pgssJumbleState *jstate, Node *node)
+{
+       ListCell *temp;
+
+       if (node == NULL)
+               return;
+
+       /* Guard against stack overflow due to overly complex expressions */
+       check_stack_depth();
+
+       /*
+        * We always emit the node's NodeTag, then any additional fields that are
+        * considered significant, and then we recurse to any child nodes.
+        */
+       APP_JUMB(node->type);
+
+       switch (nodeTag(node))
+       {
+       case T_Var:
+       {
+               Var *var = (Var *)node;
+
+               APP_JUMB(var->varno);
+               APP_JUMB(var->varattno);
+               APP_JUMB(var->varlevelsup);
+       }
+       break;
+       case T_Const:
+       {
+               Const *c = (Const *)node;
+
+               /* We jumble only the constant's type, not its value */
+               APP_JUMB(c->consttype);
+               /* Also, record its parse location for query normalization */
+               RecordConstLocation(jstate, c->location);
+       }
+       break;
+       case T_Param:
+       {
+               Param *p = (Param *)node;
+
+               APP_JUMB(p->paramkind);
+               APP_JUMB(p->paramid);
+               APP_JUMB(p->paramtype);
+       }
+       break;
+       case T_Aggref:
+       {
+               Aggref *expr = (Aggref *)node;
+
+               APP_JUMB(expr->aggfnoid);
+               JumbleExpr(jstate, (Node *)expr->aggdirectargs);
+               JumbleExpr(jstate, (Node *)expr->args);
+               JumbleExpr(jstate, (Node *)expr->aggorder);
+               JumbleExpr(jstate, (Node *)expr->aggdistinct);
+               JumbleExpr(jstate, (Node *)expr->aggfilter);
+       }
+       break;
+       case T_WindowFunc:
+       {
+               WindowFunc *expr = (WindowFunc *)node;
+
+               APP_JUMB(expr->winfnoid);
+               APP_JUMB(expr->winref);
+               JumbleExpr(jstate, (Node *)expr->args);
+               JumbleExpr(jstate, (Node *)expr->aggfilter);
+       }
+       break;
+       case T_ArrayRef:
+       {
+               ArrayRef *aref = (ArrayRef *)node;
+
+               JumbleExpr(jstate, (Node *)aref->refupperindexpr);
+               JumbleExpr(jstate, (Node *)aref->reflowerindexpr);
+               JumbleExpr(jstate, (Node *)aref->refexpr);
+               JumbleExpr(jstate, (Node *)aref->refassgnexpr);
+       }
+       break;
+       case T_FuncExpr:
+       {
+               FuncExpr *expr = (FuncExpr *)node;
+
+               APP_JUMB(expr->funcid);
+               JumbleExpr(jstate, (Node *)expr->args);
+       }
+       break;
+       case T_NamedArgExpr:
+       {
+               NamedArgExpr *nae = (NamedArgExpr *)node;
+
+               APP_JUMB(nae->argnumber);
+               JumbleExpr(jstate, (Node *)nae->arg);
+       }
+       break;
+       case T_OpExpr:
+       case T_DistinctExpr: /* struct-equivalent to OpExpr */
+       case T_NullIfExpr:       /* struct-equivalent to OpExpr */
+       {
+               OpExpr *expr = (OpExpr *)node;
+
+               APP_JUMB(expr->opno);
+               JumbleExpr(jstate, (Node *)expr->args);
+       }
+       break;
+       case T_ScalarArrayOpExpr:
+       {
+               ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *)node;
+
+               APP_JUMB(expr->opno);
+               APP_JUMB(expr->useOr);
+               JumbleExpr(jstate, (Node *)expr->args);
+       }
+       break;
+       case T_BoolExpr:
+       {
+               BoolExpr *expr = (BoolExpr *)node;
+
+               APP_JUMB(expr->boolop);
+               JumbleExpr(jstate, (Node *)expr->args);
+       }
+       break;
+       case T_SubLink:
+       {
+               SubLink *sublink = (SubLink *)node;
+
+               APP_JUMB(sublink->subLinkType);
+               JumbleExpr(jstate, (Node *)sublink->testexpr);
+               JumbleQuery(jstate, (Query *)sublink->subselect);
+       }
+       break;
+       case T_FieldSelect:
+       {
+               FieldSelect *fs = (FieldSelect *)node;
+
+               APP_JUMB(fs->fieldnum);
+               JumbleExpr(jstate, (Node *)fs->arg);
+       }
+       break;
+       case T_FieldStore:
+       {
+               FieldStore *fstore = (FieldStore *)node;
+
+               JumbleExpr(jstate, (Node *)fstore->arg);
+               JumbleExpr(jstate, (Node *)fstore->newvals);
+       }
+       break;
+       case T_RelabelType:
+       {
+               RelabelType *rt = (RelabelType *)node;
+
+               APP_JUMB(rt->resulttype);
+               JumbleExpr(jstate, (Node *)rt->arg);
+       }
+       break;
+       case T_CoerceViaIO:
+       {
+               CoerceViaIO *cio = (CoerceViaIO *)node;
+
+               APP_JUMB(cio->resulttype);
+               JumbleExpr(jstate, (Node *)cio->arg);
+       }
+       break;
+       case T_ArrayCoerceExpr:
+       {
+               ArrayCoerceExpr *acexpr = (ArrayCoerceExpr *)node;
+
+               APP_JUMB(acexpr->resulttype);
+               JumbleExpr(jstate, (Node *)acexpr->arg);
+       }
+       break;
+       case T_ConvertRowtypeExpr:
+       {
+               ConvertRowtypeExpr *crexpr = (ConvertRowtypeExpr *)node;
+
+               APP_JUMB(crexpr->resulttype);
+               JumbleExpr(jstate, (Node *)crexpr->arg);
+       }
+       break;
+       case T_CollateExpr:
+       {
+               CollateExpr *ce = (CollateExpr *)node;
+
+               APP_JUMB(ce->collOid);
+               JumbleExpr(jstate, (Node *)ce->arg);
+       }
+       break;
+       case T_CaseExpr:
+       {
+               CaseExpr *caseexpr = (CaseExpr *)node;
+
+               JumbleExpr(jstate, (Node *)caseexpr->arg);
+               foreach (temp, caseexpr->args)
+               {
+                       CaseWhen *when = (CaseWhen *)lfirst(temp);
+
+                       Assert(IsA(when, CaseWhen));
+                       JumbleExpr(jstate, (Node *)when->expr);
+                       JumbleExpr(jstate, (Node *)when->result);
+               }
+               JumbleExpr(jstate, (Node *)caseexpr->defresult);
+       }
+       break;
+       case T_CaseTestExpr:
+       {
+               CaseTestExpr *ct = (CaseTestExpr *)node;
+
+               APP_JUMB(ct->typeId);
+       }
+       break;
+       case T_ArrayExpr:
+               JumbleExpr(jstate, (Node *)((ArrayExpr *)node)->elements);
+               break;
+       case T_RowExpr:
+               JumbleExpr(jstate, (Node *)((RowExpr *)node)->args);
+               break;
+       case T_RowCompareExpr:
+       {
+               RowCompareExpr *rcexpr = (RowCompareExpr *)node;
+
+               APP_JUMB(rcexpr->rctype);
+               JumbleExpr(jstate, (Node *)rcexpr->largs);
+               JumbleExpr(jstate, (Node *)rcexpr->rargs);
+       }
+       break;
+       case T_CoalesceExpr:
+               JumbleExpr(jstate, (Node *)((CoalesceExpr *)node)->args);
+               break;
+       case T_MinMaxExpr:
+       {
+               MinMaxExpr *mmexpr = (MinMaxExpr *)node;
+
+               APP_JUMB(mmexpr->op);
+               JumbleExpr(jstate, (Node *)mmexpr->args);
+       }
+       break;
+       case T_XmlExpr:
+       {
+               XmlExpr *xexpr = (XmlExpr *)node;
+
+               APP_JUMB(xexpr->op);
+               JumbleExpr(jstate, (Node *)xexpr->named_args);
+               JumbleExpr(jstate, (Node *)xexpr->args);
+       }
+       break;
+       case T_NullTest:
+       {
+               NullTest *nt = (NullTest *)node;
+
+               APP_JUMB(nt->nulltesttype);
+               JumbleExpr(jstate, (Node *)nt->arg);
+       }
+       break;
+       case T_BooleanTest:
+       {
+               BooleanTest *bt = (BooleanTest *)node;
+
+               APP_JUMB(bt->booltesttype);
+               JumbleExpr(jstate, (Node *)bt->arg);
+       }
+       break;
+       case T_CoerceToDomain:
+       {
+               CoerceToDomain *cd = (CoerceToDomain *)node;
+
+               APP_JUMB(cd->resulttype);
+               JumbleExpr(jstate, (Node *)cd->arg);
+       }
+       break;
+       case T_CoerceToDomainValue:
+       {
+               CoerceToDomainValue *cdv = (CoerceToDomainValue *)node;
+
+               APP_JUMB(cdv->typeId);
+       }
+       break;
+       case T_SetToDefault:
+       {
+               SetToDefault *sd = (SetToDefault *)node;
+
+               APP_JUMB(sd->typeId);
+       }
+       break;
+       case T_CurrentOfExpr:
+       {
+               CurrentOfExpr *ce = (CurrentOfExpr *)node;
+
+               APP_JUMB(ce->cvarno);
+               if (ce->cursor_name)
+                       APP_JUMB_STRING(ce->cursor_name);
+               APP_JUMB(ce->cursor_param);
+       }
+       break;
+       case T_TargetEntry:
+       {
+               TargetEntry *tle = (TargetEntry *)node;
+
+               APP_JUMB(tle->resno);
+               APP_JUMB(tle->ressortgroupref);
+               JumbleExpr(jstate, (Node *)tle->expr);
+       }
+       break;
+       case T_RangeTblRef:
+       {
+               RangeTblRef *rtr = (RangeTblRef *)node;
+
+               APP_JUMB(rtr->rtindex);
+       }
+       break;
+       case T_JoinExpr:
+       {
+               JoinExpr *join = (JoinExpr *)node;
+
+               APP_JUMB(join->jointype);
+               APP_JUMB(join->isNatural);
+               APP_JUMB(join->rtindex);
+               JumbleExpr(jstate, join->larg);
+               JumbleExpr(jstate, join->rarg);
+               JumbleExpr(jstate, join->quals);
+       }
+       break;
+       case T_FromExpr:
+       {
+               FromExpr *from = (FromExpr *)node;
+
+               JumbleExpr(jstate, (Node *)from->fromlist);
+               JumbleExpr(jstate, from->quals);
+       }
+       break;
+       case T_List:
+               foreach (temp, (List *)node)
+               {
+                       JumbleExpr(jstate, (Node *)lfirst(temp));
+               }
+               break;
+       case T_SortGroupClause:
+       {
+               SortGroupClause *sgc = (SortGroupClause *)node;
+
+               APP_JUMB(sgc->tleSortGroupRef);
+               APP_JUMB(sgc->eqop);
+               APP_JUMB(sgc->sortop);
+               APP_JUMB(sgc->nulls_first);
+       }
+       break;
+       case T_WindowClause:
+       {
+               WindowClause *wc = (WindowClause *)node;
+
+               APP_JUMB(wc->winref);
+               APP_JUMB(wc->frameOptions);
+               JumbleExpr(jstate, (Node *)wc->partitionClause);
+               JumbleExpr(jstate, (Node *)wc->orderClause);
+               JumbleExpr(jstate, wc->startOffset);
+               JumbleExpr(jstate, wc->endOffset);
+       }
+       break;
+       case T_CommonTableExpr:
+       {
+               CommonTableExpr *cte = (CommonTableExpr *)node;
+
+               /* we store the string name because RTE_CTE RTEs need it */
+               APP_JUMB_STRING(cte->ctename);
+               JumbleQuery(jstate, (Query *)cte->ctequery);
+       }
+       break;
+       case T_SetOperationStmt:
+       {
+               SetOperationStmt *setop = (SetOperationStmt *)node;
+
+               APP_JUMB(setop->op);
+               APP_JUMB(setop->all);
+               JumbleExpr(jstate, setop->larg);
+               JumbleExpr(jstate, setop->rarg);
+       }
+       break;
+       case T_RangeTblFunction:
+       {
+               RangeTblFunction *rtfunc = (RangeTblFunction *)node;
+
+               JumbleExpr(jstate, rtfunc->funcexpr);
+       }
+       break;
+       default:
+               /*
+                * Only a warning, since we can stumble along anyway (the NodeTag
+                * was already jumbled above).
+                * NOTE(review): any node type not listed above, such as node types
+                * specific to this server fork, lands here and emits one WARNING
+                * per occurrence; confirm whether more cases are needed.
+                */
+               elog(WARNING, "unrecognized node type: %d",
+                        (int)nodeTag(node));
+               break;
+       }
+}
+
+/*
+ * Record the parse location of a constant found while walking the current
+ * query tree. The array grows by doubling when full. Each entry's length
+ * starts at -1 (unknown). A negative location means the parser did not know
+ * where the constant was, and nothing is recorded.
+ */
+static void
+RecordConstLocation(pgssJumbleState *jstate, int location)
+{
+       pgssLocationLen *slot;
+
+       if (location < 0)
+               return;
+
+       if (jstate->clocations_count >= jstate->clocations_buf_size)
+       {
+               jstate->clocations_buf_size *= 2;
+               jstate->clocations = (pgssLocationLen *)
+                       repalloc(jstate->clocations,
+                                        jstate->clocations_buf_size * sizeof(pgssLocationLen));
+       }
+
+       slot = &jstate->clocations[jstate->clocations_count++];
+       slot->location = location;
+       slot->length = -1;
+}
+
+/*
+ * Return true for literal-constant tokens (float, integer, string, bit-string
+ * and hex-string constants), which gen_normplan replaces with placeholders.
+ */
+static bool
+need_replace(int token)
+{
+       switch (token)
+       {
+       case FCONST:
+       case ICONST:
+       case SCONST:
+       case BCONST:
+       case XCONST:
+               return true;
+       default:
+               return false;
+       }
+}
+
+/*
+ * gen_normplan - lex the given statement text with the core SQL scanner and
+ * return a copy in which every literal constant token (see need_replace) is
+ * replaced by a positional placeholder: $1, $2, ...
+ *
+ * Each loop step emits the text from the start of the previous token up to
+ * the position reported for the current one. A replaced constant therefore
+ * also swallows whatever text followed it (typically whitespace). The output
+ * is only hashed, so this is harmless as long as it is deterministic.
+ *
+ * The result is palloc'd in the current memory context. Only PostgreSQL
+ * allocation is used: no asprintf (a GNU/BSD extension) and no malloc'd
+ * strndup copies whose NULL return would have crashed
+ * appendStringInfoString.
+ */
+static StringInfo
+gen_normplan(const char *execution_plan)
+{
+       core_yyscan_t yyscanner;
+       core_yy_extra_type yyextra;
+       core_YYSTYPE yylval;
+       YYLTYPE yylloc = 0;
+       int tok;
+       int bind_prefix = 1;
+       YYLTYPE last_yylloc = 0;
+       int last_tok = 0;
+       StringInfo plan_out = makeStringInfo();
+
+       yyscanner = scanner_init(execution_plan,
+                                                        &yyextra,
+#if PG_VERSION_NUM >= 120000
+                                                        &ScanKeywords,
+                                                        ScanKeywordTokens
+#else
+                                                        ScanKeywords,
+                                                        NumScanKeywords
+#endif
+       );
+
+       for (;;)
+       {
+               /* get the next lexeme */
+               tok = core_yylex(&yylval, &yylloc, yyscanner);
+
+               /*
+                * [last_yylloc, yylloc) now spans the previous token and the text
+                * after it, so the previous token can be processed.
+                */
+               if (need_replace(last_tok))
+               {
+                       /* substitute a positional placeholder for the constant */
+                       appendStringInfo(plan_out, "$%d", bind_prefix++);
+               }
+               else
+               {
+                       /* not a constant: copy the text as-is */
+                       appendBinaryStringInfo(plan_out,
+                                                                  execution_plan + last_yylloc,
+                                                                  yylloc - last_yylloc);
+               }
+
+               /* token 0 marks the end of input */
+               if (tok == 0)
+                       break;
+               last_tok = tok;
+               last_yylloc = yylloc;
+       }
+
+       scanner_finish(yyscanner);
+
+       return plan_out;
+}
+
+/*
+ * get_plan_id - hash of queryDesc->sourceText after constant normalization
+ * by gen_normplan(). Returns 0 when the query has no source text.
+ *
+ * NOTE(review): despite its name this hashes the statement text, not the
+ * plan; confirm that is the intended identifier.
+ */
+uint64_t get_plan_id(QueryDesc *queryDesc)
+{
+       StringInfo normalized;
+       uint64_t plan_id;
+
+       if (!queryDesc->sourceText)
+               return 0;
+
+       normalized = gen_normplan(queryDesc->sourceText);
+       plan_id = DatumGetUInt32(hash_any((unsigned char *)normalized->data,
+                                                                         normalized->len));
+
+       /* The normalized copy is only needed for hashing; don't leak it. */
+       pfree(normalized->data);
+       pfree(normalized);
+
+       return plan_id;
+}
+
+/*
+ * Post-parse-analysis hook: mark query with a queryId.
+ *
+ * The previously installed hook runs first. If it already assigned a
+ * queryId, that value is kept. Utility statements get 0. For everything
+ * else the queryId is a hash of the jumbled Query tree, never 0.
+ */
+void pgss_post_parse_analyze(ParseState *pstate, Query *query)
+{
+       pgssJumbleState jstate;
+
+       if (prev_post_parse_analyze_hook)
+               prev_post_parse_analyze_hook(pstate, query);
+
+       /*
+        * A module chained before us (e.g. pg_stat_statements) may already have
+        * computed a queryId. Keep it rather than asserting it is unset; the old
+        * Assert fired in assert-enabled builds, and overwriting it would make
+        * that module's executor-time lookups disagree with its own entries.
+        */
+       if (query->queryId != 0)
+               return;
+
+       /*
+        * Utility statements get queryId zero.  We do this even in cases where
+        * the statement contains an optimizable statement for which a queryId
+        * could be derived (such as EXPLAIN or DECLARE CURSOR).  For such cases,
+        * runtime control will first go through ProcessUtility and then the
+        * executor, and we don't want the executor hooks to do anything, since we
+        * are already measuring the statement's costs at the utility level.
+        */
+       if (query->utilityStmt)
+       {
+               query->queryId = 0;
+               return;
+       }
+
+       /* Set up workspace for query jumbling */
+       jstate.jumble = (unsigned char *)palloc(JUMBLE_SIZE);
+       jstate.jumble_len = 0;
+       jstate.clocations_buf_size = 32;
+       jstate.clocations = (pgssLocationLen *)
+               palloc(jstate.clocations_buf_size * sizeof(pgssLocationLen));
+       jstate.clocations_count = 0;
+       jstate.highest_extern_param_id = 0;
+
+       /* Compute query ID and mark the Query node with it */
+       JumbleQuery(&jstate, query);
+       query->queryId = hash_any(jstate.jumble, jstate.jumble_len);
+
+       /* The workspace only served to compute the hash; nothing here keeps it. */
+       pfree(jstate.jumble);
+       pfree(jstate.clocations);
+
+       /*
+        * If we are unlucky enough to get a hash of zero, use 1 instead, to
+        * prevent confusion with the utility-statement case.
+        */
+       if (query->queryId == 0)
+               query->queryId = 1;
+}
\ No newline at end of file
diff --git a/src/stat_statements_parser/pg_stat_statements_ya_parser.h b/src/stat_statements_parser/pg_stat_statements_ya_parser.h
new file mode 100644
index 00000000000..274f96aebaf
--- /dev/null
+++ b/src/stat_statements_parser/pg_stat_statements_ya_parser.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <stdint.h>
+
+/* Forward declaration so this header does not require executor/execdesc.h. */
+struct QueryDesc;
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+/* Install / remove the post-parse-analyze hook that assigns Query.queryId. */
+extern void stat_statements_parser_init(void);
+extern void stat_statements_parser_deinit(void);
+
+/*
+ * Hash of the query's source text with literal constants normalized; 0 when
+ * there is no source text. Declared with C linkage because it is defined in
+ * a C translation unit but called from C++.
+ */
+extern uint64_t get_plan_id(struct QueryDesc *queryDesc);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/src/yagp_hooks_collector.c b/src/yagp_hooks_collector.c
new file mode 100644
index 00000000000..69475ea5079
--- /dev/null
+++ b/src/yagp_hooks_collector.c
@@ -0,0 +1,22 @@
+#include "postgres.h"
+#include "cdb/cdbvars.h"
+#include "fmgr.h"
+
+#include "hook_wrappers.h"
+
+PG_MODULE_MAGIC;
+
+void _PG_init(void);
+void _PG_fini(void);
+
+/* Hooks are managed only in the DISPATCH and EXECUTE roles; other roles are left alone. */
+static bool should_manage_hooks(void) {
+  return Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE;
+}
+
+/* Module load: install the collector hooks. */
+void _PG_init(void) {
+  if (!should_manage_hooks())
+    return;
+  hooks_init();
+}
+
+/* Module unload: remove the collector hooks. */
+void _PG_fini(void) {
+  if (!should_manage_hooks())
+    return;
+  hooks_deinit();
+}
diff --git a/yagp-hooks-collector.control b/yagp-hooks-collector.control
new file mode 100644
index 00000000000..82c189a88fc
--- /dev/null
+++ b/yagp-hooks-collector.control
@@ -0,0 +1,5 @@
+# yagp-hooks-collector extension
+comment = 'Intercept query and plan execution hooks and report them to Yandex GPCC agents'
+default_version = '1.0'
+module_pathname = '$libdir/yagp-hooks-collector'
+superuser = true


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to