Hi,

I've suspected for a while that the separation & duplication of infrastructure between walsenders and normal backends isn't nice. But looking at the code changes in v10 drove that home quite a bit.
The major changes in this area are:

1) d1ecd539477fe640455dc890216a7c1561e047b4 - Add a SHOW command to the
   replication command language.
2) 7c4f52409a8c7d85ed169bbbc1f6092274d03920 - Logical replication support
   for initial data copy

The first adds SHOW support for the replication protocol, with such oddities that "SHOW hba_file;" works, but "show hba_file;" doesn't. Unless you're connected to a database, when feature 2) kicks in and falls back to the normal SQL parser (as it would for most other SQL commands).

With 2) we can execute normal SQL over a walsender connection. That's useful for logical rep because it needs to be able to copy data (via e.g. COPY), read catalogs (SELECT) and stream data (via START_REPLICATION).

The problem is that we now end up with fairly similar infrastructure that's duplicated in walsender and normal backends. We've already seen a couple of bugs stem from this:
- parallel queries hang when executed over walsender connection
- query cancel doesn't work over walsender connection
- parser/scanner hack deciding between replication and normal grammar
  isn't correct
- SnapBuildClearExportedSnapshot isn't called reliably anymore
- config files aren't reloaded while idle in a walsender connection

While all of those are fixable, and several of them already have proposed fixes, I think this is some indication that the current split isn't well thought out.

I think in hindsight, the whole idea of introducing a separate protocol/language for the replication protocol was a rather bad one. I think we should give up on the current course, and decide to unify as much as possible. Petr already proposed some fixes for the above (unifying signal and config file handling), but I don't think that solves the more fundamental issue of different grammars.

For me it's fairly obvious that we should try to merge the two grammars, and live with the slight ugliness that'll be caused by doing so.
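
To make the SHOW oddity above concrete: repl_scanner.l matches its keywords as literal upper-case strings (it isn't built case-insensitively), and anything else falls through to the identifier rule, whereas the SQL scanner folds case before keyword lookup. A minimal standalone sketch of the two behaviours follows; the function names are made up for illustration and this is not the actual scanner code:

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <string.h>

/*
 * Hypothetical stand-in for a replication-scanner keyword rule:
 * the keyword only matches when spelled exactly in upper case.
 */
static bool
repl_style_is_show(const char *word)
{
	assert(word != NULL);
	return strcmp(word, "SHOW") == 0;
}

/*
 * Hypothetical stand-in for SQL-scanner behaviour: fold ASCII case
 * first, then compare against the lower-case keyword.
 */
static bool
sql_style_is_show(const char *word)
{
	char		folded[16];
	size_t		len;
	size_t		i;

	assert(word != NULL);
	len = strlen(word);
	if (len >= sizeof(folded))
		return false;			/* too long to be the keyword */

	for (i = 0; i < len; i++)
		folded[i] = (char) tolower((unsigned char) word[i]);
	folded[len] = '\0';

	return strcmp(folded, "show") == 0;
}
```

With these, repl_style_is_show("show") is false while sql_style_is_show("show") is true, which is the kind of split in behaviour a client sees depending on which grammar ends up handling the command.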
The primary problem here is that both languages "look" different, primarily that the replication protocol uses enforced-all-caps-with-underscores as style.

Attached is a very rough POC that unifies the two grammars. Unsurprisingly that requires the addition of a bunch of additional keywords, but at least they're all unreserved. Both grammars seem to mostly merge well, except for repl_scanner.l's RECPTR which I wasn't able to add to scan.l within a couple minutes (therefore single quotes are now necessary).

This really is WIP, and not meant as a patch to be reviewed in detail, but just as something to be discussed.

I'm probably going to be tarred and feathered for this position, but I think we need to fix this before v10 is coming out. We're taking on quite some architectural burden here, and I'm pretty sure we'd otherwise have to fix it in v11, so we'll be stuck with some odd-duck v10 that behaves differently from all other versions.

If so, we'd have to:
- gate catalog access more carefully in !am_db_walsender connections
- find a better way to deal with repl_scanner.l's RECPTR
- remove replnodes.h (or move at least parts of it) into parsenodes.h
- do a lot of other minor cleanup

Greetings,

Andres Freund
>From 63866c2c8f9c968957dfb0d5b8c5ceca2c4787ed Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Mon, 24 Apr 2017 22:57:49 -0700 Subject: [PATCH] WIP: POC: Prototype: Unify SQL and replication grammars. --- src/backend/commands/event_trigger.c | 8 +- src/backend/parser/analyze.c | 10 + src/backend/parser/gram.y | 396 ++++++++++++++++++-- src/backend/replication/Makefile | 5 +- .../libpqwalreceiver/libpqwalreceiver.c | 2 +- src/backend/replication/logical/snapbuild.c | 5 +- src/backend/replication/repl_gram.y | 408 --------------------- src/backend/replication/repl_scanner.l | 248 ------------- src/backend/replication/walsender.c | 124 ++----- src/backend/tcop/postgres.c | 69 ++-- src/backend/tcop/pquery.c | 2 + src/backend/tcop/utility.c | 46 +++ src/bin/pg_basebackup/pg_recvlogical.c | 2 +- src/bin/pg_basebackup/receivelog.c | 2 +- src/include/nodes/nodes.h | 2 +- src/include/nodes/replnodes.h | 8 + src/include/parser/gramparse.h | 1 + src/include/parser/kwlist.h | 18 + src/include/replication/walsender.h | 5 +- 19 files changed, 538 insertions(+), 823 deletions(-) delete mode 100644 src/backend/replication/repl_gram.y delete mode 100644 src/backend/replication/repl_scanner.l diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index d7c199f314..1e441fbe86 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -295,7 +295,13 @@ check_ddl_tag(const char *tag) pg_strcasecmp(tag, "REVOKE") == 0 || pg_strcasecmp(tag, "DROP OWNED") == 0 || pg_strcasecmp(tag, "IMPORT FOREIGN SCHEMA") == 0 || - pg_strcasecmp(tag, "SECURITY LABEL") == 0) + pg_strcasecmp(tag, "SECURITY LABEL") == 0 || + pg_strcasecmp(tag, "IDENTIFY_SYSTEM") == 0 || + pg_strcasecmp(tag, "BASE_BACKUP") == 0 || + pg_strcasecmp(tag, "CREATE_REPLICATION_SLOT") == 0 || + pg_strcasecmp(tag, "DROP_REPLICATION_SLOT") == 0 || + pg_strcasecmp(tag, "START_REPLICATION") == 0 || + pg_strcasecmp(tag, "TIMELINE_HISTORY") == 
0) return EVENT_TRIGGER_COMMAND_TAG_OK; /* diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 567dd54c6c..d969c0a4e0 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -42,6 +42,7 @@ #include "parser/parse_target.h" #include "parser/parsetree.h" #include "rewrite/rewriteManip.h" +#include "replication/walsender.h" #include "utils/rel.h" @@ -104,6 +105,15 @@ parse_analyze(RawStmt *parseTree, const char *sourceText, pstate->p_sourcetext = sourceText; + if (am_walsender && !am_db_walsender && + analyze_requires_snapshot(parseTree)) + { + /* FIXME: message */ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("non-replication statement not supported in pure replication connection"))); + } + if (numParams > 0) parse_fixed_parameters(pstate, paramTypes, numParams); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 89d2836c49..6ede6d6af0 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -48,6 +48,7 @@ #include <ctype.h> #include <limits.h> +#include "access/xlogdefs.h" #include "catalog/index.h" #include "catalog/namespace.h" #include "catalog/pg_am.h" @@ -56,9 +57,12 @@ #include "commands/trigger.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "nodes/replnodes.h" #include "parser/gramparse.h" #include "parser/parser.h" #include "parser/parse_expr.h" +#include "replication/walsender.h" +#include "replication/walsender_private.h" #include "storage/lmgr.h" #include "utils/date.h" #include "utils/datetime.h" @@ -241,6 +245,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); PartitionSpec *partspec; PartitionRangeDatum *partrange_datum; RoleSpec *rolespec; + XLogRecPtr recptr; + uint32 uintval; } %type <node> stmt schema_stmt @@ -584,6 +590,25 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <partrange_datum> PartitionRangeDatum %type <list> range_datum_list + 
+/* replication stuff */ +%type <node> replication_command +%type <node> base_backup start_replication start_logical_replication + create_replication_slot drop_replication_slot identify_system + timeline_history +%type <list> base_backup_opt_list +%type <defelt> base_backup_opt +%type <uintval> opt_timeline +%type <list> plugin_options plugin_opt_list +%type <defelt> plugin_opt_elem +%type <node> plugin_opt_arg +%type <str> opt_slot +%type <boolean> opt_temporary +%type <list> create_slot_opt_list +%type <defelt> create_slot_opt +%type <recptr> recptr + + /* * Non-keyword token types. These are hard-wired into the "flex" lexer. * They must be listed first so that their numeric codes do not depend on @@ -610,7 +635,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); AGGREGATE ALL ALSO ALTER ALWAYS ANALYSE ANALYZE AND ANY ARRAY AS ASC ASSERTION ASSIGNMENT ASYMMETRIC AT ATTACH ATTRIBUTE AUTHORIZATION - BACKWARD BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT + BACKWARD BASE_BACKUP BEFORE BEGIN_P BETWEEN BIGINT BINARY BIT BOOLEAN_P BOTH BY CACHE CALLED CASCADE CASCADED CASE CAST CATALOG_P CHAIN CHAR_P @@ -618,29 +643,29 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); CLUSTER COALESCE COLLATE COLLATION COLUMN COLUMNS COMMENT COMMENTS COMMIT COMMITTED CONCURRENTLY CONFIGURATION CONFLICT CONNECTION CONSTRAINT CONSTRAINTS CONTENT_P CONTINUE_P CONVERSION_P COPY COST CREATE - CROSS CSV CUBE CURRENT_P + CREATE_REPLICATION_SLOT CROSS CSV CUBE CURRENT_P CURRENT_CATALOG CURRENT_DATE CURRENT_ROLE CURRENT_SCHEMA CURRENT_TIME CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE DATA_P DATABASE DAY_P DEALLOCATE DEC DECIMAL_P DECLARE DEFAULT DEFAULTS DEFERRABLE DEFERRED DEFINER DELETE_P DELIMITER DELIMITERS DEPENDS DESC DETACH DICTIONARY DISABLE_P DISCARD DISTINCT DO DOCUMENT_P DOMAIN_P - DOUBLE_P DROP + DOUBLE_P DROP DROP_REPLICATION_SLOT EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ESCAPE EVENT EXCEPT - EXCLUDE EXCLUDING 
EXCLUSIVE EXECUTE EXISTS EXPLAIN + EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXPORT_SNAPSHOT EXTENSION EXTERNAL EXTRACT - FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR + FALSE_P FAMILY FAST_P FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING HANDLER HAVING HEADER_P HOLD HOUR_P - IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P - INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P - INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER + IDENTITY_P IDENTIFY_SYSTEM IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P + IMPORT_P IN_P INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY + INLINE_P INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION JOIN @@ -649,45 +674,46 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); LABEL LANGUAGE LARGE_P LAST_P LATERAL_P LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL - LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED + LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LOGICAL_P - MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE + MAPPING MATCH MATERIALIZED MAXVALUE MAX_RATE METHOD MINUTE_P MINVALUE MODE + MONTH_P MOVE - NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NONE + NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NOEXPORT_SNAPSHOT NONE NOREFRESH NOT NOTHING NOTIFY NOTNULL NOWAIT NULL_P NULLIF NULLS_P NUMERIC OBJECT_P OF OFF OFFSET OIDS OLD ON ONLY OPERATOR OPTION OPTIONS OR ORDER ORDINALITY OUT_P OUTER_P OVER OVERLAPS OVERLAY OVERRIDING OWNED OWNER - PARALLEL PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POLICY - POSITION PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY - PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRAM PUBLICATION + PARALLEL PARSER PARTIAL PARTITION PASSING PASSWORD 
PHYSICAL PLACING PLANS + POLICY POSITION PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY + PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRAM PROGRESS PUBLICATION QUOTE RANGE READ REAL REASSIGN RECHECK RECURSIVE REF REFERENCES REFERENCING REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA - RESET RESTART RESTRICT RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP - ROW ROWS RULE + RESERVE_WAL RESET RESTART RESTRICT RETURNING RETURNS REVOKE RIGHT ROLE + ROLLBACK ROLLUP ROW ROWS RULE SAVEPOINT SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES SERIALIZABLE SERVER SESSION SESSION_USER SET SETS SETOF SHARE SHOW SIMILAR SIMPLE SKIP SLOT SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P - START STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P - SUBSCRIPTION SUBSTRING SYMMETRIC SYSID SYSTEM_P + START START_REPLICATION STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P + STRIP_P SUBSCRIPTION SUBSTRING SYMMETRIC SYSID SYSTEM_P - TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN - TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM TREAT TRIGGER TRIM TRUE_P - TRUNCATE TRUSTED TYPE_P TYPES_P + TABLE TABLES TABLESAMPLE TABLESPACE TABLESPACE_MAP_P TEMP TEMPLATE TEMPORARY + TEXT_P THEN TIME TIMELINE_P TIMELINE_HISTORY TIMESTAMP TO TRAILING TRANSACTION + TRANSFORM TREAT TRIGGER TRIM TRUE_P TRUNCATE TRUSTED TYPE_P TYPES_P UNBOUNDED UNCOMMITTED UNENCRYPTED UNION UNIQUE UNKNOWN UNLISTEN UNLOGGED - UNTIL UPDATE USER USING + UNTIL UPDATE USE_SNAPSHOT USER USING VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING VERBOSE VERSION_P VIEW VIEWS VOLATILE - WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE + WAL_P WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE @@ -942,6 +968,7 @@ stmt : | VariableSetStmt | VariableShowStmt | ViewStmt + | 
replication_command | /*EMPTY*/ { $$ = NULL; } ; @@ -14487,6 +14514,311 @@ AexprConst: Iconst } ; +/* replication protocol stuff */ + +replication_command: + identify_system + { + ReplicationCmd *rcmd = makeNode(ReplicationCmd); + rcmd->replicationCmd = $1; + $$ = (Node *) rcmd; + } + | base_backup + { + ReplicationCmd *rcmd = makeNode(ReplicationCmd); + rcmd->replicationCmd = $1; + $$ = (Node *) rcmd; + } + | start_replication + { + ReplicationCmd *rcmd = makeNode(ReplicationCmd); + rcmd->replicationCmd = $1; + $$ = (Node *) rcmd; + } + | start_logical_replication + { + ReplicationCmd *rcmd = makeNode(ReplicationCmd); + rcmd->replicationCmd = $1; + $$ = (Node *) rcmd; + } + | create_replication_slot + { + ReplicationCmd *rcmd = makeNode(ReplicationCmd); + rcmd->replicationCmd = $1; + $$ = (Node *) rcmd; + } + | drop_replication_slot + { + ReplicationCmd *rcmd = makeNode(ReplicationCmd); + rcmd->replicationCmd = $1; + $$ = (Node *) rcmd; + } + | timeline_history + { + ReplicationCmd *rcmd = makeNode(ReplicationCmd); + rcmd->replicationCmd = $1; + $$ = (Node *) rcmd; + } +; + + +/* + * IDENTIFY_SYSTEM + */ +identify_system: + IDENTIFY_SYSTEM + { + $$ = (Node *) makeNode(IdentifySystemCmd); + } + ; + +/* + * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] + * [MAX_RATE %d] [TABLESPACE_MAP] + */ +base_backup: + BASE_BACKUP base_backup_opt_list + { + BaseBackupCmd *cmd = makeNode(BaseBackupCmd); + cmd->options = $2; + $$ = (Node *) cmd; + } + ; + +base_backup_opt_list: + base_backup_opt_list base_backup_opt + { $$ = lappend($1, $2); } + | /* EMPTY */ + { $$ = NIL; } + ; + +base_backup_opt: + LABEL SCONST + { + $$ = makeDefElem("label", + (Node *)makeString($2), -1); + } + | PROGRESS + { + $$ = makeDefElem("progress", + (Node *)makeInteger(TRUE), -1); + } + | FAST_P + { + $$ = makeDefElem("fast", + (Node *)makeInteger(TRUE), -1); + } + | WAL_P + { + $$ = makeDefElem("wal", + (Node *)makeInteger(TRUE), -1); + } + | NOWAIT + { + $$ = makeDefElem("nowait", + 
(Node *)makeInteger(TRUE), -1); + } + | MAX_RATE ICONST + { + $$ = makeDefElem("max_rate", + (Node *)makeInteger($2), -1); + } + | TABLESPACE_MAP_P + { + $$ = makeDefElem("tablespace_map", + (Node *)makeInteger(TRUE), -1); + } + ; + +create_replication_slot: + /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */ + CREATE_REPLICATION_SLOT IDENT opt_temporary PHYSICAL create_slot_opt_list + { + CreateReplicationSlotCmd *cmd; + cmd = makeNode(CreateReplicationSlotCmd); + cmd->kind = REPLICATION_KIND_PHYSICAL; + cmd->slotname = $2; + cmd->temporary = $3; + cmd->options = $5; + $$ = (Node *) cmd; + } + /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ + | CREATE_REPLICATION_SLOT IDENT opt_temporary LOGICAL_P IDENT create_slot_opt_list + { + CreateReplicationSlotCmd *cmd; + cmd = makeNode(CreateReplicationSlotCmd); + cmd->kind = REPLICATION_KIND_LOGICAL; + cmd->slotname = $2; + cmd->temporary = $3; + cmd->plugin = $5; + cmd->options = $6; + $$ = (Node *) cmd; + } + ; + +create_slot_opt_list: + create_slot_opt_list create_slot_opt + { $$ = lappend($1, $2); } + | /* EMPTY */ + { $$ = NIL; } + ; + +create_slot_opt: + EXPORT_SNAPSHOT + { + $$ = makeDefElem("export_snapshot", + (Node *)makeInteger(TRUE), -1); + } + | NOEXPORT_SNAPSHOT + { + $$ = makeDefElem("export_snapshot", + (Node *)makeInteger(FALSE), -1); + } + | USE_SNAPSHOT + { + $$ = makeDefElem("use_snapshot", + (Node *)makeInteger(TRUE), -1); + } + | RESERVE_WAL + { + $$ = makeDefElem("reserve_wal", + (Node *)makeInteger(TRUE), -1); + } + ; + +/* DROP_REPLICATION_SLOT slot */ +drop_replication_slot: + DROP_REPLICATION_SLOT IDENT + { + DropReplicationSlotCmd *cmd; + cmd = makeNode(DropReplicationSlotCmd); + cmd->slotname = $2; + $$ = (Node *) cmd; + } + ; + +recptr: + SCONST + { + uint32 hi, + lo; + if (sscanf($1, "%X/%X", &hi, &lo) != 2) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid frakbar"), + parser_errposition(@1))); + $$ = ((uint64) hi) << 32 | lo; + } + +/* + * 
START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d] + */ +start_replication: + START_REPLICATION opt_slot opt_physical recptr opt_timeline + { + StartReplicationCmd *cmd; + + cmd = makeNode(StartReplicationCmd); + cmd->kind = REPLICATION_KIND_PHYSICAL; + cmd->slotname = $2; + cmd->startpoint = $4; + cmd->timeline = $5; + $$ = (Node *) cmd; + } + ; + +/* START_REPLICATION SLOT slot LOGICAL %X/%X options */ +start_logical_replication: + START_REPLICATION SLOT IDENT LOGICAL_P recptr plugin_options + { + StartReplicationCmd *cmd; + cmd = makeNode(StartReplicationCmd); + cmd->kind = REPLICATION_KIND_LOGICAL; + cmd->slotname = $3; + cmd->startpoint = $5; + cmd->options = $6; + $$ = (Node *) cmd; + } + ; +/* + * TIMELINE_HISTORY %d + */ +timeline_history: + TIMELINE_HISTORY ICONST + { + TimeLineHistoryCmd *cmd; + + if ($2 <= 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + (errmsg("invalid timeline %u", $2)))); + + cmd = makeNode(TimeLineHistoryCmd); + cmd->timeline = $2; + + $$ = (Node *) cmd; + } + ; + +opt_physical: + PHYSICAL + | /* EMPTY */ + ; + +opt_temporary: + TEMPORARY { $$ = true; } + | /* EMPTY */ { $$ = false; } + ; + +opt_slot: + SLOT IDENT + { $$ = $2; } + | /* EMPTY */ + { $$ = NULL; } + ; + +opt_timeline: + TIMELINE_P ICONST + { + if ($2 <= 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + (errmsg("invalid timeline %u", $2)))); + $$ = $2; + } + | /* EMPTY */ { $$ = 0; } + ; + + +plugin_options: + '(' plugin_opt_list ')' { $$ = $2; } + | /* EMPTY */ { $$ = NIL; } + ; + +plugin_opt_list: + plugin_opt_elem + { + $$ = list_make1($1); + } + | plugin_opt_list ',' plugin_opt_elem + { + $$ = lappend($1, $3); + } + ; + +plugin_opt_elem: + IDENT plugin_opt_arg + { + $$ = makeDefElem($1, $2, -1); + } + ; + +plugin_opt_arg: + SCONST { $$ = (Node *) makeString($1); } + | /* EMPTY */ { $$ = NULL; } + ; + + Iconst: ICONST { $$ = $1; }; Sconst: SCONST { $$ = $1; }; @@ -14646,6 +14978,7 @@ unreserved_keyword: | ATTACH | ATTRIBUTE | BACKWARD + | 
BASE_BACKUP | BEFORE | BEGIN_P | BY @@ -14674,6 +15007,7 @@ unreserved_keyword: | CONVERSION_P | COPY | COST + | CREATE_REPLICATION_SLOT | CSV | CUBE | CURRENT_P @@ -14699,6 +15033,7 @@ unreserved_keyword: | DOMAIN_P | DOUBLE_P | DROP + | DROP_REPLICATION_SLOT | EACH | ENABLE_P | ENCODING @@ -14711,9 +15046,11 @@ unreserved_keyword: | EXCLUSIVE | EXECUTE | EXPLAIN + | EXPORT_SNAPSHOT | EXTENSION | EXTERNAL | FAMILY + | FAST_P | FILTER | FIRST_P | FOLLOWING @@ -14728,6 +15065,7 @@ unreserved_keyword: | HEADER_P | HOLD | HOUR_P + | IDENTIFY_SYSTEM | IDENTITY_P | IF_P | IMMEDIATE @@ -14761,10 +15099,12 @@ unreserved_keyword: | LOCK_P | LOCKED | LOGGED + | LOGICAL_P | MAPPING | MATCH | MATERIALIZED | MAXVALUE + | MAX_RATE | METHOD | MINUTE_P | MINVALUE @@ -14776,6 +15116,7 @@ unreserved_keyword: | NEW | NEXT | NO + | NOEXPORT_SNAPSHOT | NOREFRESH | NOTHING | NOTIFY @@ -14800,6 +15141,7 @@ unreserved_keyword: | PARTITION | PASSING | PASSWORD + | PHYSICAL | PLANS | POLICY | PRECEDING @@ -14811,6 +15153,7 @@ unreserved_keyword: | PROCEDURAL | PROCEDURE | PROGRAM + | PROGRESS | PUBLICATION | QUOTE | RANGE @@ -14828,6 +15171,7 @@ unreserved_keyword: | REPEATABLE | REPLACE | REPLICA + | RESERVE_WAL | RESET | RESTART | RESTRICT @@ -14862,6 +15206,7 @@ unreserved_keyword: | STABLE | STANDALONE_P | START + | START_REPLICATION | STATEMENT | STATISTICS | STDIN @@ -14874,10 +15219,13 @@ unreserved_keyword: | SYSTEM_P | TABLES | TABLESPACE + | TABLESPACE_MAP_P | TEMP | TEMPLATE | TEMPORARY | TEXT_P + | TIMELINE_P + | TIMELINE_HISTORY | TRANSACTION | TRANSFORM | TRIGGER @@ -14893,6 +15241,7 @@ unreserved_keyword: | UNLOGGED | UNTIL | UPDATE + | USE_SNAPSHOT | VACUUM | VALID | VALIDATE @@ -14903,6 +15252,7 @@ unreserved_keyword: | VIEW | VIEWS | VOLATILE + | WAL_P | WHITESPACE_P | WITHIN | WITHOUT diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index da8bcf0471..62f8f3a440 100644 --- a/src/backend/replication/Makefile +++ 
b/src/backend/replication/Makefile @@ -15,15 +15,12 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS) OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \ - repl_gram.o slot.o slotfuncs.o syncrep.o syncrep_gram.o + slot.o slotfuncs.o syncrep.o syncrep_gram.o SUBDIRS = logical include $(top_srcdir)/src/backend/common.mk -# repl_scanner is compiled as part of repl_gram -repl_gram.o: repl_scanner.c - # syncrep_scanner is complied as part of syncrep_gram syncrep_gram.o: syncrep_scanner.c syncrep_scanner.c: FLEXFLAGS = -CF -p -i diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9d7bb25d39..26a53333f8 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -352,7 +352,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn, if (options->logical) appendStringInfo(&cmd, " LOGICAL"); - appendStringInfo(&cmd, " %X/%X", + appendStringInfo(&cmd, " '%X/%X'", (uint32) (options->startpoint >> 32), (uint32) options->startpoint); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 358ec28932..eb261ea02f 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -589,8 +589,7 @@ SnapBuildExportSnapshot(SnapBuild *builder) Snapshot snap; char *snapname; - if (IsTransactionOrTransactionBlock()) - elog(ERROR, "cannot export a snapshot from within a transaction"); + Assert(IsTransactionOrTransactionBlock()); if (SavedResourceOwnerDuringExport) elog(ERROR, "can only export one snapshot at a time"); @@ -598,8 +597,6 @@ SnapBuildExportSnapshot(SnapBuild *builder) SavedResourceOwnerDuringExport = CurrentResourceOwner; ExportInProgress = true; - StartTransactionCommand(); - /* There doesn't seem to a nice API to set these */ XactIsoLevel = 
XACT_REPEATABLE_READ; XactReadOnly = true; diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y deleted file mode 100644 index ec047c827c..0000000000 --- a/src/backend/replication/repl_gram.y +++ /dev/null @@ -1,408 +0,0 @@ -%{ -/*------------------------------------------------------------------------- - * - * repl_gram.y - Parser for the replication commands - * - * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group - * Portions Copyright (c) 1994, Regents of the University of California - * - * - * IDENTIFICATION - * src/backend/replication/repl_gram.y - * - *------------------------------------------------------------------------- - */ - -#include "postgres.h" - -#include "access/xlogdefs.h" -#include "nodes/makefuncs.h" -#include "nodes/replnodes.h" -#include "replication/walsender.h" -#include "replication/walsender_private.h" - - -/* Result of the parsing is returned here */ -Node *replication_parse_result; - -static SQLCmd *make_sqlcmd(void); - - -/* - * Bison doesn't allocate anything that needs to live across parser calls, - * so we can easily have it use palloc instead of malloc. This prevents - * memory leaks if we error out during parsing. Note this only works with - * bison >= 2.0. However, in bison 1.875 the default is to use alloca() - * if possible, so there's not really much problem anyhow, at least if - * you're building with gcc. - */ -#define YYMALLOC palloc -#define YYFREE pfree - -%} - -%expect 0 -%name-prefix="replication_yy" - -%union { - char *str; - bool boolval; - uint32 uintval; - - XLogRecPtr recptr; - Node *node; - List *list; - DefElem *defelt; -} - -/* Non-keyword tokens */ -%token <str> SCONST IDENT -%token <uintval> UCONST -%token <recptr> RECPTR -%token T_WORD - -/* Keyword tokens. 
*/ -%token K_BASE_BACKUP -%token K_IDENTIFY_SYSTEM -%token K_SHOW -%token K_START_REPLICATION -%token K_CREATE_REPLICATION_SLOT -%token K_DROP_REPLICATION_SLOT -%token K_TIMELINE_HISTORY -%token K_LABEL -%token K_PROGRESS -%token K_FAST -%token K_NOWAIT -%token K_MAX_RATE -%token K_WAL -%token K_TABLESPACE_MAP -%token K_TIMELINE -%token K_PHYSICAL -%token K_LOGICAL -%token K_SLOT -%token K_RESERVE_WAL -%token K_TEMPORARY -%token K_EXPORT_SNAPSHOT -%token K_NOEXPORT_SNAPSHOT -%token K_USE_SNAPSHOT - -%type <node> command -%type <node> base_backup start_replication start_logical_replication - create_replication_slot drop_replication_slot identify_system - timeline_history show sql_cmd -%type <list> base_backup_opt_list -%type <defelt> base_backup_opt -%type <uintval> opt_timeline -%type <list> plugin_options plugin_opt_list -%type <defelt> plugin_opt_elem -%type <node> plugin_opt_arg -%type <str> opt_slot var_name -%type <boolval> opt_temporary -%type <list> create_slot_opt_list -%type <defelt> create_slot_opt - -%% - -firstcmd: command opt_semicolon - { - replication_parse_result = $1; - } - ; - -opt_semicolon: ';' - | /* EMPTY */ - ; - -command: - identify_system - | base_backup - | start_replication - | start_logical_replication - | create_replication_slot - | drop_replication_slot - | timeline_history - | show - | sql_cmd - ; - -/* - * IDENTIFY_SYSTEM - */ -identify_system: - K_IDENTIFY_SYSTEM - { - $$ = (Node *) makeNode(IdentifySystemCmd); - } - ; - -/* - * SHOW setting - */ -show: - K_SHOW var_name - { - VariableShowStmt *n = makeNode(VariableShowStmt); - n->name = $2; - $$ = (Node *) n; - } - -var_name: IDENT { $$ = $1; } - | var_name '.' 
IDENT - { $$ = psprintf("%s.%s", $1, $3); } - ; - -/* - * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] - * [MAX_RATE %d] [TABLESPACE_MAP] - */ -base_backup: - K_BASE_BACKUP base_backup_opt_list - { - BaseBackupCmd *cmd = makeNode(BaseBackupCmd); - cmd->options = $2; - $$ = (Node *) cmd; - } - ; - -base_backup_opt_list: - base_backup_opt_list base_backup_opt - { $$ = lappend($1, $2); } - | /* EMPTY */ - { $$ = NIL; } - ; - -base_backup_opt: - K_LABEL SCONST - { - $$ = makeDefElem("label", - (Node *)makeString($2), -1); - } - | K_PROGRESS - { - $$ = makeDefElem("progress", - (Node *)makeInteger(TRUE), -1); - } - | K_FAST - { - $$ = makeDefElem("fast", - (Node *)makeInteger(TRUE), -1); - } - | K_WAL - { - $$ = makeDefElem("wal", - (Node *)makeInteger(TRUE), -1); - } - | K_NOWAIT - { - $$ = makeDefElem("nowait", - (Node *)makeInteger(TRUE), -1); - } - | K_MAX_RATE UCONST - { - $$ = makeDefElem("max_rate", - (Node *)makeInteger($2), -1); - } - | K_TABLESPACE_MAP - { - $$ = makeDefElem("tablespace_map", - (Node *)makeInteger(TRUE), -1); - } - ; - -create_replication_slot: - /* CREATE_REPLICATION_SLOT slot TEMPORARY PHYSICAL RESERVE_WAL */ - K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_PHYSICAL create_slot_opt_list - { - CreateReplicationSlotCmd *cmd; - cmd = makeNode(CreateReplicationSlotCmd); - cmd->kind = REPLICATION_KIND_PHYSICAL; - cmd->slotname = $2; - cmd->temporary = $3; - cmd->options = $5; - $$ = (Node *) cmd; - } - /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ - | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list - { - CreateReplicationSlotCmd *cmd; - cmd = makeNode(CreateReplicationSlotCmd); - cmd->kind = REPLICATION_KIND_LOGICAL; - cmd->slotname = $2; - cmd->temporary = $3; - cmd->plugin = $5; - cmd->options = $6; - $$ = (Node *) cmd; - } - ; - -create_slot_opt_list: - create_slot_opt_list create_slot_opt - { $$ = lappend($1, $2); } - | /* EMPTY */ - { $$ = NIL; } - ; - -create_slot_opt: - 
K_EXPORT_SNAPSHOT
-				{
-				  $$ = makeDefElem("export_snapshot",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_NOEXPORT_SNAPSHOT
-				{
-				  $$ = makeDefElem("export_snapshot",
-								   (Node *)makeInteger(FALSE), -1);
-				}
-			| K_USE_SNAPSHOT
-				{
-				  $$ = makeDefElem("use_snapshot",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			| K_RESERVE_WAL
-				{
-				  $$ = makeDefElem("reserve_wal",
-								   (Node *)makeInteger(TRUE), -1);
-				}
-			;
-
-/* DROP_REPLICATION_SLOT slot */
-drop_replication_slot:
-			K_DROP_REPLICATION_SLOT IDENT
-				{
-					DropReplicationSlotCmd *cmd;
-					cmd = makeNode(DropReplicationSlotCmd);
-					cmd->slotname = $2;
-					$$ = (Node *) cmd;
-				}
-			;
-
-/*
- * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
- */
-start_replication:
-			K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline
-				{
-					StartReplicationCmd *cmd;
-
-					cmd = makeNode(StartReplicationCmd);
-					cmd->kind = REPLICATION_KIND_PHYSICAL;
-					cmd->slotname = $2;
-					cmd->startpoint = $4;
-					cmd->timeline = $5;
-					$$ = (Node *) cmd;
-				}
-			;
-
-/* START_REPLICATION SLOT slot LOGICAL %X/%X options */
-start_logical_replication:
-			K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options
-				{
-					StartReplicationCmd *cmd;
-					cmd = makeNode(StartReplicationCmd);
-					cmd->kind = REPLICATION_KIND_LOGICAL;
-					cmd->slotname = $3;
-					cmd->startpoint = $5;
-					cmd->options = $6;
-					$$ = (Node *) cmd;
-				}
-			;
-/*
- * TIMELINE_HISTORY %d
- */
-timeline_history:
-			K_TIMELINE_HISTORY UCONST
-				{
-					TimeLineHistoryCmd *cmd;
-
-					if ($2 <= 0)
-						ereport(ERROR,
-								(errcode(ERRCODE_SYNTAX_ERROR),
-								 (errmsg("invalid timeline %u", $2))));
-
-					cmd = makeNode(TimeLineHistoryCmd);
-					cmd->timeline = $2;
-
-					$$ = (Node *) cmd;
-				}
-			;
-
-opt_physical:
-			K_PHYSICAL
-			| /* EMPTY */
-			;
-
-opt_temporary:
-			K_TEMPORARY						{ $$ = true; }
-			| /* EMPTY */					{ $$ = false; }
-			;
-
-opt_slot:
-			K_SLOT IDENT
-				{ $$ = $2; }
-			| /* EMPTY */
-				{ $$ = NULL; }
-			;
-
-opt_timeline:
-			K_TIMELINE UCONST
-				{
-					if ($2 <= 0)
-						ereport(ERROR,
-								(errcode(ERRCODE_SYNTAX_ERROR),
-								 (errmsg("invalid timeline %u", $2))));
-					$$ = $2;
-				}
-				| /* EMPTY */			{ $$ = 0; }
-			;
-
-
-plugin_options:
-			'(' plugin_opt_list ')'			{ $$ = $2; }
-			| /* EMPTY */					{ $$ = NIL; }
-		;
-
-plugin_opt_list:
-			plugin_opt_elem
-				{
-					$$ = list_make1($1);
-				}
-			| plugin_opt_list ',' plugin_opt_elem
-				{
-					$$ = lappend($1, $3);
-				}
-		;
-
-plugin_opt_elem:
-			IDENT plugin_opt_arg
-				{
-					$$ = makeDefElem($1, $2, -1);
-				}
-		;
-
-plugin_opt_arg:
-			SCONST							{ $$ = (Node *) makeString($1); }
-			| /* EMPTY */					{ $$ = NULL; }
-		;
-
-sql_cmd:
-			IDENT							{ $$ = (Node *) make_sqlcmd(); }
-		;
-%%
-
-static SQLCmd *
-make_sqlcmd(void)
-{
-	SQLCmd *cmd = makeNode(SQLCmd);
-	int tok;
-
-	/* Just move lexer to the end of command. */
-	for (;;)
-	{
-		tok = yylex();
-		if (tok == ';' || tok == 0)
-			break;
-	}
-	return cmd;
-}
-
-#include "repl_scanner.c"
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
deleted file mode 100644
index 52ae7b343f..0000000000
--- a/src/backend/replication/repl_scanner.l
+++ /dev/null
@@ -1,248 +0,0 @@
-%{
-/*-------------------------------------------------------------------------
- *
- * repl_scanner.l
- *	  a lexical scanner for the replication commands
- *
- * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- *	  src/backend/replication/repl_scanner.l
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "utils/builtins.h"
-#include "parser/scansup.h"
-
-/* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
-#undef fprintf
-#define fprintf(file, fmt, msg) fprintf_to_ereport(fmt, msg)
-
-static void
-fprintf_to_ereport(const char *fmt, const char *msg)
-{
-	ereport(ERROR, (errmsg_internal("%s", msg)));
-}
-
-/* Handle to the buffer that the lexer uses internally */
-static YY_BUFFER_STATE scanbufhandle;
-
-static StringInfoData litbuf;
-
-static void startlit(void);
-static char *litbufdup(void);
-static void addlit(char *ytext, int yleng);
-static void addlitchar(unsigned char ychar);
-
-%}
-
-%option 8bit
-%option never-interactive
-%option nodefault
-%option noinput
-%option nounput
-%option noyywrap
-%option warn
-%option prefix="replication_yy"
-
-%x xq xd
-
-/* Extended quote
- * xqdouble implements embedded quote, ''''
- */
-xqstart			{quote}
-xqdouble		{quote}{quote}
-xqinside		[^']+
-
-/* Double quote
- * Allows embedded spaces and other special characters into identifiers.
- */
-dquote			\"
-xdstart			{dquote}
-xdstop			{dquote}
-xddouble		{dquote}{dquote}
-xdinside		[^"]+
-
-digit			[0-9]+
-hexdigit		[0-9A-Za-z]+
-
-quote			'
-quotestop		{quote}
-
-ident_start		[A-Za-z\200-\377_]
-ident_cont		[A-Za-z\200-\377_0-9\$]
-
-identifier		{ident_start}{ident_cont}*
-
-%%
-
-BASE_BACKUP			{ return K_BASE_BACKUP; }
-FAST			{ return K_FAST; }
-IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
-SHOW		{ return K_SHOW; }
-LABEL			{ return K_LABEL; }
-NOWAIT			{ return K_NOWAIT; }
-PROGRESS			{ return K_PROGRESS; }
-MAX_RATE		{ return K_MAX_RATE; }
-WAL			{ return K_WAL; }
-TABLESPACE_MAP			{ return K_TABLESPACE_MAP; }
-TIMELINE			{ return K_TIMELINE; }
-START_REPLICATION	{ return K_START_REPLICATION; }
-CREATE_REPLICATION_SLOT		{ return K_CREATE_REPLICATION_SLOT; }
-DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
-TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
-PHYSICAL			{ return K_PHYSICAL; }
-RESERVE_WAL			{ return K_RESERVE_WAL; }
-LOGICAL				{ return K_LOGICAL; }
-SLOT				{ return K_SLOT; }
-TEMPORARY			{ return K_TEMPORARY; }
-EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
-NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
-USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
-
-","				{ return ','; }
-";"				{ return ';'; }
-"("				{ return '('; }
-")"				{ return ')'; }
-
-[\n]			;
-[\t]			;
-" "				;
-
-{digit}+		{
-					yylval.uintval = strtoul(yytext, NULL, 10);
-					return UCONST;
-				}
-
-{hexdigit}+\/{hexdigit}+		{
-					uint32 hi,
-							lo;
-					if (sscanf(yytext, "%X/%X", &hi, &lo) != 2)
-						yyerror("invalid streaming start location");
-					yylval.recptr = ((uint64) hi) << 32 | lo;
-					return RECPTR;
-				}
-
-{xqstart}		{
-					BEGIN(xq);
-					startlit();
-				}
-
-<xq>{quotestop}	{
-					yyless(1);
-					BEGIN(INITIAL);
-					yylval.str = litbufdup();
-					return SCONST;
-				}
-
-<xq>{xqdouble}	{
-					addlitchar('\'');
-				}
-
-<xq>{xqinside}	{
-					addlit(yytext, yyleng);
-				}
-
-{xdstart}		{
-					BEGIN(xd);
-					startlit();
-				}
-
-<xd>{xdstop}	{
-					int len;
-					yyless(1);
-					BEGIN(INITIAL);
-					yylval.str = litbufdup();
-					len = strlen(yylval.str);
-					truncate_identifier(yylval.str, len, true);
-					return IDENT;
-				}
-
-<xd>{xdinside}	{
-					addlit(yytext, yyleng);
-				}
-
-{identifier}	{
-					int len = strlen(yytext);
-
-					yylval.str = downcase_truncate_identifier(yytext, len, true);
-					return IDENT;
-				}
-
-<xq,xd><<EOF>>	{ yyerror("unterminated quoted string"); }
-
-
-<<EOF>>			{
-					yyterminate();
-				}
-
-.				{
-					return T_WORD;
-				}
-%%
-
-
-static void
-startlit(void)
-{
-	initStringInfo(&litbuf);
-}
-
-static char *
-litbufdup(void)
-{
-	return litbuf.data;
-}
-
-static void
-addlit(char *ytext, int yleng)
-{
-	appendBinaryStringInfo(&litbuf, ytext, yleng);
-}
-
-static void
-addlitchar(unsigned char ychar)
-{
-	appendStringInfoChar(&litbuf, ychar);
-}
-
-void
-yyerror(const char *message)
-{
-	ereport(ERROR,
-			(errcode(ERRCODE_SYNTAX_ERROR),
-			 errmsg_internal("%s", message)));
-}
-
-
-void
-replication_scanner_init(const char *str)
-{
-	Size slen = strlen(str);
-	char *scanbuf;
-
-	/*
-	 * Might be left over after ereport()
-	 */
-	if (YY_CURRENT_BUFFER)
-		yy_delete_buffer(YY_CURRENT_BUFFER);
-
-	/*
-	 * Make a scan buffer with special termination needed by flex.
-	 */
-	scanbuf = (char *) palloc(slen + 2);
-	memcpy(scanbuf, str, slen);
-	scanbuf[slen] = scanbuf[slen + 1] = YY_END_OF_BUFFER_CHAR;
-	scanbufhandle = yy_scan_buffer(scanbuf, slen + 2);
-}
-
-void
-replication_scanner_finish(void)
-{
-	yy_delete_buffer(scanbufhandle);
-	scanbufhandle = NULL;
-}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 064cf5ee28..e3284363e0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -360,18 +360,7 @@ IdentifySystem(void)
 	snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
 	if (MyDatabaseId != InvalidOid)
-	{
-		MemoryContext cur = CurrentMemoryContext;
-
-		/* syscache access needs a transaction env. */
-		StartTransactionCommand();
-		/* make dbname live outside TX context */
-		MemoryContextSwitchTo(cur);
 		dbname = get_database_name(MyDatabaseId);
-		CommitTransactionCommand();
-		/* CommitTransactionCommand switches to TopMemoryContext */
-		MemoryContextSwitchTo(cur);
-	}
 
 	dest = CreateDestReceiver(DestRemoteSimple);
 	MemSet(nulls, false, sizeof(nulls));
@@ -870,6 +859,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
 	}
 
+
 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
 	{
 		LogicalDecodingContext *ctx;
@@ -908,6 +898,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 								"must not be called in a subtransaction")));
 		}
 
+		/* XXX: document */
+		CommitTransactionCommand();
+
 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
 										logical_read_xlog_page,
 										WalSndPrepareWrite, WalSndWriteData);
@@ -924,6 +917,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		/* build initial snapshot, might take a while */
 		DecodingContextFindStartpoint(ctx);
 
+		/* match*/
+		StartTransactionCommand();
+
 		/*
 		 * Export or use the snapshot if we've been asked to do so.
 		 *
@@ -940,8 +936,10 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)

 			snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
 			RestoreTransactionSnapshot(snap, MyProc);
+
 		}
+

 		/* don't need the decoding context anymore */
 		FreeDecodingContext(ctx);

@@ -1354,65 +1352,37 @@ WalSndWaitForWal(XLogRecPtr loc)
 	return RecentFlushPtr;
 }
 
-/*
- * Execute an incoming replication command.
- *
- * Returns true if the cmd_string was recognized as WalSender command, false
- * if not.
- */
 bool
-exec_replication_command(const char *cmd_string)
+exec_replication_command(ReplicationCmd *cmd, const char *query_string, bool is_toplevel)
 {
-	int parse_rc;
-	Node *cmd_node;
-	MemoryContext cmd_context;
-	MemoryContext old_context;
-
-	/*
-	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
-	 * command arrives. Clean up the old stuff if there's anything.
-	 */
-	SnapBuildClearExportedSnapshot();
-
-	CHECK_FOR_INTERRUPTS();
-
-	cmd_context = AllocSetContextCreate(CurrentMemoryContext,
-										"Replication command context",
-										ALLOCSET_DEFAULT_SIZES);
-	old_context = MemoryContextSwitchTo(cmd_context);
-
-	replication_scanner_init(cmd_string);
-	parse_rc = replication_yyparse();
-	if (parse_rc != 0)
+	if (!am_walsender)
+	{
 		ereport(ERROR,
-				(errcode(ERRCODE_SYNTAX_ERROR),
-				 (errmsg_internal("replication command parser returned %d",
-								  parse_rc))));
-
-	cmd_node = replication_parse_result;
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("replication protocol statement not supported in a normal connection")));
+	}
 
 	/*
 	 * Log replication command if log_replication_commands is enabled. Even
 	 * when it's disabled, log the command with DEBUG1 level for backward
-	 * compatibility. Note that SQL commands are not logged here, and will be
-	 * logged later if log_statement is enabled.
+	 * compatibility.
 	 */
-	if (cmd_node->type != T_SQLCmd)
-		ereport(log_replication_commands ? LOG : DEBUG1,
-				(errmsg("received replication command: %s", cmd_string)));
+	ereport(log_replication_commands ? LOG : DEBUG1,
+			(errmsg("received replication command: %s", query_string)));
 
 	/*
 	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
 	 * called outside of transaction the snapshot should be cleared here.
+	 *
+	 * FIXME: this is completely borked.
 	 */
 	if (!IsTransactionBlock())
 		SnapBuildClearExportedSnapshot();
 
 	/*
-	 * For aborted transactions, don't allow anything except pure SQL,
-	 * the exec_simple_query() will handle it correctly.
+	 * Nothing to do here in an aborted transaction.
 	 */
-	if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
+	if (IsAbortedTransactionBlockState())
 		ereport(ERROR,
 				(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
 				 errmsg("current transaction is aborted, "
@@ -1428,7 +1398,7 @@ exec_replication_command(const char *cmd_string)
 	initStringInfo(&reply_message);
 	initStringInfo(&tmpbuf);
 
-	switch (cmd_node->type)
+	switch (nodeTag(cmd->replicationCmd))
 	{
 		case T_IdentifySystemCmd:
 			IdentifySystem();
@@ -1436,64 +1406,46 @@ exec_replication_command(const char *cmd_string)

 		case T_BaseBackupCmd:
 			PreventTransactionChain(true, "BASE_BACKUP");
-			SendBaseBackup((BaseBackupCmd *) cmd_node);
+			CommitTransactionCommand();
+			SendBaseBackup((BaseBackupCmd *) cmd->replicationCmd);
+			StartTransactionCommand();
 			break;
 
 		case T_CreateReplicationSlotCmd:
-			CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
+			CreateReplicationSlot((CreateReplicationSlotCmd *) cmd->replicationCmd);
 			break;
 
 		case T_DropReplicationSlotCmd:
-			DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
+			DropReplicationSlot((DropReplicationSlotCmd *) cmd->replicationCmd);
 			break;
 
 		case T_StartReplicationCmd:
 			{
-				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
+				StartReplicationCmd *scmd = (StartReplicationCmd *) cmd->replicationCmd;
 
 				PreventTransactionChain(true, "START_REPLICATION");
 
-				if (cmd->kind == REPLICATION_KIND_PHYSICAL)
-					StartReplication(cmd);
+				CommitTransactionCommand();
+
+				if (scmd->kind == REPLICATION_KIND_PHYSICAL)
+					StartReplication(scmd);
 				else
-					StartLogicalReplication(cmd);
+					StartLogicalReplication(scmd);
+
+				StartTransactionCommand();
 				break;
 			}
 
 		case T_TimeLineHistoryCmd:
 			PreventTransactionChain(true, "TIMELINE_HISTORY");
-			SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
+			SendTimeLineHistory((TimeLineHistoryCmd *) cmd->replicationCmd);
 			break;
 
-		case T_VariableShowStmt:
-			{
-				DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
-				VariableShowStmt *n = (VariableShowStmt *) cmd_node;
-
-				GetPGVariable(n->name, dest);
-			}
-			break;
-
-		case T_SQLCmd:
-			if (MyDatabaseId == InvalidOid)
-				ereport(ERROR,
-						(errmsg("not connected to database")));
-
-			/* Tell the caller that this wasn't a WalSender command. */
-			return false;
-
 		default:
 			elog(ERROR, "unrecognized replication command node tag: %u",
-				 cmd_node->type);
+				 cmd->replicationCmd->type);
 	}
 
-	/* done */
-	MemoryContextSwitchTo(old_context);
-	MemoryContextDelete(cmd_context);
-
-	/* Send CommandComplete message */
-	EndCommand("SELECT", DestRemote);
-
 	return true;
 }

diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 75c2d9a61d..485e61f709 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -173,7 +173,6 @@ static int InteractiveBackend(StringInfo inBuf);
 static int interactive_getc(void);
 static int SocketBackend(StringInfo inBuf);
 static int ReadCommand(StringInfo inBuf);
-static void forbidden_in_wal_sender(char firstchar);
 static List *pg_rewrite_query(Query *query);
 static bool check_log_statement(List *stmt_list);
 static int errdetail_execute(List *raw_parsetree_list);
@@ -1015,6 +1014,14 @@ exec_simple_query(const char *query_string)
 		 */
 		if (analyze_requires_snapshot(parsetree))
 		{
+			if (am_walsender && !am_db_walsender)
+			{
+				/* FIXME: message */
+				ereport(ERROR,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("non-replication statement not supported in pure replication connection")));
+			}
+
 			PushActiveSnapshot(GetTransactionSnapshot());
 			snapshot_set = true;
 		}
@@ -1325,6 +1332,14 @@ exec_parse_message(const char *query_string, /* string to execute */
 		 */
 		if (analyze_requires_snapshot(raw_parse_tree))
 		{
+			if (am_walsender && !am_db_walsender)
+			{
+				/* FIXME: message */
+				ereport(ERROR,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("non-replication statement not supported in pure replication connection")));
+			}
+
 			PushActiveSnapshot(GetTransactionSnapshot());
 			snapshot_set = true;
 		}
@@ -1610,6 +1625,14 @@ exec_bind_message(StringInfo input_message)
 		(psrc->raw_parse_tree &&
 		 analyze_requires_snapshot(psrc->raw_parse_tree)))
 	{
+		if (am_walsender && !am_db_walsender)
+		{
+			/* FIXME: message */
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("non-replication statement not supported in pure replication connection")));
+		}
+
 		PushActiveSnapshot(GetTransactionSnapshot());
 		snapshot_set = true;
 	}
@@ -4066,13 +4089,7 @@ PostgresMain(int argc, char *argv[],
 					query_string = pq_getmsgstring(&input_message);
 					pq_getmsgend(&input_message);
 
-					if (am_walsender)
-					{
-						if (!exec_replication_command(query_string))
-							exec_simple_query(query_string);
-					}
-					else
-						exec_simple_query(query_string);
+					exec_simple_query(query_string);
 
 					send_ready_for_query = true;
 				}
@@ -4085,8 +4102,6 @@ PostgresMain(int argc, char *argv[],
 					int numParams;
 					Oid *paramTypes = NULL;
 
-					forbidden_in_wal_sender(firstchar);
-
 					/* Set statement_timestamp() */
 					SetCurrentStatementStartTimestamp();

@@ -4109,8 +4124,6 @@ PostgresMain(int argc, char *argv[],
 				break;
 
 			case 'B': /* bind */
-				forbidden_in_wal_sender(firstchar);
-
 				/* Set statement_timestamp() */
 				SetCurrentStatementStartTimestamp();

@@ -4126,8 +4139,6 @@ PostgresMain(int argc, char *argv[],
 					const char *portal_name;
 					int max_rows;
 
-					forbidden_in_wal_sender(firstchar);
-
 					/* Set statement_timestamp() */
 					SetCurrentStatementStartTimestamp();

@@ -4140,8 +4151,6 @@ PostgresMain(int argc, char *argv[],
 				break;
 
 			case 'F': /* fastpath function call */
-				forbidden_in_wal_sender(firstchar);
-
 				/* Set statement_timestamp() */
 				SetCurrentStatementStartTimestamp();

@@ -4177,8 +4186,6 @@ PostgresMain(int argc, char *argv[],
 					int close_type;
 					const char *close_target;
 
-					forbidden_in_wal_sender(firstchar);
-
 					close_type = pq_getmsgbyte(&input_message);
 					close_target = pq_getmsgstring(&input_message);
 					pq_getmsgend(&input_message);
@@ -4221,8 +4228,6 @@ PostgresMain(int argc, char *argv[],
 					int describe_type;
 					const char *describe_target;
 
-					forbidden_in_wal_sender(firstchar);
-
 					/* Set statement_timestamp() (needed for xact) */
 					SetCurrentStatementStartTimestamp();

@@ -4305,30 +4310,6 @@ PostgresMain(int argc, char *argv[],
 }
 
 /*
- * Throw an error if we're a WAL sender process.
- *
- * This is used to forbid anything else than simple query protocol messages
- * in a WAL sender process. 'firstchar' specifies what kind of a forbidden
- * message was received, and is used to construct the error message.
- */
-static void
-forbidden_in_wal_sender(char firstchar)
-{
-	if (am_walsender)
-	{
-		if (firstchar == 'F')
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg("fastpath function calls not supported in a replication connection")));
-		else
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg("extended query protocol not supported in a replication connection")));
-	}
-}
-
-
-/*
  * Obtain platform stack depth limit (in bytes)
  *
  * Return -1 if unknown
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index e30aeb1c7f..450ee08461 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -1155,6 +1155,8 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
 		  IsA(utilityStmt, VariableSetStmt) ||
 		  IsA(utilityStmt, VariableShowStmt) ||
 		  IsA(utilityStmt, ConstraintsSetStmt) ||
+	/* replication commands shouldn't run w/ snapshot */
+		  IsA(utilityStmt, ReplicationCmd) ||
 	/* efficiency hacks from here down */
 		  IsA(utilityStmt, FetchStmt) ||
 		  IsA(utilityStmt, ListenStmt) ||
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 24e5c427c6..a313fde8d9 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -57,10 +57,12 @@
 #include "commands/vacuum.h"
 #include "commands/view.h"
 #include "miscadmin.h"
+#include "nodes/replnodes.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rewriteRemove.h"
+#include "replication/walsender.h"
 #include "storage/fd.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
@@ -923,6 +925,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 				break;
 			}
 
+		case T_ReplicationCmd:
+			exec_replication_command((ReplicationCmd *) parsetree, queryString, isTopLevel);
+			strcpy(completionTag, CreateCommandTag(parsetree));
+			break;
+
 		default:
 			/* All other statement types have event trigger support */
 			ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2843,6 +2850,40 @@ CreateCommandTag(Node *parsetree)
 			}
 			break;
 
+		case T_ReplicationCmd:
+			switch (nodeTag(((ReplicationCmd *) parsetree)->replicationCmd))
+			{
+				case T_IdentifySystemCmd:
+					tag = "IDENTIFY_SYSTEM";
+					break;
+
+				case T_BaseBackupCmd:
+					tag = "BASE_BACKUP";
+					break;
+
+				case T_CreateReplicationSlotCmd:
+					tag = "CREATE_REPLICATION_SLOT";
+					break;
+
+				case T_DropReplicationSlotCmd:
+					tag = "DROP_REPLICATION_SLOT";
+					break;
+
+				case T_StartReplicationCmd:
+					tag = "START_REPLICATION";
+					break;
+
+				case T_TimeLineHistoryCmd:
+					tag = "TIMELINE_HISTORY";
+					break;
+				default:
+					elog(WARNING, "unrecognized replication command: %d",
+						 (int) nodeTag(parsetree));
+					tag = "???";
+					break;
+			}
+			break;
+
 		default:
 			elog(WARNING, "unrecognized node type: %d",
 				 (int) nodeTag(parsetree));
@@ -3349,6 +3390,11 @@ GetCommandLogLevel(Node *parsetree)
 			}
 			break;
 
+		case T_ReplicationCmd:
+			/* FIXME: Invent new category? */
+			lev = LOGSTMT_DDL;
+			break;
+
 		default:
 			elog(WARNING, "unrecognized node type: %d",
 				 (int) nodeTag(parsetree));
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 6b081bd737..2950c7b338 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -240,7 +240,7 @@ StreamLogicalLog(void)
 					replication_slot);
 
 	/* Initiate the replication stream at specified location */
-	appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
+	appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL '%X/%X'",
 			 replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
 
 	/* print options if there are any */
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 8511e57cf7..5a11bb8c4f 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -583,7 +583,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 			return true;
 
 		/* Initiate the replication stream at specified location */
-		snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
+		snprintf(query, sizeof(query), "START_REPLICATION %s'%X/%X' TIMELINE %u",
 				 slotcmd,
 				 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
 				 stream->timeline);
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index f59d719923..107b36fd6e 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -472,13 +472,13 @@ typedef enum NodeTag
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
 	 */
+	T_ReplicationCmd, /* container for all other replication commands */
 	T_IdentifySystemCmd,
 	T_BaseBackupCmd,
 	T_CreateReplicationSlotCmd,
 	T_DropReplicationSlotCmd,
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
-	T_SQLCmd,
 
 	/*
 	 * TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 92ada41b6d..b4afab85da 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -17,6 +17,14 @@
 #include "access/xlogdefs.h"
 #include "nodes/pg_list.h"
 
+
+typedef struct ReplicationCmd
+{
+	NodeTag type;
+
+	Node *replicationCmd;
+} ReplicationCmd;
+
 typedef enum ReplicationKind
 {
 	REPLICATION_KIND_PHYSICAL,
diff --git a/src/include/parser/gramparse.h b/src/include/parser/gramparse.h
index 2da98f67f3..0a3edc79e2 100644
--- a/src/include/parser/gramparse.h
+++ b/src/include/parser/gramparse.h
@@ -20,6 +20,7 @@
 #define GRAMPARSE_H
 
 #include "nodes/parsenodes.h"
+#include "access/xlogdefs.h"
 #include "parser/scanner.h"
 
 /*
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 37542aaee4..7130803e21 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -53,6 +53,7 @@ PG_KEYWORD("attach", ATTACH, UNRESERVED_KEYWORD)
 PG_KEYWORD("attribute", ATTRIBUTE, UNRESERVED_KEYWORD)
 PG_KEYWORD("authorization", AUTHORIZATION, TYPE_FUNC_NAME_KEYWORD)
 PG_KEYWORD("backward", BACKWARD, UNRESERVED_KEYWORD)
+PG_KEYWORD("base_backup", BASE_BACKUP, UNRESERVED_KEYWORD)
 PG_KEYWORD("before", BEFORE, UNRESERVED_KEYWORD)
 PG_KEYWORD("begin", BEGIN_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("between", BETWEEN, COL_NAME_KEYWORD)
@@ -99,6 +100,7 @@ PG_KEYWORD("conversion", CONVERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("copy", COPY, UNRESERVED_KEYWORD)
 PG_KEYWORD("cost", COST, UNRESERVED_KEYWORD)
 PG_KEYWORD("create", CREATE, RESERVED_KEYWORD)
+PG_KEYWORD("create_replication_slot", CREATE_REPLICATION_SLOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("cross", CROSS, TYPE_FUNC_NAME_KEYWORD)
 PG_KEYWORD("csv", CSV, UNRESERVED_KEYWORD)
 PG_KEYWORD("cube", CUBE, UNRESERVED_KEYWORD)
@@ -139,6 +141,7 @@ PG_KEYWORD("document", DOCUMENT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("domain", DOMAIN_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("double", DOUBLE_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("drop", DROP, UNRESERVED_KEYWORD)
+PG_KEYWORD("drop_replication_slot", DROP_REPLICATION_SLOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("each", EACH, UNRESERVED_KEYWORD)
 PG_KEYWORD("else", ELSE, RESERVED_KEYWORD)
 PG_KEYWORD("enable", ENABLE_P, UNRESERVED_KEYWORD)
@@ -155,11 +158,13 @@ PG_KEYWORD("exclusive", EXCLUSIVE, UNRESERVED_KEYWORD)
 PG_KEYWORD("execute", EXECUTE, UNRESERVED_KEYWORD)
 PG_KEYWORD("exists", EXISTS, COL_NAME_KEYWORD)
 PG_KEYWORD("explain", EXPLAIN, UNRESERVED_KEYWORD)
+PG_KEYWORD("export_snapshot", EXPORT_SNAPSHOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("extension", EXTENSION, UNRESERVED_KEYWORD)
 PG_KEYWORD("external", EXTERNAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD)
 PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD)
 PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD)
+PG_KEYWORD("fast", FAST_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD)
 PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD)
 PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD)
@@ -186,6 +191,7 @@ PG_KEYWORD("having", HAVING, RESERVED_KEYWORD)
 PG_KEYWORD("header", HEADER_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("hold", HOLD, UNRESERVED_KEYWORD)
 PG_KEYWORD("hour", HOUR_P, UNRESERVED_KEYWORD)
+PG_KEYWORD("identify_system", IDENTIFY_SYSTEM, UNRESERVED_KEYWORD)
 PG_KEYWORD("identity", IDENTITY_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("if", IF_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("ilike", ILIKE, TYPE_FUNC_NAME_KEYWORD)
@@ -240,9 +246,11 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("logical", LOGICAL_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
 PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
+PG_KEYWORD("max_rate", MAX_RATE, UNRESERVED_KEYWORD)
 PG_KEYWORD("maxvalue", MAXVALUE, UNRESERVED_KEYWORD)
 PG_KEYWORD("method", METHOD, UNRESERVED_KEYWORD)
 PG_KEYWORD("minute", MINUTE_P, UNRESERVED_KEYWORD)
@@ -258,6 +266,7 @@ PG_KEYWORD("nchar", NCHAR, COL_NAME_KEYWORD)
 PG_KEYWORD("new", NEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("next", NEXT, UNRESERVED_KEYWORD)
 PG_KEYWORD("no", NO, UNRESERVED_KEYWORD)
+PG_KEYWORD("noexport_snapshot", NOEXPORT_SNAPSHOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("none", NONE, COL_NAME_KEYWORD)
 PG_KEYWORD("norefresh", NOREFRESH, UNRESERVED_KEYWORD)
 PG_KEYWORD("not", NOT, RESERVED_KEYWORD)
@@ -297,6 +306,7 @@ PG_KEYWORD("partial", PARTIAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("partition", PARTITION, UNRESERVED_KEYWORD)
 PG_KEYWORD("passing", PASSING, UNRESERVED_KEYWORD)
 PG_KEYWORD("password", PASSWORD, UNRESERVED_KEYWORD)
+PG_KEYWORD("physical", PHYSICAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("placing", PLACING, RESERVED_KEYWORD)
 PG_KEYWORD("plans", PLANS, UNRESERVED_KEYWORD)
 PG_KEYWORD("policy", POLICY, UNRESERVED_KEYWORD)
@@ -312,6 +322,7 @@ PG_KEYWORD("privileges", PRIVILEGES, UNRESERVED_KEYWORD)
 PG_KEYWORD("procedural", PROCEDURAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("procedure", PROCEDURE, UNRESERVED_KEYWORD)
 PG_KEYWORD("program", PROGRAM, UNRESERVED_KEYWORD)
+PG_KEYWORD("progress", PROGRESS, UNRESERVED_KEYWORD)
 PG_KEYWORD("publication", PUBLICATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("quote", QUOTE, UNRESERVED_KEYWORD)
 PG_KEYWORD("range", RANGE, UNRESERVED_KEYWORD)
@@ -331,6 +342,7 @@ PG_KEYWORD("rename", RENAME, UNRESERVED_KEYWORD)
 PG_KEYWORD("repeatable", REPEATABLE, UNRESERVED_KEYWORD)
 PG_KEYWORD("replace", REPLACE, UNRESERVED_KEYWORD)
 PG_KEYWORD("replica", REPLICA, UNRESERVED_KEYWORD)
+PG_KEYWORD("reserve_wal", RESERVE_WAL, UNRESERVED_KEYWORD)
 PG_KEYWORD("reset", RESET, UNRESERVED_KEYWORD)
 PG_KEYWORD("restart", RESTART, UNRESERVED_KEYWORD)
 PG_KEYWORD("restrict", RESTRICT, UNRESERVED_KEYWORD)
@@ -374,6 +386,7 @@ PG_KEYWORD("sql", SQL_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("stable", STABLE, UNRESERVED_KEYWORD)
 PG_KEYWORD("standalone", STANDALONE_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("start", START, UNRESERVED_KEYWORD)
+PG_KEYWORD("start_replication", START_REPLICATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("statement", STATEMENT, UNRESERVED_KEYWORD)
 PG_KEYWORD("statistics", STATISTICS, UNRESERVED_KEYWORD)
 PG_KEYWORD("stdin", STDIN, UNRESERVED_KEYWORD)
@@ -390,12 +403,15 @@ PG_KEYWORD("table", TABLE, RESERVED_KEYWORD)
 PG_KEYWORD("tables", TABLES, UNRESERVED_KEYWORD)
 PG_KEYWORD("tablesample", TABLESAMPLE, TYPE_FUNC_NAME_KEYWORD)
 PG_KEYWORD("tablespace", TABLESPACE, UNRESERVED_KEYWORD)
+PG_KEYWORD("tablespace_map", TABLESPACE_MAP_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("temp", TEMP, UNRESERVED_KEYWORD)
 PG_KEYWORD("template", TEMPLATE, UNRESERVED_KEYWORD)
 PG_KEYWORD("temporary", TEMPORARY, UNRESERVED_KEYWORD)
 PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
 PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeline", TIMELINE_P, UNRESERVED_KEYWORD)
+PG_KEYWORD("timeline_history", TIMELINE_HISTORY, UNRESERVED_KEYWORD)
 PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
 PG_KEYWORD("to", TO, RESERVED_KEYWORD)
 PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -419,6 +435,7 @@ PG_KEYWORD("unlisten", UNLISTEN, UNRESERVED_KEYWORD)
 PG_KEYWORD("unlogged", UNLOGGED, UNRESERVED_KEYWORD)
 PG_KEYWORD("until", UNTIL, UNRESERVED_KEYWORD)
 PG_KEYWORD("update", UPDATE, UNRESERVED_KEYWORD)
+PG_KEYWORD("use_snapshot", USE_SNAPSHOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("user", USER, RESERVED_KEYWORD)
 PG_KEYWORD("using", USING, RESERVED_KEYWORD)
 PG_KEYWORD("vacuum", VACUUM, UNRESERVED_KEYWORD)
@@ -435,6 +452,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wal", WAL_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 2ca903872e..65026eb83b 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -16,6 +16,9 @@

 #include "fmgr.h"
 
+#include "nodes/nodes.h"
+#include "nodes/replnodes.h"
+
 /*
  * What to do with a snapshot in create replication slot command.
  */
@@ -38,7 +41,7 @@ extern int wal_sender_timeout;
 extern bool log_replication_commands;
 
 extern void InitWalSender(void);
-extern bool exec_replication_command(const char *query_string);
+extern bool exec_replication_command(ReplicationCmd *cmd, const char *query_string, bool isToplevel);
 extern void WalSndErrorCleanup(void);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
-- 
2.12.0.264.gd6db3f2165.dirty
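
For clients, the visible consequence of the patch is in the pg_recvlogical.c and receivelog.c hunks: the start position of START_REPLICATION is now sent as a quoted literal ('%X/%X') rather than a bare token. A minimal standalone sketch of building such a command string follows. The helper name build_start_replication, its signature, and the omission of the optional SLOT clause are assumptions made for illustration and are not part of the patch.

```c
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper (not part of the patch): format a physical
 * START_REPLICATION command with the start LSN written as a quoted
 * literal, mirroring the '%X/%X' change in receivelog.c.
 *
 * Returns the snprintf() result, i.e. the number of characters that
 * would be written if the buffer is large enough.
 */
static int
build_start_replication(char *buf, size_t buflen,
                        uint64_t startpos, unsigned int timeline)
{
    return snprintf(buf, buflen,
                    "START_REPLICATION '%X/%X' TIMELINE %u",
                    (unsigned int) (startpos >> 32),          /* high 32 bits */
                    (unsigned int) (startpos & 0xFFFFFFFFu),  /* low 32 bits */
                    timeline);
}
```

A caller would pass a buffer and a 64-bit WAL position, then hand the resulting string to whatever function sends the query on the replication connection; a return value at or above the buffer size signals truncation.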