Alexander Korotkov wrote 2017-09-26 12:07:
I propose the following syntax options.

WAITLSN lsn;
WAITLSN lsn TIMEOUT delay;
WAITLSN lsn INFINITE;
WAITLSN lsn NOWAIT;

For me that looks rather better. What do you think?
I agree with you, the syntax looks better now. A new patch is attached to this mail.

Ants Aasma wrote 2017-09-26 13:00:
Exposing this interface as WAITLSN will encode that visibility order matches LSN order. This removes any chance of fixing, for example, the visibility order of async vs. sync transactions. Just renaming it so that the token is an opaque commit visibility token that just happens to be an LSN would still allow for progress in transaction management. For example, making PostgreSQL distributed will likely require timestamp and/or vector clock based visibility rules.
I'm sorry, I did not understand exactly what you meant. Please explain this in more detail.

--
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
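For anyone reading the grammar in the attached patch, here is a minimal standalone sketch of how the four syntax forms collapse into the single delay value stored in WaitLSNStmt. The enum WaitForm and the helpers encode_delay() and uses_timeout() are hypothetical names introduced only for illustration; the returned values mirror the grammar actions in gram.y and the "delay > 0" test in WaitLSNUtility().

```c
#include <stdbool.h>

/* Hypothetical tags for the four syntax forms; not part of the patch. */
typedef enum
{
	WAIT_DEFAULT,				/* WAITLSN 'lsn' */
	WAIT_INFINITELY,			/* WAITLSN 'lsn' INFINITELY */
	WAIT_TIMEOUT,				/* WAITLSN 'lsn' TIMEOUT n */
	WAIT_NOWAIT					/* WAITLSN 'lsn' NOWAIT */
} WaitForm;

/* Mirrors the values the grammar actions assign to WaitLSNStmt.delay. */
int
encode_delay(WaitForm form, int timeout_ms)
{
	switch (form)
	{
		case WAIT_TIMEOUT:
			return timeout_ms;	/* n->delay = $4 */
		case WAIT_NOWAIT:
			return 1;			/* n->delay = 1 */
		case WAIT_DEFAULT:
		case WAIT_INFINITELY:
		default:
			return 0;			/* n->delay = 0 */
	}
}

/* Mirrors the check that decides whether WL_TIMEOUT is armed. */
bool
uses_timeout(int delay)
{
	return delay > 0;
}
```

As the sketch suggests, NOWAIT is carried as a 1 ms timeout, so it cannot be told apart from TIMEOUT 1, and the grammar never sets the nowait field of WaitLSNStmt.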
commit 217f842726531edb1b0056a5c5727ab01bab7f9b
Author: i.kartyshov <i.kartys...@postgrespro.com>
Date:   Mon Oct 23 12:08:59 2017 +0300

    Cherry picked and ported 11dev

diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 01acc2e..6792eb0 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -181,6 +181,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY update SYSTEM "update.sgml">
 <!ENTITY vacuum SYSTEM "vacuum.sgml">
 <!ENTITY values SYSTEM "values.sgml">
+<!ENTITY waitlsn SYSTEM "waitlsn.sgml">

 <!-- applications and utilities -->
 <!ENTITY clusterdb SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/waitlsn.sgml b/doc/src/sgml/ref/waitlsn.sgml
new file mode 100644
index 0000000..6f389ca
--- /dev/null
+++ b/doc/src/sgml/ref/waitlsn.sgml
@@ -0,0 +1,144 @@
+<!--
+doc/src/sgml/ref/waitlsn.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="SQL-WAITLSN">
+ <indexterm zone="sql-waitlsn">
+  <primary>WAITLSN</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle>WAITLSN</refentrytitle>
+  <manvolnum>7</manvolnum>
+  <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>WAITLSN</refname>
+  <refpurpose>wait for the target <acronym>LSN</> to be replayed</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAITLSN '<replaceable class="PARAMETER">LSN</replaceable>' [ INFINITELY ]
+WAITLSN '<replaceable class="PARAMETER">LSN</replaceable>' TIMEOUT <replaceable class="PARAMETER">wait_time</replaceable>
+WAITLSN '<replaceable class="PARAMETER">LSN</replaceable>' NOWAIT
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   interprocess communication mechanism to wait for the target log sequence
+   number (<acronym>LSN</>) on standby in <productname>&productname;</productname>
+   databases with master-standby asynchronous replication.
When run with the
+   <replaceable>LSN</replaceable> option, the <command>WAITLSN</command> command
+   waits for the specified <acronym>LSN</> to be replayed. By default, wait
+   time is unlimited. Waiting can be interrupted using <literal>Ctrl+C</>, or
+   by shutting down the <literal>postgres</> server. You can also limit the wait
+   time using the <option>TIMEOUT</> option, or check the target <acronym>LSN</>
+   status immediately using the <option>NOWAIT</> option.
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Parameters</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><replaceable class="PARAMETER">LSN</replaceable></term>
+    <listitem>
+     <para>
+      Specify the target log sequence number to wait for.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term>INFINITELY</term>
+    <listitem>
+     <para>
+      Wait until the target <acronym>LSN</> is replayed on standby.
+      This is an optional parameter reinforcing the default behavior.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term>TIMEOUT <replaceable class="PARAMETER">wait_time</replaceable></term>
+    <listitem>
+     <para>
+      Limit the time to wait for the LSN to be replayed.
+      The specified <replaceable>wait_time</replaceable> must be an integer
+      and is measured in milliseconds.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term>NOWAIT</term>
+    <listitem>
+     <para>
+      Report whether the target <acronym>LSN</> has been replayed already,
+      without any waiting.
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   Run <literal>WAITLSN</> from <application>psql</application>,
+   limiting wait time to 10000 milliseconds:
+
+<screen>
+WAITLSN '0/3F07A6B1' TIMEOUT 10000;
+NOTICE: LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+  </para>
+
+  <para>
+   Wait until the specified <acronym>LSN</> is replayed:
+<screen>
+WAITLSN '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+  </para>
+
+  <para>
+   Limit <acronym>LSN</> wait time to 500000 milliseconds, and then cancel the command:
+<screen>
+WAITLSN '0/3F0FF791' TIMEOUT 500000;
+^CCancel request sent
+NOTICE: LSN is not reached. Try to increase wait time.
+ERROR: canceling statement due to user request
+ LSN reached
+-------------
+ f
+(1 row)
+</screen>
+</para>
+ </refsect1>
+
+ <refsect1>
+  <title>Compatibility</title>
+
+  <para>
+   There is no <command>WAITLSN</command> statement in the SQL
+   standard.
+  </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index 9000b3a..0c5951a 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -209,6 +209,7 @@
    &update;
    &vacuum;
    &values;
+   &waitlsn;

  </reference>

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index dd028a1..117cc9b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -40,6 +40,7 @@
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
 #include "commands/tablespace.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/atomics.h"
@@ -149,7 +150,6 @@ const struct config_enum_entry sync_method_options[] = {
 	{NULL, 0, false}
 };

-
 /*
  * Although only "on", "off", and "always" are documented,
  * we accept all the likely variants of "on" and "off".
@@ -7312,6 +7312,15 @@ StartupXLOG(void)
 					break;
 				}

+				/*
+				 * After update lastReplayedEndRecPtr set Latches in SHMEM array
+				 */
+				if (XLogCtl->lastReplayedEndRecPtr >= GetMinWaitLSN())
+				{
+
+					WaitLSNSetLatch(XLogCtl->lastReplayedEndRecPtr);
+				}
+
 				/* Else, try to fetch the next WAL record */
 				record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
 			} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 4a6c99e..0d10117 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -20,6 +20,6 @@ OBJS = amcmds.o aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \
 	policy.o portalcmds.o prepare.o proclang.o publicationcmds.o \
 	schemacmds.o seclabel.o sequence.o statscmds.o subscriptioncmds.o \
 	tablecmds.o tablespace.o trigger.o tsearchcmds.o typecmds.o user.o \
-	vacuum.o vacuumlazy.o variable.o view.o
+	vacuum.o vacuumlazy.o variable.o view.o waitlsn.o

 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index f7de742..cdeddfc 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -141,7 +141,6 @@
 #include "utils/timestamp.h"
 #include "utils/tqual.h"

-
 /*
  * Maximum size of a NOTIFY payload, including terminating NULL.
This
 * must be kept small enough so that a notification message fits on one
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
new file mode 100644
index 0000000..db2f549
--- /dev/null
+++ b/src/backend/commands/waitlsn.c
@@ -0,0 +1,273 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ *	  WaitLSN statement: WAITLSN
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2017, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * -------------------------------------------------------------------------
+ * Wait for LSN to be replayed on slave
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "pgstat.h"
+#include "utils/pg_lsn.h"
+#include "storage/latch.h"
+#include "miscadmin.h"
+#include "storage/spin.h"
+#include "storage/backendid.h"
+#include "access/xact.h"
+#include "storage/shmem.h"
+#include "storage/ipc.h"
+#include "utils/timestamp.h"
+#include "storage/pmsignal.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "commands/waitlsn.h"
+#include "storage/proc.h"
+#include "access/transam.h"
+#include "funcapi.h"
+#include "catalog/pg_type.h"
+#include "utils/builtins.h"
+
+/* Latches Own-DisownLatch and AbortCallBack */
+static uint32 WaitLSNShmemSize(void);
+static void WLDisownLatchAbort(XactEvent event, void *arg);
+static void WLOwnLatch(XLogRecPtr trg_lsn);
+static void WLDisownLatch(void);
+
+void _PG_init(void);
+
+/* Shared memory structures */
+typedef struct
+{
+	int pid;
+	volatile slock_t slock;
+	Latch latch;
+	XLogRecPtr trg_lsn;
+} BIDLatch;
+
+typedef struct
+{
+	char dummy; // TODO: remove
+	int backend_maxid;
+	XLogRecPtr min_lsn;
+	BIDLatch l_arr[FLEXIBLE_ARRAY_MEMBER];
+} GlobState;
+
+static volatile
GlobState *state;
+bool is_latch_owned = false;
+
+/* Take Latch for current backend at the beginning of WAITLSN */
+static void
+WLOwnLatch(XLogRecPtr trg_lsn)
+{
+	SpinLockAcquire(&state->l_arr[MyBackendId].slock);
+	OwnLatch(&state->l_arr[MyBackendId].latch);
+	is_latch_owned = true;
+
+	if (state->backend_maxid < MyBackendId)
+		state->backend_maxid = MyBackendId;
+
+	state->l_arr[MyBackendId].pid = MyProcPid;
+	state->l_arr[MyBackendId].trg_lsn = trg_lsn;
+	SpinLockRelease(&state->l_arr[MyBackendId].slock);
+
+	if (trg_lsn < state->min_lsn)
+		state->min_lsn = trg_lsn;
+}
+
+/* Release Latch for current backend at the end of WAITLSN */
+static void
+WLDisownLatch(void)
+{
+	int i;
+	XLogRecPtr trg_lsn = state->l_arr[MyBackendId].trg_lsn;
+
+	SpinLockAcquire(&state->l_arr[MyBackendId].slock);
+	DisownLatch(&state->l_arr[MyBackendId].latch);
+	is_latch_owned = false;
+	state->l_arr[MyBackendId].pid = 0;
+	state->l_arr[MyBackendId].trg_lsn = InvalidXLogRecPtr;
+
+	/* Update state->min_lsn iff it is necessary, choosing next min_lsn */
+	if (state->min_lsn == trg_lsn)
+	{
+		state->min_lsn = PG_UINT64_MAX;
+		for (i = 2; i <= state->backend_maxid; i++)
+			if (state->l_arr[i].trg_lsn != InvalidXLogRecPtr &&
+				state->l_arr[i].trg_lsn < state->min_lsn)
+				state->min_lsn = state->l_arr[i].trg_lsn;
+	}
+
+	if (state->backend_maxid == MyBackendId)
+		for (i = (MaxConnections+1); i >=2; i--)
+			if (state->l_arr[i].pid != 0)
+			{
+				state->backend_maxid = i;
+				break;
+			}
+
+	SpinLockRelease(&state->l_arr[MyBackendId].slock);
+}
+
+/* CallBack function on abort*/
+static void
+WLDisownLatchAbort(XactEvent event, void *arg)
+{
+	if (is_latch_owned && (event == XACT_EVENT_PARALLEL_ABORT ||
+						   event == XACT_EVENT_ABORT))
+	{
+		WLDisownLatch();
+	}
+}
+
+/* Module load callback */
+void
+_PG_init(void)
+{
+	if (!IsUnderPostmaster)
+		RegisterXactCallback(WLDisownLatchAbort, NULL);
+}
+
+/* Get size of shared memory to room GlobState */
+static uint32
+WaitLSNShmemSize(void)
+{
+	return
offsetof(GlobState, l_arr) + sizeof(BIDLatch) * (MaxConnections+1);
+}
+
+/* Init array of Latches in shared memory */
+void
+WaitLSNShmemInit(void)
+{
+	bool found;
+	uint32 i;
+
+	state = (GlobState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitLSNShmemSize(),
+										  &found);
+	if (!found)
+	{
+		for (i = 0; i < (MaxConnections+1); i++)
+		{
+			state->l_arr[i].pid = 0;
+			state->l_arr[i].trg_lsn = InvalidXLogRecPtr;
+			SpinLockInit(&state->l_arr[i].slock);
+			InitSharedLatch(&state->l_arr[i].latch);
+		}
+		state->backend_maxid = 0;
+		state->min_lsn = PG_UINT64_MAX;
+	}
+}
+
+/* Set all Latches in shared memory cause new LSN been replayed*/
+void
+WaitLSNSetLatch(XLogRecPtr cur_lsn)
+{
+	uint32 i;
+
+	for (i = 2; i <= state->backend_maxid; i++)
+	{
+		if (state->l_arr[i].trg_lsn != 0)
+		{
+			SpinLockAcquire(&state->l_arr[i].slock);
+			if (state->l_arr[i].trg_lsn <= cur_lsn)
+				SetLatch(&state->l_arr[i].latch);
+			SpinLockRelease(&state->l_arr[i].slock);
+		}
+	}
+}
+
+/* Get minimal LSN that will be next */
+XLogRecPtr
+GetMinWaitLSN(void)
+{
+	return state->min_lsn;
+}
+
+/*
+ * On WAITLSN own latch and wait till LSN is replayed, Postmaster death, interruption
+ * or timeout.
+ */
+void
+WaitLSNUtility(const char *lsn, const int delay, DestReceiver *dest)
+{
+	XLogRecPtr trg_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(lsn)));
+	XLogRecPtr cur_lsn;
+	int latch_events;
+	uint64 tdelay = delay;
+	long secs;
+	int microsecs;
+	TimestampTz timer = GetCurrentTimestamp();
+	TupOutputState *tstate;
+	TupleDesc tupdesc;
+	char *value = "f";
+
+	if (delay > 0)
+		latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
+	else
+		latch_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+	WLOwnLatch(trg_lsn);
+
+	for (;;)
+	{
+		cur_lsn = GetXLogReplayRecPtr(NULL);
+
+		/* If LSN had been Replayed */
+		if (trg_lsn <= cur_lsn)
+			break;
+
+		/* If the postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		/* If Delay time is over */
+		if (latch_events & WL_TIMEOUT)
+		{
+			if (TimestampDifferenceExceeds(timer,GetCurrentTimestamp(),delay))
+				break;
+			TimestampDifference(timer,GetCurrentTimestamp(),&secs, &microsecs);
+			tdelay = delay - (secs*1000 + microsecs/1000);
+		}
+
+		MyPgXact->xmin = InvalidTransactionId;
+		WaitLatch(&state->l_arr[MyBackendId].latch, latch_events, tdelay, WAIT_EVENT_CLIENT_READ);
+		ResetLatch(&state->l_arr[MyBackendId].latch);
+
+		/* CHECK_FOR_INTERRUPTS if they comes then disown latch current */
+		if (InterruptPending)
+		{
+			WLDisownLatch();
+			ProcessInterrupts();
+		}
+
+	}
+
+	WLDisownLatch();
+
+	if (trg_lsn > cur_lsn)
+		elog(NOTICE,"LSN is not reached.
Try to increase wait time.");
+	else
+		value = "t";
+
+	/* need a tuple descriptor representing a single TEXT column */
+	tupdesc = CreateTemplateTupleDesc(1, false);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+	/* prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc);
+	/* Send it */
+	do_text_output_oneline(tstate, value);
+	end_tup_output(tstate);
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 4c83a63..a149b54 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -275,7 +275,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 		SecLabelStmt SelectStmt TransactionStmt TruncateStmt
 		UnlistenStmt UpdateStmt VacuumStmt
 		VariableResetStmt VariableSetStmt VariableShowStmt
-		ViewStmt CheckPointStmt CreateConversionStmt
+		ViewStmt WaitLSNStmt CheckPointStmt CreateConversionStmt
 		DeallocateStmt PrepareStmt ExecuteStmt
 		DropOwnedStmt ReassignOwnedStmt
 		AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -322,7 +322,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <list> OptSchemaEltList

 %type <boolean> TriggerForSpec TriggerForType
-%type <ival> TriggerActionTime
+%type <ival> TriggerActionTime WaitDelay
 %type <list> TriggerEvents TriggerOneEvent
 %type <value> TriggerFuncArg
 %type <node> TriggerWhen
@@ -636,7 +636,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	HANDLER HAVING HEADER_P HOLD HOUR_P

 	IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P
-	INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P
+	INCLUDING INCREMENT INDEX INDEXES INFINITELY INHERIT INHERITS INITIALLY INLINE_P
 	INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
 	INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION

@@ -675,7 +675,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	SUBSCRIPTION SUBSTRING
SYMMETRIC SYSID SYSTEM_P

 	TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
-	TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM TREAT TRIGGER TRIM TRUE_P
+	TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM TREAT TRIGGER TRIM TRUE_P
 	TRUNCATE TRUSTED TYPE_P TYPES_P

 	UNBOUNDED UNCOMMITTED UNENCRYPTED UNION UNIQUE UNKNOWN UNLISTEN UNLOGGED
@@ -684,7 +684,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
 	VERBOSE VERSION_P VIEW VIEWS VOLATILE

-	WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+	WAITLSN WHEN WHERE WHITESPACE_P WINDOW
+	WITH WITHIN WITHOUT WORK WRAPPER WRITE

 	XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
 	XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -935,6 +936,7 @@ stmt :
 			| VariableSetStmt
 			| VariableShowStmt
 			| ViewStmt
+			| WaitLSNStmt
 			| /*EMPTY*/
 				{ $$ = NULL; }
 		;
@@ -13831,6 +13833,44 @@ frame_bound:
 				}
 		;

+/*****************************************************************************
+ *
+ *		QUERY:
+ *				WAITLSN <LSN> can appear as a query-level command
+ *
+ *
+ *****************************************************************************/
+
+WaitLSNStmt:
+			WAITLSN Sconst
+				{
+					WaitLSNStmt *n = makeNode(WaitLSNStmt);
+					n->lsn = $2;
+					n->delay = 0;
+					$$ = (Node *)n;
+				}
+			| WAITLSN Sconst TIMEOUT Iconst
+				{
+					WaitLSNStmt *n = makeNode(WaitLSNStmt);
+					n->lsn = $2;
+					n->delay = $4;
+					$$ = (Node *)n;
+				}
+			| WAITLSN Sconst INFINITELY
+				{
+					WaitLSNStmt *n = makeNode(WaitLSNStmt);
+					n->lsn = $2;
+					n->delay = 0;
+					$$ = (Node *)n;
+				}
+			| WAITLSN Sconst NOWAIT
+				{
+					WaitLSNStmt *n = makeNode(WaitLSNStmt);
+					n->lsn = $2;
+					n->delay = 1;
+					$$ = (Node *)n;
+				}
+		;

 /*
  * Supporting nonterminals for expressions.
@@ -14705,6 +14745,7 @@ unreserved_keyword:
 			| INCREMENT
 			| INDEX
 			| INDEXES
+			| INFINITELY
 			| INHERIT
 			| INHERITS
 			| INLINE_P
@@ -14843,6 +14884,7 @@ unreserved_keyword:
 			| TEMPLATE
 			| TEMPORARY
 			| TEXT_P
+			| TIMEOUT
 			| TRANSACTION
 			| TRANSFORM
 			| TRIGGER
@@ -14868,6 +14910,7 @@ unreserved_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAITLSN
 			| WHITESPACE_P
 			| WITHIN
 			| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2d1ed14..932136f 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/twophase.h"
 #include "commands/async.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -271,6 +272,11 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	AsyncShmemInit();
 	BackendRandomShmemInit();

+	/*
+	 * Init array of Latches in SHMEM for WAITLSN
+	 */
+	WaitLSNShmemInit();
+
 #ifdef EXEC_BACKEND

 	/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 82a707a..544baeb 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -56,6 +56,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -923,6 +924,20 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 			break;
 		}

+		case T_WaitLSNStmt:
+			{
+				WaitLSNStmt *stmt = (WaitLSNStmt *) parsetree;
+				if (!RecoveryInProgress())
+				{
+					ereport(ERROR,(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
+							errmsg("cannot execute %s not during recovery",
+							"WaitLSN")));
+				}
+				else
+					WaitLSNUtility(stmt->lsn, stmt->delay, dest);
+			}
+			break;
+
 		default:
 			/* All other statement types have event trigger support */
 			ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2481,6 +2496,10 @@ CreateCommandTag(Node *parsetree)
 			tag = "NOTIFY";
 			break;

+		case T_WaitLSNStmt:
+			tag = "WAITLSN";
+			break;
+
 		case T_ListenStmt:
 			tag = "LISTEN";
 			break;
@@ -3104,6 +3123,10 @@ GetCommandLogLevel(Node *parsetree)
 			lev = LOGSTMT_ALL;
 			break;

+		case T_WaitLSNStmt:
+			lev = LOGSTMT_ALL;
+			break;
+
 		case T_ListenStmt:
 			lev = LOGSTMT_ALL;
 			break;
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
new file mode 100644
index 0000000..49cf9e8
--- /dev/null
+++ b/src/include/commands/waitlsn.h
@@ -0,0 +1,22 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ *	  WaitLSN notification: WAITLSN
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAITLSN_H
+#define WAITLSN_H
+#include "tcop/dest.h"
+
+extern void WaitLSNUtility(const char *lsn, const int delay, DestReceiver *dest);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWaitLSN(void);
+
+#endif /* WAITLSN_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index ffeeb49..201677b 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -479,6 +479,7 @@ typedef enum NodeTag
 	T_DropReplicationSlotCmd,
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
+	T_WaitLSNStmt,
 	T_SQLCmd,

 	/*
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 732e5d6..55ffda8 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3446,4 +3446,16 @@ typedef struct DropSubscriptionStmt
 	DropBehavior behavior; /* RESTRICT or CASCADE behavior */
 } DropSubscriptionStmt;

+/* ----------------------
+ *		WaitLSN Statement
+ * ----------------------
+ */
+typedef struct WaitLSNStmt
+{
+	NodeTag type;
+	char *lsn; /* Target LSN to wait for */
+	int delay; /* Delay to wait for LSN*/
+	bool nowait; /* No wait for LSN just result*/
+}
WaitLSNStmt;
+
 #endif /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index f50e45e..618cdb2 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -198,6 +198,7 @@ PG_KEYWORD("including", INCLUDING, UNRESERVED_KEYWORD)
 PG_KEYWORD("increment", INCREMENT, UNRESERVED_KEYWORD)
 PG_KEYWORD("index", INDEX, UNRESERVED_KEYWORD)
 PG_KEYWORD("indexes", INDEXES, UNRESERVED_KEYWORD)
+PG_KEYWORD("infinitely", INFINITELY, UNRESERVED_KEYWORD)
 PG_KEYWORD("inherit", INHERIT, UNRESERVED_KEYWORD)
 PG_KEYWORD("inherits", INHERITS, UNRESERVED_KEYWORD)
 PG_KEYWORD("initially", INITIALLY, RESERVED_KEYWORD)
@@ -394,6 +395,7 @@ PG_KEYWORD("temporary", TEMPORARY, UNRESERVED_KEYWORD)
 PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
 PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
 PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
 PG_KEYWORD("to", TO, RESERVED_KEYWORD)
 PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -433,6 +435,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("waitlsn", WAITLSN, UNRESERVED_KEYWORD)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
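
To make the timeout bookkeeping in the wait loop of WaitLSNUtility() easier to review, here is a small standalone sketch of the "remaining time" computation that feeds WaitLatch() on each iteration. Plain microsecond counters stand in for TimestampTz, and the helper name remaining_ms() is hypothetical. Unlike the patch, the sketch clamps at zero itself instead of relying on the preceding TimestampDifferenceExceeds() check.

```c
#include <stdint.h>

/*
 * Return how many milliseconds of a delay_ms budget remain, given a start
 * time and the current time in microseconds. Returns 0 once the budget is
 * used up, so the caller never passes a negative timeout on.
 */
long
remaining_ms(int64_t start_us, int64_t now_us, int delay_ms)
{
	int64_t		elapsed_ms = (now_us - start_us) / 1000;

	if (elapsed_ms >= delay_ms)
		return 0;
	return (long) (delay_ms - elapsed_ms);
}
```

For example, remaining_ms(0, 2500000, 10000) gives 7500: 2.5 seconds into a 10 second TIMEOUT, 7.5 seconds remain.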