On 2020-07-13 14:21, Daniel Gustafsson wrote:
This patch require some rewording of documentation/comments and variable names after the language change introduced by 229f8c219f8f..a9a4a7ad565b, the thread
below can be used as reference for how to change:

https://www.postgresql.org/message-id/flat/20200615182235.x7lch5n6kcjq4aue%40alap3.anarazel.de


Thank you for the heads up!

I updated the most recent patch and removed the use of "master" from it, replacing it with "primary".

--
Anna Akenteva
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
commit 1021b166c0cb4e4017d503eb481e787638842ab1
Author: Akenteva Anna <a.akent...@postgrespro.ru>
Date:   Tue Aug 18 12:37:16 2020 +0300

    wait for lsn

diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index c23bbfb4e71..fbfadbac8e9 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ WAIT FOR LSN <replaceable class="parameter">lsn_value</replaceable> [TIMEOUT <replaceable class="parameter">number_of_milliseconds</replaceable> ] ]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
@@ -63,6 +63,17 @@ BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</
    <xref linkend="sql-set-transaction"/>
    was executed.
   </para>
+
+  <para>
+   The <literal>WAIT FOR</literal> clause allows to wait for the target log
+   sequence number (<acronym>LSN</acronym>) to be replayed on standby before
+   starting the transaction in <productname>PostgreSQL</productname> databases
+   with primary-standby asynchronous replication. Wait time can be limited by
+   specifying a timeout, which is measured in milliseconds and must be a positive
+   integer. If <acronym>LSN</acronym> was not reached before timeout, transaction
+   doesn't begin. Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+   by shutting down the <literal>postgres</literal> server.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -146,6 +157,10 @@ BEGIN;
    different purpose in embedded SQL. You are advised to be careful
    about the transaction semantics when porting database applications.
   </para>
+
+  <para>
+   There is no <command>WAIT FOR</command> clause in the SQL standard.
+  </para>
  </refsect1>
 
  <refsect1>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index d6cd1d41779..016e59930df 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ WAIT FOR LSN <replaceable class="parameter">lsn_value</replaceable> [TIMEOUT <replaceable class="parameter">number_of_milliseconds</replaceable> ] ]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
@@ -40,6 +40,17 @@ START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable
    characteristics, as if <xref linkend="sql-set-transaction"/> was executed. This is the same
    as the <xref linkend="sql-begin"/> command.
   </para>
+
+  <para>
+   The <literal>WAIT FOR</literal> clause allows to wait for the target log
+   sequence number (<acronym>LSN</acronym>) to be replayed on standby before
+   starting the transaction in <productname>PostgreSQL</productname> databases
+   with primary-standby asynchronous replication. Wait time can be limited by
+   specifying a timeout, which is measured in milliseconds and must be a positive
+   integer. If <acronym>LSN</acronym> was not reached before timeout, transaction
+   doesn't begin. Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+   by shutting down the <literal>postgres</literal> server.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -78,6 +89,10 @@ START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable
    omitted.
   </para>
 
+  <para>
+   There is no <command>WAIT FOR</command> clause in the SQL standard.
+  </para>
+
   <para>
    See also the compatibility section of <xref linkend="sql-set-transaction"/>.
   </para>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 09c01ed4ae4..19cb04869c1 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
 #include "catalog/pg_database.h"
 #include "commands/progress.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/controldata_utils.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
@@ -7385,6 +7386,15 @@ StartupXLOG(void)
 					break;
 				}
 
+				/*
+				 * If we replayed an LSN that someone was waiting for,
+				 * set latches in shared memory array to notify the waiter.
+				 */
+				if (XLogCtl->lastReplayedEndRecPtr >= GetMinWaitedLSN())
+				{
+					WaitSetLatch(XLogCtl->lastReplayedEndRecPtr);
+				}
+
 				/* Else, try to fetch the next WAL record */
 				record = ReadRecord(xlogreader, LOG, false);
 			} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index d4815d3ce65..9b310926c12 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -57,6 +57,7 @@ OBJS = \
 	user.o \
 	vacuum.o \
 	variable.o \
-	view.o
+	view.o \
+	wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 00000000000..e1123df321e
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,282 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *	  Implements WAIT FOR, which allows waiting for events such as
+ *	  LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/backendid.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/* Add to shared memory array */
+static void AddWaitedLSN(XLogRecPtr lsn_to_wait);
+
+/* Shared memory structure */
+typedef struct
+{
+	int			backend_maxid;
+	pg_atomic_uint64 min_lsn; /* XLogRecPtr of minimal waited for LSN */
+	slock_t		mutex;
+	/* LSNs that different backends are waiting */
+	XLogRecPtr	lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static WaitState *state;
+
+/*
+ * Add the wait event of the current backend to shared memory array
+ */
+static void
+AddWaitedLSN(XLogRecPtr lsn_to_wait)
+{
+	SpinLockAcquire(&state->mutex);
+	if (state->backend_maxid < MyBackendId)
+		state->backend_maxid = MyBackendId;
+
+	state->lsn[MyBackendId] = lsn_to_wait;
+
+	if (lsn_to_wait < state->min_lsn.value)
+		state->min_lsn.value = lsn_to_wait;
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete wait event of the current backend from the shared memory array.
+ */
+void
+DeleteWaitedLSN(void)
+{
+	int			i;
+	XLogRecPtr	lsn_to_delete;
+
+	SpinLockAcquire(&state->mutex);
+
+	lsn_to_delete = state->lsn[MyBackendId];
+	state->lsn[MyBackendId] = InvalidXLogRecPtr;
+
+	/* If we are deleting the minimal LSN, then choose the next min_lsn */
+	if (lsn_to_delete != InvalidXLogRecPtr &&
+		lsn_to_delete == state->min_lsn.value)
+	{
+		state->min_lsn.value = PG_UINT64_MAX;
+		for (i = 2; i <= state->backend_maxid; i++)
+			if (state->lsn[i] != InvalidXLogRecPtr &&
+				state->lsn[i] < state->min_lsn.value)
+				state->min_lsn.value = state->lsn[i];
+	}
+
+	/* If deleting from the end of the array, shorten the array's used part */
+	if (state->backend_maxid == MyBackendId)
+		for (i = (MyBackendId); i >= 2; i--)
+			if (state->lsn[i] != InvalidXLogRecPtr)
+			{
+				state->backend_maxid = i;
+				break;
+			}
+
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report amount of shared memory space needed for WaitState
+ */
+Size
+WaitShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitState, lsn);
+	size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+	return size;
+}
+
+/*
+ * Initialize an array of events to wait for in shared memory
+ */
+void
+WaitShmemInit(void)
+{
+	bool		found;
+	uint32		i;
+
+	state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (!found)
+	{
+		SpinLockInit(&state->mutex);
+
+		for (i = 0; i < (MaxBackends + 1); i++)
+			state->lsn[i] = InvalidXLogRecPtr;
+
+		state->backend_maxid = 0;
+		state->min_lsn.value = PG_UINT64_MAX;
+	}
+}
+
+/*
+ * Set latches in shared memory to signal that new LSN has been replayed
+ */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	uint32		i;
+	int			backend_maxid;
+	PGPROC	   *backend;
+
+	SpinLockAcquire(&state->mutex);
+	backend_maxid = state->backend_maxid;
+
+	for (i = 2; i <= backend_maxid; i++)
+	{
+		backend = BackendIdGetProc(i);
+
+		if (backend && state->lsn[i] != 0 &&
+			state->lsn[i] <= cur_lsn)
+		{
+			SetLatch(&backend->procLatch);
+		}
+	}
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Get minimal LSN that someone waits for
+ */
+XLogRecPtr
+GetMinWaitedLSN(void)
+{
+	return state->min_lsn.value;
+}
+
+/*
+ * On WAIT use a latch to wait till LSN is replayed,
+ * postmaster dies or timeout happens. Timeout is specified in milliseconds.
+ * Returns true if LSN was reached and false otherwise.
+ */
+bool
+WaitUtility(XLogRecPtr target_lsn, const int timeout_ms)
+{
+	XLogRecPtr	cur_lsn = GetXLogReplayRecPtr(NULL);
+	int			latch_events;
+	float8		endtime;
+	bool		res = false;
+	bool		wait_forever = (timeout_ms <= 0);
+
+	endtime = GetNowFloat() + timeout_ms / 1000.0;
+
+	latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+	/* Check if we already reached the needed LSN */
+	if (cur_lsn >= target_lsn)
+		return true;
+
+	AddWaitedLSN(target_lsn);
+
+	for (;;)
+	{
+		int			rc;
+		float8		time_left = 0;
+		long		time_left_ms = 0;
+
+		time_left = endtime - GetNowFloat();
+
+		/* Use 100 ms as the default timeout to check for interrupts */
+		if (wait_forever || time_left < 0 || time_left > 0.1)
+			time_left_ms = 100;
+		else
+			time_left_ms = (long) ceil(time_left * 1000.0);
+
+		/* If interrupt, LockErrorCleanup() will do DeleteWaitedLSN() for us */
+		CHECK_FOR_INTERRUPTS();
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		rc = WaitLatch(MyLatch, latch_events, time_left_ms,
+					   WAIT_EVENT_CLIENT_READ);
+
+		ResetLatch(MyLatch);
+
+		if (rc & WL_LATCH_SET)
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+
+		if (rc & WL_TIMEOUT)
+		{
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+			/* If the time specified by user has passed, stop waiting */
+			time_left = endtime - GetNowFloat();
+			if (!wait_forever && time_left <= 0.0)
+				break;
+		}
+
+		/* If LSN has been replayed */
+		if (target_lsn <= cur_lsn)
+			break;
+	}
+
+	DeleteWaitedLSN();
+
+	if (cur_lsn < target_lsn)
+		ereport(WARNING,
+				(errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION),
+				 errmsg("didn't start transaction because LSN was not reached"),
+				 errhint("Try to increase wait time.")));
+	else
+		res = true;
+
+	return res;
+}
+
+/*
+ * Implementation of WAIT FOR
+ */
+int
+WaitMain(WaitClause *stmt, DestReceiver *dest)
+{
+	TupleDesc	tupdesc;
+	TupOutputState *tstate;
+	XLogRecPtr	target_lsn;
+	bool		res = false;
+
+	target_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+												 CStringGetDatum(stmt->lsn)));
+	res = WaitUtility(target_lsn, stmt->timeout);
+
+	/* Need a tuple descriptor representing a single TEXT column */
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+
+	/* Prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+	/* Send the result */
+	do_text_output_oneline(tstate, res ? "t" : "f");
+	end_tup_output(tstate);
+	return res;
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 89c409de664..2e8041a95df 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3749,10 +3749,22 @@ _copyTransactionStmt(const TransactionStmt *from)
 	COPY_STRING_FIELD(savepoint_name);
 	COPY_STRING_FIELD(gid);
 	COPY_SCALAR_FIELD(chain);
+	COPY_NODE_FIELD(wait);
 
 	return newnode;
 }
 
+static WaitClause *
+_copyWaitClause(const WaitClause *from)
+{
+	WaitClause *newnode = makeNode(WaitClause);
+
+	COPY_STRING_FIELD(lsn);
+	COPY_SCALAR_FIELD(timeout);
+
+	return newnode;
+};
+
 static CompositeTypeStmt *
 _copyCompositeTypeStmt(const CompositeTypeStmt *from)
 {
@@ -5340,6 +5352,9 @@ copyObjectImpl(const void *from)
 		case T_TransactionStmt:
 			retval = _copyTransactionStmt(from);
 			break;
+		case T_WaitClause:
+			retval = _copyWaitClause(from);
+			break;
 		case T_CompositeTypeStmt:
 			retval = _copyCompositeTypeStmt(from);
 			break;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index e3f33c40be5..c04412ea0d9 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1542,6 +1542,16 @@ _equalTransactionStmt(const TransactionStmt *a, const TransactionStmt *b)
 	COMPARE_STRING_FIELD(savepoint_name);
 	COMPARE_STRING_FIELD(gid);
 	COMPARE_SCALAR_FIELD(chain);
+	COMPARE_NODE_FIELD(wait);
+
+	return true;
+}
+
+static bool
+_equalWaitClause(const WaitClause *a, const WaitClause *b)
+{
+	COMPARE_STRING_FIELD(lsn);
+	COMPARE_SCALAR_FIELD(timeout);
 
 	return true;
 }
@@ -3392,6 +3402,9 @@ equal(const void *a, const void *b)
 		case T_TransactionStmt:
 			retval = _equalTransactionStmt(a, b);
 			break;
+		case T_WaitClause:
+			retval = _equalWaitClause(a, b);
+			break;
 		case T_CompositeTypeStmt:
 			retval = _equalCompositeTypeStmt(a, b);
 			break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index e2f177515da..626b7552629 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2787,6 +2787,28 @@ _outDefElem(StringInfo str, const DefElem *node)
 	WRITE_LOCATION_FIELD(location);
 }
 
+static void
+_outTransactionStmt(StringInfo str, const TransactionStmt *node)
+{
+	WRITE_NODE_TYPE("TRANSACTIONSTMT");
+
+	WRITE_STRING_FIELD(savepoint_name);
+	WRITE_STRING_FIELD(gid);
+	WRITE_NODE_FIELD(options);
+	WRITE_BOOL_FIELD(chain);
+	WRITE_ENUM_FIELD(kind, TransactionStmtKind);
+	WRITE_NODE_FIELD(wait);
+}
+
+static void
+_outWaitClause(StringInfo str, const WaitClause *node)
+{
+	WRITE_NODE_TYPE("WAITCLAUSE");
+
+	WRITE_STRING_FIELD(lsn);
+	WRITE_UINT_FIELD(timeout);
+}
+
 static void
 _outTableLikeClause(StringInfo str, const TableLikeClause *node)
 {
@@ -4337,6 +4359,12 @@ outNode(StringInfo str, const void *obj)
 			case T_PartitionRangeDatum:
 				_outPartitionRangeDatum(str, obj);
 				break;
+			case T_TransactionStmt:
+				_outTransactionStmt(str, obj);
+				break;
+			case T_WaitClause:
+				_outWaitClause(str, obj);
+				break;
 
 			default:
 
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index dbb47d49829..78a17190497 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -598,6 +598,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <partboundspec> PartitionBoundSpec
 %type <list>		hash_partbound
 %type <defelt>		hash_partbound_elem
+%type <ival>		wait_time
+%type <node>		wait_for
 
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
@@ -667,7 +669,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
 	LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
 
 	MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
 
@@ -698,7 +700,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P
 
 	TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
-	TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+	TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
 	TREAT TRIGGER TRIM TRUE_P
 	TRUNCATE TRUSTED TYPE_P TYPES_P
 
@@ -708,7 +710,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
 	VERBOSE VERSION_P VIEW VIEWS VOLATILE
 
-	WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+	WAIT WHEN WHERE WHITESPACE_P WINDOW
+	WITH WITHIN WITHOUT WORK WRAPPER WRITE
 
 	XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
 	XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -9724,18 +9727,20 @@ TransactionStmt:
 					n->chain = $3;
 					$$ = (Node *)n;
 				}
-			| BEGIN_P opt_transaction transaction_mode_list_or_empty
+			| BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 					n->kind = TRANS_STMT_BEGIN;
 					n->options = $3;
+					n->wait = $4;
 					$$ = (Node *)n;
 				}
-			| START TRANSACTION transaction_mode_list_or_empty
+			| START TRANSACTION transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 					n->kind = TRANS_STMT_START;
 					n->options = $3;
+					n->wait = $4;
 					$$ = (Node *)n;
 				}
 			| COMMIT opt_transaction opt_transaction_chain
@@ -14012,6 +14017,25 @@ xml_passing_mech:
 			| BY VALUE_P
 		;
 
+/*
+ * WAIT FOR clause of BEGIN and START TRANSACTION statements
+ */
+wait_for:
+			WAIT FOR LSN Sconst wait_time
+				{
+					WaitClause *n = makeNode(WaitClause);
+					n->lsn = $4;
+					n->timeout = $5;
+					$$ = (Node *)n;
+				}
+			| /* EMPTY */		{ $$ = NULL; }
+		;
+
+wait_time:
+			TIMEOUT Iconst		{ $$ = $2; }
+			| /* EMPTY */		{ $$ = 0; }
+		;
+
 
 /*
  * Aggregate decoration clauses
@@ -15158,6 +15182,7 @@ unreserved_keyword:
 			| LOCK_P
 			| LOCKED
 			| LOGGED
+			| LSN
 			| MAPPING
 			| MATCH
 			| MATERIALIZED
@@ -15285,6 +15310,7 @@ unreserved_keyword:
 			| TEMPORARY
 			| TEXT_P
 			| TIES
+			| TIMEOUT
 			| TRANSACTION
 			| TRANSFORM
 			| TRIGGER
@@ -15311,6 +15337,7 @@ unreserved_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAIT
 			| WHITESPACE_P
 			| WITHIN
 			| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 96c2aaabbd6..3b941dc7b8c 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -23,6 +23,7 @@
 #include "access/syncscan.h"
 #include "access/twophase.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -149,6 +150,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, WaitShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -268,6 +270,11 @@ CreateSharedMemoryAndSemaphores(void)
 	SyncScanShmemInit();
 	AsyncShmemInit();
 
+	/*
+	 * Init array of events for the WAIT FOR clause in shared memory
+	 */
+	WaitShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index aa9fbd80545..48dacb3366e 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -38,6 +38,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -712,6 +713,9 @@ LockErrorCleanup(void)
 
 	AbortStrongLockAcquire();
 
+	/* If BEGIN WAIT FOR LSN was interrupted, then stop waiting for that LSN */
+	DeleteWaitedLSN();
+
 	/* Nothing to do if we weren't waiting for a lock */
 	if (lockAwaited == NULL)
 	{
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 9b0c376c8cb..618804e0311 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -57,6 +57,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -593,6 +594,18 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 					case TRANS_STMT_START:
 						{
 							ListCell   *lc;
+							WaitClause *waitstmt = (WaitClause *) stmt->wait;
+
+							/* WAIT FOR cannot be used on primary */
+							if (stmt->wait && !RecoveryInProgress())
+								ereport(ERROR,
+										(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+										 errmsg("WAIT FOR can only be "
+												"used on standby")));
+
+							/* If needed to WAIT FOR something but failed */
+							if (stmt->wait && WaitMain(waitstmt, dest) == 0)
+								break;
 
 							BeginTransactionBlock();
 							foreach(lc, stmt->options)
diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c
index 37c23c9155a..2181191d99e 100644
--- a/src/backend/utils/adt/misc.c
+++ b/src/backend/utils/adt/misc.c
@@ -373,8 +373,6 @@ pg_sleep(PG_FUNCTION_ARGS)
 	 * less than the specified time when WaitLatch is terminated early by a
 	 * non-query-canceling signal such as SIGHUP.
 	 */
-#define GetNowFloat()	((float8) GetCurrentTimestamp() / 1000000.0)
-
 	endtime = GetNowFloat() + secs;
 
 	for (;;)
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 00000000000..d08ee574ed3
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *	  prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+#include "nodes/parsenodes.h"
+
+extern bool WaitUtility(XLogRecPtr lsn, const int timeout_ms);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWaitedLSN(void);
+extern int	WaitMain(WaitClause *stmt, DestReceiver *dest);
+extern void DeleteWaitedLSN(void);
+
+#endif							/* WAIT_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 381d84b4e4f..822827aa32d 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -492,6 +492,7 @@ typedef enum NodeTag
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
 	T_SQLCmd,
+	T_WaitClause,
 
 	/*
 	 * TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 151bcdb7ef5..6a21ed5faeb 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1431,6 +1431,17 @@ typedef struct OnConflictClause
 	int			location;		/* token location, or -1 if unknown */
 } OnConflictClause;
 
+/*
+ * WaitClause -
+ *		representation of WAIT FOR clause for BEGIN and START TRANSACTION.
+ */
+typedef struct WaitClause
+{
+	NodeTag		type;
+	char	   *lsn;			/* LSN to wait for */
+	int			timeout;		/* Number of milliseconds to limit wait time */
+} WaitClause;
+
 /*
  * CommonTableExpr -
  *	   representation of WITH list element
@@ -3061,6 +3072,7 @@ typedef struct TransactionStmt
 	char	   *savepoint_name; /* for savepoint commands */
 	char	   *gid;			/* for two-phase-commit related commands */
 	bool		chain;			/* AND CHAIN option */
+	Node	   *wait;			/* WAIT FOR clause */
 } TransactionStmt;
 
 /* ----------------------
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 08f22ce211d..6e1848fe4cc 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -243,6 +243,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
 PG_KEYWORD("materialized", MATERIALIZED, UNRESERVED_KEYWORD)
@@ -410,6 +411,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("then", THEN, RESERVED_KEYWORD)
 PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD)
 PG_KEYWORD("time", TIME, COL_NAME_KEYWORD)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD)
 PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD)
 PG_KEYWORD("to", TO, RESERVED_KEYWORD)
 PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD)
@@ -450,6 +452,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 03a1de569f0..eaeeb79c411 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -109,4 +109,6 @@ extern int	date2isoyearday(int year, int mon, int mday);
 
 extern bool TimestampTimestampTzRequiresRewrite(void);
 
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+
 #endif							/* TIMESTAMP_H */
diff --git a/src/test/recovery/t/020_begin_wait.pl b/src/test/recovery/t/020_begin_wait.pl
new file mode 100644
index 00000000000..1cb3c59cace
--- /dev/null
+++ b/src/test/recovery/t/020_begin_wait.pl
@@ -0,0 +1,85 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+
+# Initialize primary node
+my $node_primary = get_new_node('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Using the backup, create a streaming standby with a 1 second delay
+my $node_standby = get_new_node('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Check that timeouts make us wait for the specified time (1s here)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $two_seconds = 2000; # in milliseconds
+my $start_time = time();
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR LSN '0/FFFFFFFF' TIMEOUT $two_seconds");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $two_seconds, "WAIT FOR TIMEOUT waits for enough time");
+
+
+# Check that timeouts let us stop waiting right away, before reaching target LSN
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my ($ret, $out, $err) = $node_standby->psql('postgres',
+	"BEGIN WAIT FOR LSN '$lsn1' TIMEOUT 1");
+
+ok($ret == 0, "zero return value when failed to WAIT FOR LSN on standby");
+ok($err =~ /WARNING:  didn't start transaction because LSN was not reached/,
+	"correct error message when failed to WAIT FOR LSN on standby");
+ok($out eq "f", "if given too little wait time, WAIT doesn't reach target LSN");
+
+
+# Check that WAIT FOR works fine and reaches target LSN if given no timeout
+
+# Add data on primary, memorize primary's last LSN
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Wait for it to appear on replica, memorize replica's last LSN
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR LSN '$lsn2'");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+
+# Make sure that primary's and replica's LSNs are the same after WAIT
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$reached_lsn'::pg_lsn, '$lsn2'::pg_lsn)");
+ok($compare_lsns eq 0,
+	"standby reached the same LSN as primary before starting transaction");
+
+
+# Make sure that it's not allowed to use WAIT FOR on primary
+($ret, $out, $err) = $node_primary->psql('postgres',
+	"BEGIN WAIT FOR LSN '0/FFFFFFFF'");
+
+ok($ret != 0, "non-zero return value when trying to WAIT FOR LSN on primary");
+ok($err =~ /ERROR:  WAIT FOR can only be used on standby/,
+	"correct error message when trying to WAIT FOR LSN on primary");
+ok($out eq '', "empty output when trying to WAIT FOR LSN on primary");
+
+
+$node_standby->stop;
+$node_primary->stop;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 3d990463ce9..eee95edfe11 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2679,6 +2679,7 @@ WSABUF
 WSADATA
 WSANETWORKEVENTS
 WSAPROTOCOL_INFO
+WaitClause
 WaitEvent
 WaitEventActivity
 WaitEventClient
@@ -2687,6 +2688,7 @@ WaitEventIPC
 WaitEventSet
 WaitEventTimeout
 WaitPMResult
+WaitState
 WalCloseMethod
 WalLevel
 WalRcvData

Reply via email to