This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit 029897c883255a7f63b6f8c10443341760040fe9 Author: Maxim Smyatkin <[email protected]> AuthorDate: Thu Mar 23 15:15:26 2023 +0300 [yagp_hooks_collector] Add extension skeleton with GRPC transport Add yagp_hooks_collector, a shared-preload module that hooks into ExecutorStart and ExecutorFinish to capture query lifecycle events. Includes Makefile with protobuf code generation, GRPC-based delivery, QueryInfo generation (query text, plan text, query_id, plan_id, session metadata), and basic protobuf message filling. --- .gitignore | 7 +- Makefile | 2 - protos/yagpcc_metrics.proto | 130 ++++ protos/yagpcc_plan.proto | 570 +++++++++++++++ protos/yagpcc_set_service.proto | 45 ++ sql/yagp-hooks-collector--1.0.sql | 2 + sql/yagp-hooks-collector--unpackaged--1.0.sql | 2 + src/EventSender.cpp | 189 +++++ src/EventSender.h | 19 + src/GrpcConnector.cpp | 55 ++ src/GrpcConnector.h | 15 + src/hook_wrappers.cpp | 67 ++ src/hook_wrappers.h | 12 + src/stat_statements_parser/README.MD | 1 + .../pg_stat_statements_ya_parser.c | 771 +++++++++++++++++++++ .../pg_stat_statements_ya_parser.h | 15 + src/yagp_hooks_collector.c | 22 + yagp-hooks-collector.control | 5 + 18 files changed, 1926 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 5c21989c4ab..29b40ee096c 100644 --- a/.gitignore +++ b/.gitignore @@ -73,4 +73,9 @@ lib*.pc /compile_commands.json /tmp_install/ /.cache/ -/install/ \ No newline at end of file +/install/ +*.o +*.so +src/protos/ +.vscode +compile_commands.json diff --git a/Makefile b/Makefile index e9ab3fbf2d4..15c5dabb70e 100644 --- a/Makefile +++ b/Makefile @@ -3,14 +3,12 @@ # to build Postgres with a different make, we have this make file # that, as a service, will look for a GNU make and invoke it, or show # an error message if none could be found. - # If the user were using GNU make now, this file would not get used # because GNU make uses a make file named "GNUmakefile" in preference # to "Makefile" if it exists. 
PostgreSQL is shipped with a # "GNUmakefile". If the user hasn't run the configure script yet, the # GNUmakefile won't exist yet, so we catch that case as well. - # AIX make defaults to building *every* target of the first rule. Start with # a single-target, empty rule to make the other targets non-default. all: diff --git a/protos/yagpcc_metrics.proto b/protos/yagpcc_metrics.proto new file mode 100644 index 00000000000..b7e255484c7 --- /dev/null +++ b/protos/yagpcc_metrics.proto @@ -0,0 +1,130 @@ +syntax = "proto3"; + +package yagpcc; +option java_outer_classname = "SegmentYAGPCCM"; +option go_package = "a.yandex-team.ru/cloud/mdb/yagpcc/api/proto/common;greenplum"; + +enum QueryStatus { + QUERY_STATUS_UNSPECIFIED = 0; + QUERY_STATUS_SUBMIT = 1; + QUERY_STATUS_START = 2; + QUERY_STATUS_DONE = 3; + QUERY_STATUS_QUERY_DONE = 4; + QUERY_STATUS_ERROR = 5; + QUERY_STATUS_CANCELLING = 6; + QUERY_STATUS_CANCELED = 7; + QUERY_STATUS_END = 8; +} + +enum PlanNodeStatus { + PLAN_NODE_STATUS_UNSPECIFIED = 0; + PLAN_NODE_STATUS_INITIALIZED = 1; + PLAN_NODE_STATUS_EXECUTING = 2; + PLAN_NODE_STATUS_FINISHED = 3; +} + +message QueryInfo { + PlanGenerator generator = 1; + uint64 query_id = 2; + uint64 plan_id = 3; + string queryText = 4; + string planText = 5; + SessionInfo sessionInfo = 6; +} + +enum PlanGenerator +{ + PLAN_GENERATOR_UNSPECIFIED = 0; + PLAN_GENERATOR_PLANNER = 1; /* plan produced by the planner*/ + PLAN_GENERATOR_OPTIMIZER = 2; /* plan produced by the optimizer*/ +} + +message GPMetrics { + SystemStat systemStat = 1; + MetricInstrumentation instrumentation = 2; + SpillInfo spill = 3; +} + +message QueryInfoHeader { + int32 pid = 1; + GpId gpIdentity = 2; + + int32 tmid = 3; /* A time identifier for a particular query. All records associated with the query will have the same tmid. */ + int32 ssid = 4; /* The session id as shown by gp_session_id.
All records associated with the query will have the same ssid */ + int32 ccnt = 5; /* The command number within this session as shown by gp_command_count. All records associated with the query will have the same ccnt */ + int32 sliceid = 6; /* slice identifier, 0 means general info for the whole query */ +} + +message GpId { + int32 dbid = 1; /* the dbid of this database */ + int32 segindex = 2; /* content indicator: -1 for entry database, + * 0, ..., n-1 for segment database * + * a primary and its mirror have the same segIndex */ + GpRole gp_role = 3; + GpRole gp_session_role = 4; +} + +enum GpRole +{ + GP_ROLE_UNSPECIFIED = 0; + GP_ROLE_UTILITY = 1; /* Operating as a simple database engine */ + GP_ROLE_DISPATCH = 2; /* Operating as the parallel query dispatcher */ + GP_ROLE_EXECUTE = 3; /* Operating as a parallel query executor */ + GP_ROLE_UNDEFINED = 4; /* Should never see this role in use */ +} + +message SessionInfo { + string sql = 1; + string userName = 2; + string databaseName = 3; + string resourceGroup = 4; + string applicationName = 5; +} + +message SystemStat { + /* CPU stat*/ + double runningTimeSeconds = 1; + double userTimeSeconds = 2; + double kernelTimeSeconds = 3; + + /* Memory stat */ + uint64 vsize = 4; + uint64 rss = 5; + uint64 VmSizeKb = 6; + uint64 VmPeakKb = 7; + + /* Storage stat */ + uint64 rchar = 8; + uint64 wchar = 9; + uint64 syscr = 10; + uint64 syscw = 11; + uint64 read_bytes = 12; + uint64 write_bytes = 13; + uint64 cancelled_write_bytes = 14; +} + +message MetricInstrumentation { + uint64 ntuples = 1; /* Total tuples produced */ + uint64 nloops = 2; /* # of run cycles for this node */ + uint64 tuplecount = 3; /* Tuples emitted so far this cycle */ + double firsttuple = 4; /* Time for first tuple of this cycle */ + double startup = 5; /* Total startup time (in seconds) */ + double total = 6; /* Total total time (in seconds) */ + uint64 shared_blks_hit = 7; /* shared blocks stats*/ + uint64 shared_blks_read = 8; + uint64
shared_blks_dirtied = 9; + uint64 shared_blks_written = 10; + uint64 local_blks_hit = 11; /* local blocks (temporary tables and indexes) stats */ + uint64 local_blks_read = 12; + uint64 local_blks_dirtied = 13; + uint64 local_blks_written = 14; + uint64 temp_blks_read = 15; /* temp blocks (sort/hash working files) stats */ + uint64 temp_blks_written = 16; + double blk_read_time = 17; /* measured read/write time */ + double blk_write_time = 18; +} + +message SpillInfo { + int32 fileCount = 1; + int64 totalBytes = 2; +} diff --git a/protos/yagpcc_plan.proto b/protos/yagpcc_plan.proto new file mode 100644 index 00000000000..962fab4bbdd --- /dev/null +++ b/protos/yagpcc_plan.proto @@ -0,0 +1,570 @@ +syntax = "proto3"; + +package yagpcc; +option java_outer_classname = "SegmentYAGPCCP"; +option go_package = "a.yandex-team.ru/cloud/mdb/yagpcc/api/proto/common;greenplum"; + +message MetricPlan { + GpdbNodeType type = 1; + + int32 plan_node_id = 2; + int32 parent_plan_node_id = 3; // Valid only for QueryInfoMetricQuerySubmit + + double startup_cost = 4; /* cost expended before fetching any tuples */ + double total_cost = 5; /* total cost (assuming all tuples fetched) */ + double plan_rows = 6; /* number of rows plan is expected to emit */ + int32 plan_width = 7; /* average row width in bytes */ + + int32 arg1 = 8; // for some nodes it's additional operand type + int32 arg2 = 9; // for some nodes it's additional operand type + + MetricMotionInfo motion_info = 10; + MetricRelationInfo relation_info = 11; + + string scan_index_name = 12; + ScanDirection scan_direction = 13; + MetricSliceInfo slice_info = 14; + string statement = 15; +} + +message MetricMotionInfo { + MotionType type = 1; + bool isBroadcast = 2; + CdbLocusType locusType = 3; + + int32 sliceId = 4; + int32 parentSliceId = 5; +} + +message MetricRelationInfo { + int32 oid = 1; + string name = 2; + string schema = 3; + string alias = 4; + int32 dynamicScanId = 5; +} + +message MetricSliceInfo { + int32 slice = 1; + int32 segments = 2; + GangType gangType =
3; + int32 gang = 4; +} + +enum ScanDirection +{ + SCAN_DIRECTION_UNSPECIFIED = 0; + SCAN_DIRECTION_BACKWARD = 1; + SCAN_DIRECTION_FORWARD = 2; +} + +/* GangType enumeration is used in several structures related to CDB + * slice plan support. + */ +enum GangType +{ + GANG_TYPE_UNSPECIFIED = 0; + GANG_TYPE_UNALLOCATED = 1; /* a root slice executed by the qDisp */ + GANG_TYPE_ENTRYDB_READER = 2; /* a 1-gang with read access to the entry db */ + GANG_TYPE_SINGLETON_READER = 3; /* a 1-gang to read the segment dbs */ + GANG_TYPE_PRIMARY_READER = 4; /* a 1-gang or N-gang to read the segment dbs */ + GANG_TYPE_PRIMARY_WRITER = 5; /* the N-gang that can update the segment dbs */ +} + + +enum CdbLocusType +{ + CDB_LOCUS_TYPE_UNSPECIFIED = 0; + CDB_LOCUS_TYPE_ENTRY = 1; /* a single backend process on the entry db: + * usually the qDisp itself, but could be a + * qExec started by the entry postmaster. + */ + + CDB_LOCUS_TYPE_SINGLE_QE = 2; /* a single backend process on any db: the + * qDisp itself, or a qExec started by a + * segment postmaster or the entry postmaster.
+ */ + + CDB_LOCUS_TYPE_GENERAL = 3; /* compatible with any locus (data is + * self-contained in the query plan or + * generally available in any qExec or qDisp) */ + + CDB_LOCUS_TYPE_SEGMENT_GENERAL = 4; /* generally available in any qExec, but not + * available in qDisp */ + + CDB_LOCUS_TYPE_REPLICATED = 5; /* replicated over all qExecs of an N-gang */ + CDB_LOCUS_TYPE_HASHED = 6; /* hash partitioned over all qExecs of N-gang */ + CDB_LOCUS_TYPE_HASHED_OJ = 7; /* result of hash partitioned outer join, NULLs can be anywhere */ + CDB_LOCUS_TYPE_STREWN = 8; /* partitioned on no known function */ + CDB_LOCUS_TYPE_END = 9; /* = last valid CdbLocusType + 1 */ +} + +enum MotionType +{ + MOTION_TYPE_UNSPECIFIED = 0; + MOTION_TYPE_HASH = 1; // Use hashing to select a segindex destination + MOTION_TYPE_FIXED = 2; // Send tuples to a fixed set of segindexes + MOTION_TYPE_EXPLICIT = 3; // Send tuples to the segment explicitly specified in their segid column +} + +enum GpdbNodeType { + GPDB_NODE_TYPE_UNSPECIFIED = 0; + INDEX_INFO = 1; + EXPR_CONTEXT = 2; + PROJECTION_INFO = 3; + JUNK_FILTER = 4; + RESULT_REL_INFO = 5; + E_STATE = 6; + TUPLE_TABLE_SLOT = 7; + CDB_PROCESS = 8; + SLICE = 9; + SLICE_TABLE = 10; + CURSOR_POS_INFO = 11; + SHARE_NODE_ENTRY = 12; + PARTITION_STATE = 13; + QUERY_DISPATCH_DESC = 14; + OID_ASSIGNMENT = 15; + PLAN = 16; + SCAN = 17; + JOIN = 18; + RESULT = 19; + MODIFY_TABLE = 20; + APPEND = 21; + MERGE_APPEND = 22; + RECURSIVE_UNION = 23; + SEQUENCE = 24; + BITMAP_AND = 25; + BITMAP_OR = 26; + SEQ_SCAN = 27; + DYNAMIC_SEQ_SCAN = 28; + EXTERNAL_SCAN = 29; + INDEX_SCAN = 30; + DYNAMIC_INDEX_SCAN = 31; + INDEX_ONLY_SCAN = 32; + BITMAP_INDEX_SCAN = 33; + DYNAMIC_BITMAP_INDEX_SCAN = 34; + BITMAP_HEAP_SCAN = 35; + DYNAMIC_BITMAP_HEAP_SCAN = 36; + TID_SCAN = 37; + SUBQUERY_SCAN = 38; + FUNCTION_SCAN = 39; + TABLE_FUNCTION_SCAN = 40; + VALUES_SCAN = 41; + CTE_SCAN = 42; + WORK_TABLE_SCAN = 43; + FOREIGN_SCAN = 44; + NEST_LOOP = 45; + MERGE_JOIN = 46; +
HASH_JOIN = 47; + MATERIAL = 48; + SORT = 49; + AGG = 50; + WINDOW_AGG = 51; + UNIQUE = 52; + HASH = 53; + SET_OP = 54; + LOCK_ROWS = 55; + LIMIT = 56; + MOTION = 57; + SHARE_INPUT_SCAN = 58; + REPEAT = 59; + DML = 60; + SPLIT_UPDATE = 61; + ROW_TRIGGER = 62; + ASSERT_OP = 63; + PARTITION_SELECTOR = 64; + PLAN_END = 65; + NEST_LOOP_PARAM = 66; + PLAN_ROW_MARK = 67; + PLAN_INVAL_ITEM = 68; + PLAN_STATE = 69; + SCAN_STATE = 70; + JOIN_STATE = 71; + RESULT_STATE = 72; + MODIFY_TABLE_STATE = 73; + APPEND_STATE = 74; + MERGE_APPEND_STATE = 75; + RECURSIVE_UNION_STATE = 76; + SEQUENCE_STATE = 77; + BITMAP_AND_STATE = 78; + BITMAP_OR_STATE = 79; + SEQ_SCAN_STATE = 80; + DYNAMIC_SEQ_SCAN_STATE = 81; + EXTERNAL_SCAN_STATE = 82; + INDEX_SCAN_STATE = 83; + DYNAMIC_INDEX_SCAN_STATE = 84; + INDEX_ONLY_SCAN_STATE = 85; + BITMAP_INDEX_SCAN_STATE = 86; + DYNAMIC_BITMAP_INDEX_SCAN_STATE = 87; + BITMAP_HEAP_SCAN_STATE = 88; + DYNAMIC_BITMAP_HEAP_SCAN_STATE = 89; + TID_SCAN_STATE = 90; + SUBQUERY_SCAN_STATE = 91; + FUNCTION_SCAN_STATE = 92; + TABLE_FUNCTION_STATE = 93; + VALUES_SCAN_STATE = 94; + CTE_SCAN_STATE = 95; + WORK_TABLE_SCAN_STATE = 96; + FOREIGN_SCAN_STATE = 97; + NEST_LOOP_STATE = 98; + MERGE_JOIN_STATE = 99; + HASH_JOIN_STATE = 100; + MATERIAL_STATE = 101; + SORT_STATE = 102; + AGG_STATE = 103; + WINDOW_AGG_STATE = 104; + UNIQUE_STATE = 105; + HASH_STATE = 106; + SET_OP_STATE = 107; + LOCK_ROWS_STATE = 108; + LIMIT_STATE = 109; + MOTION_STATE = 110; + SHARE_INPUT_SCAN_STATE = 111; + REPEAT_STATE = 112; + DML_STATE = 113; + SPLIT_UPDATE_STATE = 114; + ROW_TRIGGER_STATE = 115; + ASSERT_OP_STATE = 116; + PARTITION_SELECTOR_STATE = 117; + TUPLE_DESC_NODE = 118; + SERIALIZED_PARAM_EXTERN_DATA = 119; + ALIAS = 120; + RANGE_VAR = 121; + EXPR = 122; + VAR = 123; + CONST = 124; + PARAM = 125; + AGGREF = 126; + WINDOW_FUNC = 127; + ARRAY_REF = 128; + FUNC_EXPR = 129; + NAMED_ARG_EXPR = 130; + OP_EXPR = 131; + DISTINCT_EXPR = 132; + NULL_IF_EXPR = 133; + SCALAR_ARRAY_OP_EXPR = 134;
+ BOOL_EXPR = 135; + SUB_LINK = 136; + SUB_PLAN = 137; + ALTERNATIVE_SUB_PLAN = 138; + FIELD_SELECT = 139; + FIELD_STORE = 140; + RELABEL_TYPE = 141; + COERCE_VIA_IO = 142; + ARRAY_COERCE_EXPR = 143; + CONVERT_ROWTYPE_EXPR = 144; + COLLATE_EXPR = 145; + CASE_EXPR = 146; + CASE_WHEN = 147; + CASE_TEST_EXPR = 148; + ARRAY_EXPR = 149; + ROW_EXPR = 150; + ROW_COMPARE_EXPR = 151; + COALESCE_EXPR = 152; + MIN_MAX_EXPR = 153; + XML_EXPR = 154; + NULL_TEST = 155; + BOOLEAN_TEST = 156; + COERCE_TO_DOMAIN = 157; + COERCE_TO_DOMAIN_VALUES = 158; + SET_TO_DEFAULT = 159; + CURRENT_OF_EXPR = 160; + TARGET_ENTRY = 161; + RANGE_TBL_REF = 162; + JOIN_EXPR = 163; + FROM_EXPR = 164; + INTO_CLAUSE = 165; + COPY_INTO_CLAUSE = 166; + REFRESH_CLAUSE = 167; + FLOW = 168; + GROUPING = 169; + GROUP_ID = 170; + DISTRIBUTED_BY = 171; + DML_ACTION_EXPR = 172; + PART_SELECTED_EXPR = 173; + PART_DEFAULT_EXPR = 174; + PART_BOUND_EXPR = 175; + PART_BOUND_INCLUSION_EXPR = 176; + PART_BOUND_OPEN_EXPR = 177; + PART_LIST_RULE_EXPR = 178; + PART_LIST_NULL_TEST_EXPR = 179; + TABLE_OID_INFO = 180; + EXPR_STATE = 181; + GENERIC_EXPR_STATE = 182; + WHOLE_ROW_VAR_EXPR_STATE = 183; + AGGREF_EXPR_STATE = 184; + WINDOW_FUNC_EXPR_STATE = 185; + ARRAY_REF_EXPR_STATE = 186; + FUNC_EXPR_STATE = 187; + SCALAR_ARRAY_OP_EXPR_STATE = 188; + BOOL_EXPR_STATE = 189; + SUB_PLAN_STATE = 190; + ALTERNATIVE_SUB_PLAN_STATE = 191; + FIELD_SELECT_STATE = 192; + FIELD_STORE_STATE = 193; + COERCE_VIA_IO_STATE = 194; + ARRAY_COERCE_EXPR_STATE = 195; + CONVERT_ROWTYPE_EXPR_STATE = 196; + CASE_EXPR_STATE = 197; + CASE_WHEN_STATE = 198; + ARRAY_EXPR_STATE = 199; + ROW_EXPR_STATE = 200; + ROW_COMPARE_EXPR_STATE = 201; + COALESCE_EXPR_STATE = 202; + MIN_MAX_EXPR_STATE = 203; + XML_EXPR_STATE = 204; + NULL_TEST_STATE = 205; + COERCE_TO_DOMAIN_STATE = 206; + DOMAIN_CONSTRAINT_STATE = 207; + GROUPING_FUNC_EXPR_STATE = 208; + PART_SELECTED_EXPR_STATE = 209; + PART_DEFAULT_EXPR_STATE = 210; + PART_BOUND_EXPR_STATE = 211; +
PART_BOUND_INCLUSION_EXPR_STATE = 212; + PART_BOUND_OPEN_EXPR_STATE = 213; + PART_LIST_RULE_EXPR_STATE = 214; + PART_LIST_NULL_TEST_EXPR_STATE = 215; + PLANNER_INFO = 216; + PLANNER_GLOBAL = 217; + REL_OPT_INFO = 218; + INDEX_OPT_INFO = 219; + PARAM_PATH_INFO = 220; + PATH = 221; + APPEND_ONLY_PATH = 222; + AOCS_PATH = 223; + EXTERNAL_PATH = 224; + INDEX_PATH = 225; + BITMAP_HEAP_PATH = 226; + BITMAP_AND_PATH = 227; + BITMAP_OR_PATH = 228; + NEST_PATH = 229; + MERGE_PATH = 230; + HASH_PATH = 231; + TID_PATH = 232; + FOREIGN_PATH = 233; + APPEND_PATH = 234; + MERGE_APPEND_PATH = 235; + RESULT_PATH = 236; + MATERIAL_PATH = 237; + UNIQUE_PATH = 238; + PROJECTION_PATH = 239; + EQUIVALENCE_CLASS = 240; + EQUIVALENCE_MEMBER = 241; + PATH_KEY = 242; + RESTRICT_INFO = 243; + PLACE_HOLDER_VAR = 244; + SPECIAL_JOIN_INFO = 245; + LATERAL_JOIN_INFO = 246; + APPEND_REL_INFO = 247; + PLACE_HOLDER_INFO = 248; + MIN_MAX_AGG_INFO = 249; + PARTITION = 250; + PARTITION_RULE = 251; + PARTITION_NODE = 252; + PG_PART_RULE = 253; + SEGFILE_MAP_NODE = 254; + PLANNER_PARAM_ITEM = 255; + CDB_MOTION_PATH = 256; + PARTITION_SELECTOR_PATH = 257; + CDB_REL_COLUMN_INFO = 258; + DISTRIBUTION_KEY = 259; + MEMORY_CONTEXT = 260; + ALLOC_SET_CONTEXT = 261; + MEMORY_ACCOUNT = 262; + VALUE = 263; + INTEGER = 264; + FLOAT = 265; + STRING = 266; + BIT_STRING = 267; + NULL_VALUE = 268; + LIST = 269; + INT_LIST = 270; + OID_LIST = 271; + QUERY = 272; + PLANNED_STMT = 273; + INSERT_STMT = 274; + DELETE_STMT = 275; + UPDATE_STMT = 276; + SELECT_STMT = 277; + ALTER_TABLE_STMT = 278; + ALTER_TABLE_CMD = 279; + ALTER_DOMAIN_STMT = 280; + SET_OPERATION_STMT = 281; + GRANT_STMT = 282; + GRANT_ROLE_STMT = 283; + ALTER_DEFAULT_PRIVILEGES_STMT = 284; + CLOSE_PORTAL_STMT = 285; + CLUSTER_STMT = 286; + COPY_STMT = 287; + CREATE_STMT = 288; + SINGLE_ROW_ERROR_DESC = 289; + EXT_TABLE_TYPE_DESC = 290; + CREATE_EXTERNAL_STMT = 291; + DEFINE_STMT = 292; + DROP_STMT = 293; + TRUNCATE_STMT = 294; + COMMENT_STMT = 295; +
FETCH_STMT = 296; + INDEX_STMT = 297; + CREATE_FUNCTION_STMT = 298; + ALTER_FUNCTION_STMT = 299; + DO_STMT = 300; + RENAME_STMT = 301; + RULE_STMT = 302; + NOTIFY_STMT = 303; + LISTEN_STMT = 304; + UNLISTEN_STMT = 305; + TRANSACTION_STMT = 306; + VIEW_STMT = 307; + LOAD_STMT = 308; + CREATE_DOMAIN_STMT = 309; + CREATEDB_STMT = 310; + DROPDB_STMT = 311; + VACUUM_STMT = 312; + EXPLAIN_STMT = 313; + CREATE_TABLE_AS_STMT = 314; + CREATE_SEQ_STMT = 315; + ALTER_SEQ_STMT = 316; + VARIABLE_SET_STMT = 317; + VARIABLE_SHOW_STMT = 318; + DISCARD_STMT = 319; + CREATE_TRIG_STMT = 320; + CREATE_P_LANG_STMT = 321; + CREATE_ROLE_STMT = 322; + ALTER_ROLE_STMT = 323; + DROP_ROLE_STMT = 324; + CREATE_QUEUE_STMT = 325; + ALTER_QUEUE_STMT = 326; + DROP_QUEUE_STMT = 327; + CREATE_RESOURCE_GROUP_STMT = 328; + DROP_RESOURCE_GROUP_STMT = 329; + ALTER_RESOURCE_GROUP_STMT = 330; + LOCK_STMT = 331; + CONSTRAINTS_SET_STMT = 332; + REINDEX_STMT = 333; + CHECK_POINT_STMT = 334; + CREATE_SCHEMA_STMT = 335; + ALTER_DATABASE_STMT = 336; + ALTER_DATABASE_SET_STMT = 337; + ALTER_ROLE_SET_STMT = 338; + CREATE_CONVERSION_STMT = 339; + CREATE_CAST_STMT = 340; + CREATE_OP_CLASS_STMT = 341; + CREATE_OP_FAMILY_STMT = 342; + ALTER_OP_FAMILY_STMT = 343; + PREPARE_STMT = 344; + EXECUTE_STMT = 345; + DEALLOCATE_STMT = 346; + DECLARE_CURSOR_STMT = 347; + CREATE_TABLE_SPACE_STMT = 348; + DROP_TABLE_SPACE_STMT = 349; + ALTER_OBJECT_SCHEMA_STMT = 350; + ALTER_OWNER_STMT = 351; + DROP_OWNED_STMT = 352; + REASSIGN_OWNED_STMT = 353; + COMPOSITE_TYPE_STMT = 354; + CREATE_ENUM_STMT = 355; + CREATE_RANGE_STMT = 356; + ALTER_ENUM_STMT = 357; + ALTER_TS_DICTIONARY_STMT = 358; + ALTER_TS_CONFIGURATION_STMT = 359; + CREATE_FDW_STMT = 360; + ALTER_FDW_STMT = 361; + CREATE_FOREIGN_SERVER_STMT = 362; + ALTER_FOREIGN_SERVER_STMT = 363; + CREATE_USER_MAPPING_STMT = 364; + ALTER_USER_MAPPING_STMT = 365; + DROP_USER_MAPPING_STMT = 366; + ALTER_TABLE_SPACE_OPTIONS_STMT = 367; + ALTER_TABLE_MOVE_ALL_STMT = 368; + SEC_LABEL_STMT =
369; + CREATE_FOREIGN_TABLE_STMT = 370; + CREATE_EXTENSION_STMT = 371; + ALTER_EXTENSION_STMT = 372; + ALTER_EXTENSION_CONTENTS_STMT = 373; + CREATE_EVENT_TRIG_STMT = 374; + ALTER_EVENT_TRIG_STMT = 375; + REFRESH_MAT_VIEW_STMT = 376; + REPLICA_IDENTITY_STMT = 377; + ALTER_SYSTEM_STMT = 378; + PARTITION_BY = 379; + PARTITION_ELEM = 380; + PARTITION_RANGE_ITEM = 381; + PARTITION_BOUND_SPEC = 382; + PARTITION_SPEC = 383; + PARTITION_VALUES_SPEC = 384; + ALTER_PARTITION_ID = 385; + ALTER_PARTITION_CMD = 386; + INHERIT_PARTITION_CMD = 387; + CREATE_FILE_SPACE_STMT = 388; + FILE_SPACE_ENTRY = 389; + DROP_FILE_SPACE_STMT = 390; + TABLE_VALUE_EXPR = 391; + DENY_LOGIN_INTERVAL = 392; + DENY_LOGIN_POINT = 393; + ALTER_TYPE_STMT = 394; + SET_DISTRIBUTION_CMD = 395; + EXPAND_STMT_SPEC = 396; + A_EXPR = 397; + COLUMN_REF = 398; + PARAM_REF = 399; + A_CONST = 400; + FUNC_CALL = 401; + A_STAR = 402; + A_INDICES = 403; + A_INDIRECTION = 404; + A_ARRAY_EXPR = 405; + RES_TARGET = 406; + TYPE_CAST = 407; + COLLATE_CLAUSE = 408; + SORT_BY = 409; + WINDOW_DEF = 410; + RANGE_SUBSELECT = 411; + RANGE_FUNCTION = 412; + TYPE_NAME = 413; + COLUMN_DEF = 414; + INDEX_ELEM = 415; + CONSTRAINT = 416; + DEF_ELEM = 417; + RANGE_TBL_ENTRY = 418; + RANGE_TBL_FUNCTION = 419; + WITH_CHECK_OPTION = 420; + GROUPING_CLAUSE = 421; + GROUPING_FUNC = 422; + SORT_GROUP_CLAUSE = 423; + WINDOW_CLAUSE = 424; + PRIV_GRANTEE = 425; + FUNC_WITH_ARGS = 426; + ACCESS_PRIV = 427; + CREATE_OP_CLASS_ITEM = 428; + TABLE_LIKE_CLAUSE = 429; + FUNCTION_PARAMETER = 430; + LOCKING_CLAUSE = 431; + ROW_MARK_CLAUSE = 432; + XML_SERIALIZE = 433; + WITH_CLAUSE = 434; + COMMON_TABLE_EXPR = 435; + COLUMN_REFERENCE_STORAGE_DIRECTIVE = 436; + IDENTIFY_SYSTEM_CMD = 437; + BASE_BACKUP_CMD = 438; + CREATE_REPLICATION_SLOT_CMD = 439; + DROP_REPLICATION_SLOT_CMD = 440; + START_REPLICATION_CMD = 441; + TIME_LINE_HISTORY_CMD = 442; + TRIGGER_DATA = 443; + EVENT_TRIGGER_DATA = 444; + RETURN_SET_INFO = 445; + WINDOW_OBJECT_DATA = 446; +
TID_BITMAP = 447; + INLINE_CODE_BLOCK = 448; + FDW_ROUTINE = 449; + STREAM_BITMAP = 450; + FORMATTER_DATA = 451; + EXT_PROTOCOL_DATA = 452; + EXT_PROTOCOL_VALIDATOR_DATA = 453; + SELECTED_PARTS = 454; + COOKED_CONSTRAINT = 455; + CDB_EXPLAIN_STAT_HDR = 456; + GP_POLICY = 457; + RETRIEVE_STMT = 458; +} diff --git a/protos/yagpcc_set_service.proto b/protos/yagpcc_set_service.proto new file mode 100644 index 00000000000..0bef72891ee --- /dev/null +++ b/protos/yagpcc_set_service.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +import "protos/yagpcc_metrics.proto"; +import "protos/yagpcc_plan.proto"; + +package yagpcc; +option java_outer_classname = "SegmentYAGPCCAS"; +option go_package = "a.yandex-team.ru/cloud/mdb/yagpcc/api/proto/agent_segment;greenplum"; + +service SetQueryInfo { + rpc SetMetricPlanNode (SetPlanNodeReq) returns (MetricResponse) {} + + rpc SetMetricQuery (SetQueryReq) returns (MetricResponse) {} +} + +message MetricResponse { + MetricResponseStatusCode error_code = 1; + string error_text = 2; +} + +enum MetricResponseStatusCode { + METRIC_RESPONSE_STATUS_CODE_UNSPECIFIED = 0; + METRIC_RESPONSE_STATUS_CODE_SUCCESS = 1; + METRIC_RESPONSE_STATUS_CODE_ERROR = 2; +} + +message SetQueryReq { + QueryStatus query_status = 1; + google.protobuf.Timestamp datetime = 2; + + QueryInfoHeader header = 3; + QueryInfo query_info = 4; + GPMetrics query_metrics = 5; + repeated MetricPlan plan_tree = 6; +} + +message SetPlanNodeReq { + PlanNodeStatus node_status = 1; + google.protobuf.Timestamp datetime = 2; + QueryInfoHeader header = 3; + GPMetrics node_metrics = 4; + MetricPlan plan_node = 5; +} diff --git a/sql/yagp-hooks-collector--1.0.sql b/sql/yagp-hooks-collector--1.0.sql new file mode 100644 index 00000000000..f9ab15fb400 --- /dev/null +++ b/sql/yagp-hooks-collector--1.0.sql @@ -0,0 +1,2 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use '''CREATE EXTENSION "yagp-hooks-collector"'''
to load this file. \quit diff --git a/sql/yagp-hooks-collector--unpackaged--1.0.sql b/sql/yagp-hooks-collector--unpackaged--1.0.sql new file mode 100644 index 00000000000..0441c97bd84 --- /dev/null +++ b/sql/yagp-hooks-collector--unpackaged--1.0.sql @@ -0,0 +1,2 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use '''CREATE EXTENSION "yagp-hooks-collector" FROM unpackaged''' to load this file. \quit diff --git a/src/EventSender.cpp b/src/EventSender.cpp new file mode 100644 index 00000000000..bb4765adeb1 --- /dev/null +++ b/src/EventSender.cpp @@ -0,0 +1,189 @@ +#include "EventSender.h" +#include "GrpcConnector.h" +#include "protos/yagpcc_set_service.pb.h" +#include <ctime> + +extern "C" +{ +#include "postgres.h" +#include "utils/metrics_utils.h" +#include "utils/elog.h" +#include "executor/executor.h" +#include "commands/explain.h" +#include "commands/dbcommands.h" +#include "commands/resgroupcmds.h" + +#include "cdb/cdbvars.h" +#include "cdb/cdbexplain.h" + +#include "tcop/utility.h" +#include "pg_stat_statements_ya_parser.h" +} + +namespace +{ +std::string* get_user_name() +{ + const char *username = GetConfigOption("session_authorization", false, false); + return username ? new std::string(username) : nullptr; +} + +std::string* get_db_name() +{ + char *dbname = get_database_name(MyDatabaseId); + std::string* result = dbname ? new std::string(dbname) : nullptr; + if (dbname) pfree(dbname); /* dbname may be NULL, see the check above */ + return result; +} + +std::string* get_rg_name() +{ + auto userId = GetUserId(); + if (!OidIsValid(userId)) + return nullptr; + auto groupId = GetResGroupIdForRole(userId); + if (!OidIsValid(groupId)) + return nullptr; + char *rgname = GetResGroupNameForId(groupId); + if (rgname == nullptr) return nullptr; + std::string *result = new std::string(rgname); /* copy before rgname is freed */ + pfree(rgname); + return result; +} + +std::string* get_app_name() +{ + return application_name ?
new std::string(application_name) : nullptr; +} + +int get_cur_slice_id(QueryDesc *desc) +{ + if (!desc->estate) + { + return 0; + } + return LocallyExecutingSliceIndex(desc->estate); +} + +google::protobuf::Timestamp current_ts() +{ + google::protobuf::Timestamp current_ts; + struct timeval tv; + gettimeofday(&tv, nullptr); + current_ts.set_seconds(tv.tv_sec); + current_ts.set_nanos(static_cast<int32_t>(tv.tv_usec * 1000)); + return current_ts; +} + +void set_header(yagpcc::QueryInfoHeader *header, QueryDesc *queryDesc) +{ + header->set_pid(MyProcPid); + auto gpId = header->mutable_gpidentity(); + gpId->set_dbid(GpIdentity.dbid); + gpId->set_segindex(GpIdentity.segindex); + gpId->set_gp_role(static_cast<yagpcc::GpRole>(Gp_role)); + gpId->set_gp_session_role(static_cast<yagpcc::GpRole>(Gp_session_role)); + header->set_ssid(gp_session_id); + header->set_ccnt(gp_command_count); + header->set_sliceid(get_cur_slice_id(queryDesc)); + int32 tmid = 0; + gpmon_gettmid(&tmid); + header->set_tmid(tmid); +} + +void set_session_info(yagpcc::SessionInfo *si, QueryDesc *queryDesc) +{ + if (queryDesc->sourceText) + *si->mutable_sql() = std::string(queryDesc->sourceText); + si->set_allocated_applicationname(get_app_name()); + si->set_allocated_databasename(get_db_name()); + si->set_allocated_resourcegroup(get_rg_name()); + si->set_allocated_username(get_user_name()); +} + +ExplainState get_explain_state(QueryDesc *queryDesc, bool costs) +{ + ExplainState es; + ExplainInitState(&es); + es.costs = costs; + es.verbose = true; + es.format = EXPLAIN_FORMAT_TEXT; + ExplainBeginOutput(&es); + ExplainPrintPlan(&es, queryDesc); + ExplainEndOutput(&es); + return es; +} + +void set_plan_text(std::string *plan_text, QueryDesc *queryDesc) +{ + auto es = get_explain_state(queryDesc, true); + *plan_text = std::string(es.str->data, es.str->len); +} + +void set_query_info(yagpcc::QueryInfo *qi, QueryDesc *queryDesc) +{ + set_session_info(qi->mutable_sessioninfo(), queryDesc); + if
(queryDesc->sourceText) + *qi->mutable_querytext() = queryDesc->sourceText; + if (queryDesc->plannedstmt) + { + qi->set_generator(queryDesc->plannedstmt->planGen == PLANGEN_OPTIMIZER + ? yagpcc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER + : yagpcc::PlanGenerator::PLAN_GENERATOR_PLANNER); + set_plan_text(qi->mutable_plantext(), queryDesc); + qi->set_plan_id(get_plan_id(queryDesc)); + qi->set_query_id(queryDesc->plannedstmt->queryId); + } +} +} // namespace + +void EventSender::ExecutorStart(QueryDesc *queryDesc, int /* eflags*/) +{ + elog(DEBUG1, "Query %s start recording", queryDesc->sourceText); + yagpcc::SetQueryReq req; + req.set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START); + *req.mutable_datetime() = current_ts(); + set_header(req.mutable_header(), queryDesc); + set_query_info(req.mutable_query_info(), queryDesc); + auto result = connector->set_metric_query(req); + if (result.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR) + { + elog(WARNING, "Query %s start reporting failed with an error %s", + queryDesc->sourceText, result.error_text().c_str()); + } + else + { + elog(DEBUG1, "Query %s start successful", queryDesc->sourceText); + } +} + +void EventSender::ExecutorFinish(QueryDesc *queryDesc) +{ + elog(DEBUG1, "Query %s finish recording", queryDesc->sourceText); + yagpcc::SetQueryReq req; + req.set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE); + *req.mutable_datetime() = current_ts(); + set_header(req.mutable_header(), queryDesc); + set_query_info(req.mutable_query_info(), queryDesc); + auto result = connector->set_metric_query(req); + if (result.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR) + { + elog(WARNING, "Query %s finish reporting failed with an error %s", + queryDesc->sourceText, result.error_text().c_str()); + } + else + { + elog(DEBUG1, "Query %s finish successful", queryDesc->sourceText); + } +} + +EventSender *EventSender::instance() +{ + static EventSender sender; + return &sender; +} +
+EventSender::EventSender() +{ + connector = std::make_unique<GrpcConnector>(); +} \ No newline at end of file diff --git a/src/EventSender.h b/src/EventSender.h new file mode 100644 index 00000000000..70868f6c757 --- /dev/null +++ b/src/EventSender.h @@ -0,0 +1,19 @@ +#pragma once + +#include <memory> + +class GrpcConnector; + +struct QueryDesc; + +class EventSender +{ +public: + void ExecutorStart(QueryDesc *queryDesc, int eflags); + void ExecutorFinish(QueryDesc *queryDesc); + static EventSender *instance(); + +private: + EventSender(); + std::unique_ptr<GrpcConnector> connector; +}; \ No newline at end of file diff --git a/src/GrpcConnector.cpp b/src/GrpcConnector.cpp new file mode 100644 index 00000000000..7329f392010 --- /dev/null +++ b/src/GrpcConnector.cpp @@ -0,0 +1,55 @@ +#include "GrpcConnector.h" +#include "yagpcc_set_service.grpc.pb.h" + +#include <grpcpp/grpcpp.h> +#include <grpcpp/channel.h> +#include <string> + +class GrpcConnector::Impl +{ +public: + Impl() + { + GOOGLE_PROTOBUF_VERIFY_VERSION; + this->stub = yagpcc::SetQueryInfo::NewStub(grpc::CreateChannel( + SOCKET_FILE, grpc::InsecureChannelCredentials())); + } + + yagpcc::MetricResponse set_metric_query(const yagpcc::SetQueryReq &req) + { + yagpcc::MetricResponse response; + grpc::ClientContext context; + auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(50); + context.set_deadline(deadline); + + grpc::Status status = (stub->SetMetricQuery)(&context, req, &response); + + if (!status.ok()) + { + response.set_error_text("Connection lost: " + status.error_message() + "; " + status.error_details()); + response.set_error_code(yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR); + } + + return response; + } + +private: + const std::string SOCKET_FILE = "unix:///tmp/yagpcc_agent.sock"; + const std::string TCP_ADDRESS = "127.0.0.1:1432"; + std::unique_ptr<yagpcc::SetQueryInfo::Stub> stub; +}; + +GrpcConnector::GrpcConnector() +{ + impl = new Impl(); +} + +GrpcConnector::~GrpcConnector() +{
+ delete impl; +} + +yagpcc::MetricResponse GrpcConnector::set_metric_query(const yagpcc::SetQueryReq &req) +{ + return impl->set_metric_query(req); +} \ No newline at end of file diff --git a/src/GrpcConnector.h b/src/GrpcConnector.h new file mode 100644 index 00000000000..dc0f21706a3 --- /dev/null +++ b/src/GrpcConnector.h @@ -0,0 +1,15 @@ +#pragma once + +#include "yagpcc_set_service.pb.h" + +class GrpcConnector +{ +public: + GrpcConnector(); + ~GrpcConnector(); + yagpcc::MetricResponse set_metric_query(const yagpcc::SetQueryReq &req); + +private: + class Impl; + Impl *impl; +}; \ No newline at end of file diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp new file mode 100644 index 00000000000..9f3200c006f --- /dev/null +++ b/src/hook_wrappers.cpp @@ -0,0 +1,67 @@ +#include "hook_wrappers.h" +#include "EventSender.h" + +extern "C" +{ +#include "postgres.h" +#include "utils/metrics_utils.h" +#include "utils/elog.h" +#include "executor/executor.h" + +#include "cdb/cdbvars.h" +#include "cdb/cdbexplain.h" + +#include "tcop/utility.h" +} + +#include "stat_statements_parser/pg_stat_statements_ya_parser.h" + +static ExecutorStart_hook_type previous_ExecutorStart_hook = nullptr; +static ExecutorFinish_hook_type previous_ExecutorFinish_hook = nullptr; + +static void ya_ExecutorStart_hook(QueryDesc *queryDesc, int eflags); +static void ya_ExecutorFinish_hook(QueryDesc *queryDesc); + +#define REPLACE_HOOK(hookName) \ + previous_##hookName = hookName; \ + hookName = ya_##hookName; + +void hooks_init() +{ + REPLACE_HOOK(ExecutorStart_hook); + REPLACE_HOOK(ExecutorFinish_hook); + stat_statements_parser_init(); +} + +void hooks_deinit() +{ + ExecutorStart_hook = previous_ExecutorStart_hook; + ExecutorFinish_hook = previous_ExecutorFinish_hook; /* restore the hook saved by hooks_init() */ + stat_statements_parser_deinit(); +} + +#define CREATE_HOOK_WRAPPER(hookName, ...)
\ + if (previous_##hookName##_hook) \ + (*previous_##hookName##_hook)(__VA_ARGS__); \ + else \ + standard_##hookName(__VA_ARGS__); \ + PG_TRY(); /* report after the executor hook so estate/planstate are set up */ \ + { \ + EventSender::instance()->hookName(__VA_ARGS__); \ + } \ + PG_CATCH(); \ + { \ + ereport(WARNING, (errmsg("EventSender failed in %s", #hookName))); \ + PG_RE_THROW(); \ + } \ + PG_END_TRY(); + +void ya_ExecutorStart_hook(QueryDesc *queryDesc, int eflags) +{ + CREATE_HOOK_WRAPPER(ExecutorStart, queryDesc, eflags); +} + +void ya_ExecutorFinish_hook(QueryDesc *queryDesc) +{ + CREATE_HOOK_WRAPPER(ExecutorFinish, queryDesc); +} \ No newline at end of file diff --git a/src/hook_wrappers.h b/src/hook_wrappers.h new file mode 100644 index 00000000000..815fcb7cd51 --- /dev/null +++ b/src/hook_wrappers.h @@ -0,0 +1,12 @@ +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +extern void hooks_init(); +extern void hooks_deinit(); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/src/stat_statements_parser/README.MD b/src/stat_statements_parser/README.MD new file mode 100644 index 00000000000..291e31a3099 --- /dev/null +++ b/src/stat_statements_parser/README.MD @@ -0,0 +1 @@ +This directory contains a slightly modified subset of pg_stat_statements for PG v9.4 to be used in query and plan ID generation.
diff --git a/src/stat_statements_parser/pg_stat_statements_ya_parser.c b/src/stat_statements_parser/pg_stat_statements_ya_parser.c new file mode 100644 index 00000000000..f14742337bd --- /dev/null +++ b/src/stat_statements_parser/pg_stat_statements_ya_parser.c @@ -0,0 +1,771 @@ +#include "postgres.h" + +#include <sys/stat.h> +#include <unistd.h> + +#include "access/hash.h" +#include "executor/instrument.h" +#include "executor/execdesc.h" +#include "funcapi.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "parser/analyze.h" +#include "parser/parsetree.h" +#include "parser/scanner.h" +#include "parser/gram.h" +#include "pgstat.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/spin.h" +#include "tcop/utility.h" +#include "utils/builtins.h" +#include "utils/memutils.h" + +#include "pg_stat_statements_ya_parser.h" + +static post_parse_analyze_hook_type prev_post_parse_analyze_hook = NULL; /* hook installed before ours: chained from pgss_post_parse_analyze() and reinstated by stat_statements_parser_deinit() */ + +#define JUMBLE_SIZE 1024 /* query serialization buffer size */ + +/* + * Struct for tracking locations/lengths of constants during normalization + */ +typedef struct pgssLocationLen +{ + int location; /* start offset in query text */ + int length; /* length in bytes, or -1 to ignore */ +} pgssLocationLen; + +/* + * Working state for computing a query jumble and producing a normalized + * query string + */ +typedef struct pgssJumbleState +{ + /* Jumble of current query tree */ + unsigned char *jumble; + + /* Number of bytes used in jumble[] */ + Size jumble_len; + + /* Array of locations of constants that should be removed */ + pgssLocationLen *clocations; /* NOTE(review): filled by RecordConstLocation() but never read back in this file */ + + /* Allocated length of clocations array */ + int clocations_buf_size; + + /* Current number of valid entries in clocations array */ + int clocations_count; + + /* highest Param id we've seen, in order to start normalization correctly */ + int highest_extern_param_id; /* NOTE(review): never assigned or read in this file */ +} pgssJumbleState; + +static void AppendJumble(pgssJumbleState *jstate, + const unsigned char *item, Size size); +static void
JumbleQuery(pgssJumbleState *jstate, Query *query); +static void JumbleRangeTable(pgssJumbleState *jstate, List *rtable); +static void JumbleExpr(pgssJumbleState *jstate, Node *node); +static void RecordConstLocation(pgssJumbleState *jstate, int location); + +static StringInfo gen_normplan(const char *execution_plan); + +static bool need_replace(int token); + +void pgss_post_parse_analyze(ParseState *pstate, Query *query); + +void stat_statements_parser_init() /* Install pgss_post_parse_analyze as post_parse_analyze_hook, remembering the previously installed hook. */ +{ + prev_post_parse_analyze_hook = post_parse_analyze_hook; + post_parse_analyze_hook = pgss_post_parse_analyze; +} + +void stat_statements_parser_deinit() /* Reinstate the hook saved by stat_statements_parser_init(). */ +{ + post_parse_analyze_hook = prev_post_parse_analyze_hook; +} + +/* + * AppendJumble: Append a value that is substantive in a given query to + * the current jumble. + */ +static void +AppendJumble(pgssJumbleState *jstate, const unsigned char *item, Size size) +{ + unsigned char *jumble = jstate->jumble; + Size jumble_len = jstate->jumble_len; + + /* + * Whenever the jumble buffer is full, we hash the current contents and + * reset the buffer to contain just that hash value, thus relying on the + * hash to summarize everything so far. + */ + while (size > 0) + { + Size part_size; + + if (jumble_len >= JUMBLE_SIZE) + { + uint32 start_hash = hash_any(jumble, JUMBLE_SIZE); + + memcpy(jumble, &start_hash, sizeof(start_hash)); + jumble_len = sizeof(start_hash); + } + part_size = Min(size, JUMBLE_SIZE - jumble_len); /* copy only what fits in the remaining buffer space */ + memcpy(jumble + jumble_len, item, part_size); + jumble_len += part_size; + item += part_size; + size -= part_size; + } + jstate->jumble_len = jumble_len; +} + +/* + * Wrappers around AppendJumble to encapsulate details of serialization + * of individual local variable elements.
+ */ +#define APP_JUMB(item) \ + AppendJumble(jstate, (const unsigned char *)&(item), sizeof(item)) +#define APP_JUMB_STRING(str) \ + AppendJumble(jstate, (const unsigned char *)(str), strlen(str) + 1) + +/* + * JumbleQuery: Selectively serialize the query tree, appending significant + * data to the "query jumble" while ignoring nonsignificant data. + * + * Rule of thumb for what to include is that we should ignore anything not + * semantically significant (such as alias names) as well as anything that can + * be deduced from child nodes (else we'd just be double-hashing that piece + * of information). + */ +void JumbleQuery(pgssJumbleState *jstate, Query *query) +{ + Assert(IsA(query, Query)); + Assert(query->utilityStmt == NULL); + + APP_JUMB(query->commandType); + /* resultRelation is usually predictable from commandType */ + JumbleExpr(jstate, (Node *)query->cteList); + JumbleRangeTable(jstate, query->rtable); + JumbleExpr(jstate, (Node *)query->jointree); + JumbleExpr(jstate, (Node *)query->targetList); + JumbleExpr(jstate, (Node *)query->returningList); + JumbleExpr(jstate, (Node *)query->groupClause); + JumbleExpr(jstate, query->havingQual); + JumbleExpr(jstate, (Node *)query->windowClause); + JumbleExpr(jstate, (Node *)query->distinctClause); + JumbleExpr(jstate, (Node *)query->sortClause); + JumbleExpr(jstate, query->limitOffset); + JumbleExpr(jstate, query->limitCount); + /* we ignore rowMarks */ + JumbleExpr(jstate, query->setOperations); +} + +/* + * Jumble a range table + */ +static void +JumbleRangeTable(pgssJumbleState *jstate, List *rtable) +{ + ListCell *lc; + + foreach (lc, rtable) + { + RangeTblEntry *rte = (RangeTblEntry *)lfirst(lc); + + Assert(IsA(rte, RangeTblEntry)); + APP_JUMB(rte->rtekind); + switch (rte->rtekind) + { + case RTE_RELATION: + APP_JUMB(rte->relid); + break; + case RTE_SUBQUERY: + JumbleQuery(jstate, rte->subquery); + break; + case RTE_JOIN: + APP_JUMB(rte->jointype); + break; + case RTE_FUNCTION: + JumbleExpr(jstate, (Node
*)rte->functions); + break; + case RTE_VALUES: + JumbleExpr(jstate, (Node *)rte->values_lists); + break; + case RTE_CTE: + + /* + * Depending on the CTE name here isn't ideal, but it's the + * only info we have to identify the referenced WITH item. + */ + APP_JUMB_STRING(rte->ctename); + APP_JUMB(rte->ctelevelsup); + break; + default: /* only a warning, as in JumbleExpr(): an RTE kind this hook cannot jumble must not fail the user's query */ + elog(WARNING, "unrecognized RTE kind: %d", (int)rte->rtekind); + break; + } + } +} + +/* + * Jumble an expression tree + * + * In general this function should handle all the same node types that + * expression_tree_walker() does, and therefore it's coded to be as parallel + * to that function as possible. However, since we are only invoked on + * queries immediately post-parse-analysis, we need not handle node types + * that only appear in planning. + * + * Note: the reason we don't simply use expression_tree_walker() is that the + * point of that function is to support tree walkers that don't care about + * most tree node types, but here we care about all types. We should complain + * about any unrecognized node type. + */ +static void +JumbleExpr(pgssJumbleState *jstate, Node *node) +{ + ListCell *temp; + + if (node == NULL) + return; + + /* Guard against stack overflow due to overly complex expressions */ + check_stack_depth(); + + /* + * We always emit the node's NodeTag, then any additional fields that are + * considered significant, and then we recurse to any child nodes.
+ */ + APP_JUMB(node->type); + + switch (nodeTag(node)) + { + case T_Var: + { + Var *var = (Var *)node; + + APP_JUMB(var->varno); + APP_JUMB(var->varattno); + APP_JUMB(var->varlevelsup); + } + break; + case T_Const: + { + Const *c = (Const *)node; + + /* We jumble only the constant's type, not its value */ + APP_JUMB(c->consttype); + /* Also, record its parse location for query normalization */ + RecordConstLocation(jstate, c->location); + } + break; + case T_Param: + { + Param *p = (Param *)node; + + APP_JUMB(p->paramkind); + APP_JUMB(p->paramid); + APP_JUMB(p->paramtype); + } + break; + case T_Aggref: + { + Aggref *expr = (Aggref *)node; + + APP_JUMB(expr->aggfnoid); + JumbleExpr(jstate, (Node *)expr->aggdirectargs); + JumbleExpr(jstate, (Node *)expr->args); + JumbleExpr(jstate, (Node *)expr->aggorder); + JumbleExpr(jstate, (Node *)expr->aggdistinct); + JumbleExpr(jstate, (Node *)expr->aggfilter); + } + break; + case T_WindowFunc: + { + WindowFunc *expr = (WindowFunc *)node; + + APP_JUMB(expr->winfnoid); + APP_JUMB(expr->winref); + JumbleExpr(jstate, (Node *)expr->args); + JumbleExpr(jstate, (Node *)expr->aggfilter); + } + break; + case T_ArrayRef: + { + ArrayRef *aref = (ArrayRef *)node; + + JumbleExpr(jstate, (Node *)aref->refupperindexpr); + JumbleExpr(jstate, (Node *)aref->reflowerindexpr); + JumbleExpr(jstate, (Node *)aref->refexpr); + JumbleExpr(jstate, (Node *)aref->refassgnexpr); + } + break; + case T_FuncExpr: + { + FuncExpr *expr = (FuncExpr *)node; + + APP_JUMB(expr->funcid); + JumbleExpr(jstate, (Node *)expr->args); + } + break; + case T_NamedArgExpr: + { + NamedArgExpr *nae = (NamedArgExpr *)node; + + APP_JUMB(nae->argnumber); + JumbleExpr(jstate, (Node *)nae->arg); + } + break; + case T_OpExpr: + case T_DistinctExpr: /* struct-equivalent to OpExpr */ + case T_NullIfExpr: /* struct-equivalent to OpExpr */ + { + OpExpr *expr = (OpExpr *)node; + + APP_JUMB(expr->opno); + JumbleExpr(jstate, (Node *)expr->args); + } + break; + case T_ScalarArrayOpExpr: +
{ + ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *)node; + + APP_JUMB(expr->opno); + APP_JUMB(expr->useOr); + JumbleExpr(jstate, (Node *)expr->args); + } + break; + case T_BoolExpr: + { + BoolExpr *expr = (BoolExpr *)node; + + APP_JUMB(expr->boolop); + JumbleExpr(jstate, (Node *)expr->args); + } + break; + case T_SubLink: + { + SubLink *sublink = (SubLink *)node; + + APP_JUMB(sublink->subLinkType); + JumbleExpr(jstate, (Node *)sublink->testexpr); + JumbleQuery(jstate, (Query *)sublink->subselect); + } + break; + case T_FieldSelect: + { + FieldSelect *fs = (FieldSelect *)node; + + APP_JUMB(fs->fieldnum); + JumbleExpr(jstate, (Node *)fs->arg); + } + break; + case T_FieldStore: + { + FieldStore *fstore = (FieldStore *)node; + + JumbleExpr(jstate, (Node *)fstore->arg); + JumbleExpr(jstate, (Node *)fstore->newvals); + } + break; + case T_RelabelType: + { + RelabelType *rt = (RelabelType *)node; + + APP_JUMB(rt->resulttype); + JumbleExpr(jstate, (Node *)rt->arg); + } + break; + case T_CoerceViaIO: + { + CoerceViaIO *cio = (CoerceViaIO *)node; + + APP_JUMB(cio->resulttype); + JumbleExpr(jstate, (Node *)cio->arg); + } + break; + case T_ArrayCoerceExpr: + { + ArrayCoerceExpr *acexpr = (ArrayCoerceExpr *)node; + + APP_JUMB(acexpr->resulttype); + JumbleExpr(jstate, (Node *)acexpr->arg); + } + break; + case T_ConvertRowtypeExpr: + { + ConvertRowtypeExpr *crexpr = (ConvertRowtypeExpr *)node; + + APP_JUMB(crexpr->resulttype); + JumbleExpr(jstate, (Node *)crexpr->arg); + } + break; + case T_CollateExpr: + { + CollateExpr *ce = (CollateExpr *)node; + + APP_JUMB(ce->collOid); + JumbleExpr(jstate, (Node *)ce->arg); + } + break; + case T_CaseExpr: + { + CaseExpr *caseexpr = (CaseExpr *)node; + + JumbleExpr(jstate, (Node *)caseexpr->arg); + foreach (temp, caseexpr->args) + { + CaseWhen *when = (CaseWhen *)lfirst(temp); + + Assert(IsA(when, CaseWhen)); + JumbleExpr(jstate, (Node *)when->expr); + JumbleExpr(jstate, (Node *)when->result); + } + JumbleExpr(jstate, (Node
*)caseexpr->defresult); + } + break; + case T_CaseTestExpr: + { + CaseTestExpr *ct = (CaseTestExpr *)node; + + APP_JUMB(ct->typeId); + } + break; + case T_ArrayExpr: + JumbleExpr(jstate, (Node *)((ArrayExpr *)node)->elements); + break; + case T_RowExpr: + JumbleExpr(jstate, (Node *)((RowExpr *)node)->args); + break; + case T_RowCompareExpr: + { + RowCompareExpr *rcexpr = (RowCompareExpr *)node; + + APP_JUMB(rcexpr->rctype); + JumbleExpr(jstate, (Node *)rcexpr->largs); + JumbleExpr(jstate, (Node *)rcexpr->rargs); + } + break; + case T_CoalesceExpr: + JumbleExpr(jstate, (Node *)((CoalesceExpr *)node)->args); + break; + case T_MinMaxExpr: + { + MinMaxExpr *mmexpr = (MinMaxExpr *)node; + + APP_JUMB(mmexpr->op); + JumbleExpr(jstate, (Node *)mmexpr->args); + } + break; + case T_XmlExpr: + { + XmlExpr *xexpr = (XmlExpr *)node; + + APP_JUMB(xexpr->op); + JumbleExpr(jstate, (Node *)xexpr->named_args); + JumbleExpr(jstate, (Node *)xexpr->args); + } + break; + case T_NullTest: + { + NullTest *nt = (NullTest *)node; + + APP_JUMB(nt->nulltesttype); + JumbleExpr(jstate, (Node *)nt->arg); + } + break; + case T_BooleanTest: + { + BooleanTest *bt = (BooleanTest *)node; + + APP_JUMB(bt->booltesttype); + JumbleExpr(jstate, (Node *)bt->arg); + } + break; + case T_CoerceToDomain: + { + CoerceToDomain *cd = (CoerceToDomain *)node; + + APP_JUMB(cd->resulttype); + JumbleExpr(jstate, (Node *)cd->arg); + } + break; + case T_CoerceToDomainValue: + { + CoerceToDomainValue *cdv = (CoerceToDomainValue *)node; + + APP_JUMB(cdv->typeId); + } + break; + case T_SetToDefault: + { + SetToDefault *sd = (SetToDefault *)node; + + APP_JUMB(sd->typeId); + } + break; + case T_CurrentOfExpr: + { + CurrentOfExpr *ce = (CurrentOfExpr *)node; + + APP_JUMB(ce->cvarno); + if (ce->cursor_name) + APP_JUMB_STRING(ce->cursor_name); + APP_JUMB(ce->cursor_param); + } + break; + case T_TargetEntry: + { + TargetEntry *tle = (TargetEntry *)node; + + APP_JUMB(tle->resno); + APP_JUMB(tle->ressortgroupref); +
JumbleExpr(jstate, (Node *)tle->expr); + } + break; + case T_RangeTblRef: + { + RangeTblRef *rtr = (RangeTblRef *)node; + + APP_JUMB(rtr->rtindex); + } + break; + case T_JoinExpr: + { + JoinExpr *join = (JoinExpr *)node; + + APP_JUMB(join->jointype); + APP_JUMB(join->isNatural); + APP_JUMB(join->rtindex); + JumbleExpr(jstate, join->larg); + JumbleExpr(jstate, join->rarg); + JumbleExpr(jstate, join->quals); + } + break; + case T_FromExpr: + { + FromExpr *from = (FromExpr *)node; + + JumbleExpr(jstate, (Node *)from->fromlist); + JumbleExpr(jstate, from->quals); + } + break; + case T_List: + foreach (temp, (List *)node) + { + JumbleExpr(jstate, (Node *)lfirst(temp)); + } + break; + case T_SortGroupClause: + { + SortGroupClause *sgc = (SortGroupClause *)node; + + APP_JUMB(sgc->tleSortGroupRef); + APP_JUMB(sgc->eqop); + APP_JUMB(sgc->sortop); + APP_JUMB(sgc->nulls_first); + } + break; + case T_WindowClause: + { + WindowClause *wc = (WindowClause *)node; + + APP_JUMB(wc->winref); + APP_JUMB(wc->frameOptions); + JumbleExpr(jstate, (Node *)wc->partitionClause); + JumbleExpr(jstate, (Node *)wc->orderClause); + JumbleExpr(jstate, wc->startOffset); + JumbleExpr(jstate, wc->endOffset); + } + break; + case T_CommonTableExpr: + { + CommonTableExpr *cte = (CommonTableExpr *)node; + + /* we store the string name because RTE_CTE RTEs need it */ + APP_JUMB_STRING(cte->ctename); + JumbleQuery(jstate, (Query *)cte->ctequery); + } + break; + case T_SetOperationStmt: + { + SetOperationStmt *setop = (SetOperationStmt *)node; + + APP_JUMB(setop->op); + APP_JUMB(setop->all); + JumbleExpr(jstate, setop->larg); + JumbleExpr(jstate, setop->rarg); + } + break; + case T_RangeTblFunction: + { + RangeTblFunction *rtfunc = (RangeTblFunction *)node; + + JumbleExpr(jstate, rtfunc->funcexpr); + } + break; + default: + /* Only a warning, since we can stumble along anyway */ + elog(WARNING, "unrecognized node type: %d", + (int)nodeTag(node)); + break; + } +} + +/* + * Record location of constant within
query string of query tree + * that is currently being walked. + */ +static void +RecordConstLocation(pgssJumbleState *jstate, int location) +{ + /* -1 indicates unknown or undefined location */ + if (location >= 0) + { + /* enlarge array if needed */ + if (jstate->clocations_count >= jstate->clocations_buf_size) + { + jstate->clocations_buf_size *= 2; /* double the capacity so repeated growth stays cheap */ + jstate->clocations = (pgssLocationLen *) + repalloc(jstate->clocations, + jstate->clocations_buf_size * + sizeof(pgssLocationLen)); + } + jstate->clocations[jstate->clocations_count].location = location; + /* initialize lengths to -1 to simplify fill_in_constant_lengths (NOTE(review): that is an upstream pg_stat_statements function; it is not part of this file) */ + jstate->clocations[jstate->clocations_count].length = -1; + jstate->clocations_count++; + } +} + +/* check if token is one of the literal-constant tokens (FCONST, ICONST, SCONST, BCONST, XCONST) that should be replaced by a substitute variable */ +static bool +need_replace(int token) +{ + return (token == FCONST) || (token == ICONST) || (token == SCONST) || (token == BCONST) || (token == XCONST); +} + +/* + * gen_normplan - parse execution plan using flex and replace all CONST to + * substitute variables. NOTE(review): the only caller in this file, get_plan_id(), passes queryDesc->sourceText rather than a plan.
+ */ +static StringInfo +gen_normplan(const char *execution_plan) +{ + core_yyscan_t yyscanner; + core_yy_extra_type yyextra; + core_YYSTYPE yylval; + YYLTYPE yylloc; + int tok; + int bind_prefix = 1; + char *tmp_str; + YYLTYPE last_yylloc = 0; + int last_tok = 0; + StringInfo plan_out = makeStringInfo(); + + + yyscanner = scanner_init(execution_plan, + &yyextra, +#if PG_VERSION_NUM >= 120000 + &ScanKeywords, + ScanKeywordTokens +#else + ScanKeywords, + NumScanKeywords +#endif + ); + + for (;;) + { + /* get the next lexeme */ + tok = core_yylex(&yylval, &yylloc, yyscanner); if (tok == 0) yylloc = (YYLTYPE) strlen(execution_plan); /* the scanner may leave yylloc unchanged at end of input; point it past the last byte so the final lexeme is still copied below */ + + /* yylloc now marks the end of the previous lexeme, so that lexeme can be emitted */ + if (need_replace(last_tok)) + { + /* substitute variable instead of CONST */ + int s_len = asprintf(&tmp_str, "$%i", bind_prefix++); + if (s_len > 0) + { + appendStringInfoString(plan_out, tmp_str); + free(tmp_str); + } + else + { + appendStringInfoString(plan_out, "??"); + } + } + else + { + /* do not change - just copy as-is */ + tmp_str = strndup((char *)execution_plan + last_yylloc, yylloc - last_yylloc); + appendStringInfoString(plan_out, tmp_str); + free(tmp_str); + } + /* check if further parsing not needed */ + if (tok == 0) + break; + last_tok = tok; + last_yylloc = yylloc; + } + + scanner_finish(yyscanner); + + return plan_out; +} + +uint64_t get_plan_id(QueryDesc *queryDesc) /* hash of the constant-normalized query text (queryDesc->sourceText); 0 when there is no source text */ +{ + if (!queryDesc->sourceText) + return 0; + StringInfo normalized = gen_normplan(queryDesc->sourceText); + return hash_any((unsigned char *)normalized->data, normalized->len); +} + +/* + * Post-parse-analysis hook: mark query with a queryId + */ +void pgss_post_parse_analyze(ParseState *pstate, Query *query) +{ + pgssJumbleState jstate; + + if (prev_post_parse_analyze_hook) + prev_post_parse_analyze_hook(pstate, query); + + /* Assert we didn't do this already (NOTE(review): this also fires in assert-enabled builds if the previously installed hook has already set queryId) */ + Assert(query->queryId == 0); + + /* + * Utility statements get queryId zero.
We do this even in cases where + * the statement contains an optimizable statement for which a queryId + * could be derived (such as EXPLAIN or DECLARE CURSOR). For such cases, + * runtime control will first go through ProcessUtility and then the + * executor, and we don't want the executor hooks to do anything, since we + * are already measuring the statement's costs at the utility level. + */ + if (query->utilityStmt) + { + query->queryId = 0; + return; + } + + /* Set up workspace for query jumbling */ + jstate.jumble = (unsigned char *)palloc(JUMBLE_SIZE); + jstate.jumble_len = 0; + jstate.clocations_buf_size = 32; + jstate.clocations = (pgssLocationLen *) + palloc(jstate.clocations_buf_size * sizeof(pgssLocationLen)); + jstate.clocations_count = 0; + + /* Compute query ID and mark the Query node with it */ + JumbleQuery(&jstate, query); + query->queryId = hash_any(jstate.jumble, jstate.jumble_len); + + /* + * If we are unlucky enough to get a hash of zero, use 1 instead, to + * prevent confusion with the utility-statement case.
+ */ + if (query->queryId == 0) + query->queryId = 1; +} \ No newline at end of file diff --git a/src/stat_statements_parser/pg_stat_statements_ya_parser.h b/src/stat_statements_parser/pg_stat_statements_ya_parser.h new file mode 100644 index 00000000000..274f96aebaf --- /dev/null +++ b/src/stat_statements_parser/pg_stat_statements_ya_parser.h @@ -0,0 +1,15 @@ +#pragma once + +#include <stdint.h> +#ifdef __cplusplus +extern "C" +{ +#endif +struct QueryDesc; +extern void stat_statements_parser_init(void); +extern void stat_statements_parser_deinit(void); +extern uint64_t get_plan_id(struct QueryDesc *queryDesc); /* defined in C, so it needs C linkage when included from C++ */ + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/src/yagp_hooks_collector.c b/src/yagp_hooks_collector.c new file mode 100644 index 00000000000..69475ea5079 --- /dev/null +++ b/src/yagp_hooks_collector.c @@ -0,0 +1,22 @@ +#include "postgres.h" +#include "cdb/cdbvars.h" +#include "fmgr.h" + +#include "hook_wrappers.h" + +PG_MODULE_MAGIC; + +void _PG_init(void); +void _PG_fini(void); + +void _PG_init(void) { + if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) { + hooks_init(); + } +} + +void _PG_fini(void) { + if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) { + hooks_deinit(); + } +} diff --git a/yagp-hooks-collector.control b/yagp-hooks-collector.control new file mode 100644 index 00000000000..82c189a88fc --- /dev/null +++ b/yagp-hooks-collector.control @@ -0,0 +1,5 @@ +# yagp-hooks-collector extension +comment = 'Intercept query and plan execution hooks and report them to Yandex GPCC agents' +default_version = '1.0' +module_pathname = '$libdir/yagp-hooks-collector' +superuser = true --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
