We should raise a discussion on a separate utility statement, or find
a case where the procedure version fails.

1) Classic (wait_classic_v3.patch)
https://www.postgresql.org/message-id/3cc883048264c2e9af022033925ff8db%40postgrespro.ru
==========
advantages: multiple wait events, separate WAIT FOR statement
disadvantages: new words in grammar



WAIT FOR  [ANY | ALL] event [, ...]
BEGIN [ WORK | TRANSACTION ] [ transaction_mode [, ...] ]
    [ WAIT FOR [ANY | ALL] event [, ...]]
event:
LSN value
TIMEOUT number_of_milliseconds
timestamp



2) After style: Kyotaro and Freund (wait_after_within_v2.patch)
https://www.postgresql.org/message-id/d3ff2e363af60b345f82396992595a03%40postgrespro.ru
==========
advantages: no new words in grammar
disadvantages: a little harder to understand



AFTER lsn_event [ WITHIN delay_milliseconds ] [, ...]
BEGIN [ WORK | TRANSACTION ] [ transaction_mode [, ...] ]
    [ AFTER lsn_event [ WITHIN delay_milliseconds ]]
START [ WORK | TRANSACTION ] [ transaction_mode [, ...] ]
    [ AFTER lsn_event [ WITHIN delay_milliseconds ]]



3) Procedure style: Tom Lane and Kyotaro (wait_proc_v7.patch)
https://www.postgresql.org/message-id/27171.1586439221%40sss.pgh.pa.us
https://www.postgresql.org/message-id/20210121.173009.235021120161403875.horikyota.ntt%40gmail.com
==========
advantages: no new words in grammar, done the same way as
pg_last_wal_replay_lsn
disadvantages: use snapshot xmin trick
SELECT pg_waitlsn('LSN', timeout);
SELECT pg_waitlsn_infinite('LSN');
SELECT pg_waitlsn_no_wait('LSN');


Regards
--
Ivan Kartyshov
Postgres Professional: www.postgrespro.com
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 54b5f22d6e..18695e013e 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -188,6 +188,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY update             SYSTEM "update.sgml">
 <!ENTITY vacuum             SYSTEM "vacuum.sgml">
 <!ENTITY values             SYSTEM "values.sgml">
+<!ENTITY wait               SYSTEM "wait.sgml">
 
 <!-- applications and utilities -->
 <!ENTITY clusterdb          SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index 016b021487..a2794763b1 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] <replaceable class="parameter">wait_event</replaceable>
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+    AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN number_of_milliseconds ]
 </synopsis>
  </refsynopsisdiv>
 
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index 74ccd7e345..46a3bcf1a8 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] <replaceable class="parameter">wait_event</replaceable>
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+    AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN number_of_milliseconds ]
 </synopsis>
  </refsynopsisdiv>
 
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index e11b4b6130..a83ff4551e 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -216,6 +216,7 @@
    &update;
    &vacuum;
    &values;
+   &wait;
 
  </reference>
 
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index c61566666a..08399452ce 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1780,6 +1781,15 @@ PerformWalRecovery(void)
 				break;
 			}
 
+			/*
+			* If we replayed an LSN that someone was waiting for,
+			* set latches in shared memory array to notify the waiter.
+			*/
+			if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWait())
+			{
+				 WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+			}
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
 	vacuum.o \
 	vacuumparallel.o \
 	variable.o \
-	view.o
+	view.o \
+	wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 42cced9ebe..ec6ab7722a 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'wait.c',
 )
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..5baa6e2ee3
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,338 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *	  Implements WAIT FOR, which allows waiting for events such as
+ *	  time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <float.h>
+#include <math.h>
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void AddEvent(XLogRecPtr lsn_to_wait);
+static void DeleteEvent(void);
+
+/* Shared memory structure */
+typedef struct
+{
+	int			backend_maxid;
+	pg_atomic_uint64	min_lsn;
+	slock_t		mutex;
+	XLogRecPtr	waited_lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static volatile WaitState *state;
+
+/*
+ * Register lsn_to_wait for the current backend.  min_lsn is read without the
+ * mutex by GetMinWait(), so it is only accessed through the pg_atomic_* API.
+ */
+static void
+AddEvent(XLogRecPtr lsn_to_wait)
+{
+	SpinLockAcquire(&state->mutex);
+	state->backend_maxid = Max(state->backend_maxid, MyBackendId);
+	state->waited_lsn[MyBackendId] = lsn_to_wait;
+	if (lsn_to_wait < pg_atomic_read_u64(&state->min_lsn))
+		pg_atomic_write_u64(&state->min_lsn, lsn_to_wait);
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared memory array.  The
+ * slot is read and cleared under the mutex so that min_lsn and backend_maxid
+ * are recomputed from a consistent array.
+ *
+ * TODO: clean up on backend failure (smart|fast|immediate stop, SIGKILL, SIGTERM).
+ */
+static void
+DeleteEvent(void)
+{
+	int			i;
+	XLogRecPtr	lsn_to_delete;
+	XLogRecPtr	new_min = PG_UINT64_MAX;
+
+	SpinLockAcquire(&state->mutex);
+	lsn_to_delete = state->waited_lsn[MyBackendId];
+	state->waited_lsn[MyBackendId] = InvalidXLogRecPtr;
+
+	/* If we removed the minimum, publish the next one in a single store */
+	if (pg_atomic_read_u64(&state->min_lsn) == lsn_to_delete)
+	{
+		for (i = 2; i <= state->backend_maxid; i++)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr &&
+				state->waited_lsn[i] < new_min)
+				new_min = state->waited_lsn[i];
+		pg_atomic_write_u64(&state->min_lsn, new_min);
+	}
+
+	/* Shrink backend_maxid to the highest slot still in use, or 0 if none */
+	if (state->backend_maxid == MyBackendId)
+	{
+		for (i = MyBackendId - 1; i >= 2; i--)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr)
+				break;
+		state->backend_maxid = (i >= 2) ? i : 0;
+	}
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Compute how many bytes of shared memory WaitState requires: the fixed
+ * header plus one XLogRecPtr slot for each of MaxBackends + 1 entries.
+ */
+Size
+WaitShmemSize(void)
+{
+	Size		header_bytes = offsetof(WaitState, waited_lsn);
+	Size		slots_bytes = mul_size(MaxBackends + 1, sizeof(XLogRecPtr));
+
+	return add_size(header_bytes, slots_bytes);
+}
+
+/*
+ * Create or attach to the shared WaitState; the creator initializes it empty.
+ */
+void
+WaitShmemInit(void)
+{
+	bool		found;
+	int			i;
+
+	state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (found)
+		return;					/* already set up, nothing more to do */
+
+	SpinLockInit(&state->mutex);
+	pg_atomic_init_u64(&state->min_lsn, PG_UINT64_MAX);	/* nobody waits yet */
+	state->backend_maxid = 0;
+	for (i = 0; i < MaxBackends + 1; i++)
+		state->waited_lsn[i] = InvalidXLogRecPtr;
+}
+
+/* Set the latch of every backend whose awaited LSN is at or before cur_lsn */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	int			i;
+	int			backend_maxid;
+	PGPROC	   *backend;
+
+	SpinLockAcquire(&state->mutex);
+	backend_maxid = state->backend_maxid;
+	SpinLockRelease(&state->mutex);
+
+	for (i = 2; i <= backend_maxid; i++)
+	{
+		XLogRecPtr	waited = state->waited_lsn[i];	/* read the slot once */
+
+		if (waited == InvalidXLogRecPtr || waited > cur_lsn)
+			continue;			/* unused slot, or LSN not replayed yet */
+		if ((backend = BackendIdGetProc(i)) != NULL)
+			SetLatch(&backend->procLatch);
+	}
+}
+
+/* Return min_lsn, the smallest LSN currently waited for, via an atomic read */
+XLogRecPtr
+GetMinWait(void)
+{
+	return pg_atomic_read_u64(&state->min_lsn);
+}
+
+/*
+ * Wait on MyLatch until the given LSN has been replayed, the timeout expires
+ * or the postmaster dies.  Raises ERROR when the server is not in recovery.
+ *
+ * lsn:  LSN to wait for; with InvalidXLogRecPtr only the timeout is awaited.
+ * secs: timeout in seconds; 0 means no deadline, a negative value only checks
+ *       whether lsn has already been replayed, without sleeping.
+ *
+ * Returns 1 if lsn was reached or the plain timeout elapsed, 0 otherwise.
+ */
+int
+WaitUtility(XLogRecPtr lsn, const float8 secs)
+{
+	XLogRecPtr	cur_lsn = GetXLogReplayRecPtr(NULL);
+	int			latch_events;
+	float8		endtime;
+
+	if (!RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("Work only in standby mode")));
+
+#define GetNowFloat()	((float8) GetCurrentTimestamp() / 1000000.0)
+	endtime = GetNowFloat() + secs;
+
+	latch_events = WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+	if (lsn != InvalidXLogRecPtr)
+	{
+		/* Already replayed, or the caller only wants a check without waiting */
+		if (lsn <= cur_lsn || secs < 0)
+			return (lsn <= cur_lsn);
+
+		latch_events |= WL_LATCH_SET;
+		AddEvent(lsn);
+	}
+	else if (!secs)
+		return 1;
+
+	for (;;)
+	{
+		int			rc;
+		float8		delay = 0;
+		long		delay_ms;
+
+		/*
+		 * ProcessInterrupts() may not return, so drop our slot first and
+		 * register it again if we do get control back.
+		 */
+		if (InterruptPending)
+		{
+			if (lsn != InvalidXLogRecPtr)
+				DeleteEvent();
+			ProcessInterrupts();
+			if (lsn != InvalidXLogRecPtr)
+				AddEvent(lsn);
+		}
+
+		/*
+		 * Re-read the replay position after (re)registering and after every
+		 * wakeup, so a record replayed in between is never missed.
+		 */
+		if (lsn != InvalidXLogRecPtr)
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+		if (lsn && lsn <= cur_lsn)
+			break;
+
+		if (secs > 0)
+			delay = endtime - GetNowFloat();
+		else if (secs == 0)
+			delay = 60;			/* no deadline: still wake up once a minute */
+
+		if (delay > 0.0)
+			delay_ms = (long) ceil(delay * 1000.0);
+		else
+			break;
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		rc = WaitLatch(MyLatch, latch_events, delay_ms,
+					   WAIT_EVENT_CLIENT_READ);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	if (lsn == InvalidXLogRecPtr)
+		return 1;
+
+	DeleteEvent();
+	if (lsn > cur_lsn)
+		elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+	return (lsn <= cur_lsn);
+}
+
+/*
+ * Get the amount of seconds left till the specified time, computed in SQL via
+ * SPI as the Const's value minus now() (time and timetz get their own query).
+ */
+float8
+WaitTimeResolve(Const *time)
+{
+	const char *query;
+	float8		val;
+	Oid			types[] = { time->consttype };
+	Datum		values[] = { time->constvalue };
+	char		nulls[] = { " " };
+	Datum		result;
+	bool		isnull;
+
+	/* extract() returns numeric since PostgreSQL 14, hence the float8 casts */
+	if (time->consttype == TIMEOID)
+		query = "select extract (epoch from ($1 - now()::time))::float8";
+	else if (time->consttype == TIMETZOID)
+		query = "select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))::float8";
+	else
+		query = "select extract (epoch from ($1 - now()))::float8";
+
+	if (SPI_connect() != SPI_OK_CONNECT)
+		elog(ERROR, "SPI_connect failed");
+	if (SPI_execute_with_args(query, 1, types, values, nulls, true, 0) < 0 ||
+		SPI_processed != 1)
+		elog(ERROR, "could not compute the time left to wait");
+
+	result = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc,
+						   1, &isnull);
+	if (isnull)
+		elog(ERROR, "time to wait for must not be null");
+	val = DatumGetFloat8(result);
+	elog(DEBUG1, "time: %f", val);
+
+	SPI_finish();
+	return val;
+}
+
+/*
+ * Implementation of AFTER: wait for stmt->lsn, send "t" or "f" to dest as a
+ * one-row, one-column TEXT result and return WaitUtility()'s result.
+ */
+int
+WaitMain(WaitStmt *stmt, DestReceiver *dest)
+{
+	TupleDesc	tupdesc;
+	TupOutputState *tstate;
+	XLogRecPtr	lsn;
+	int			res;
+
+	lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+										  CStringGetDatum(stmt->lsn)));
+	/* WITHIN is given in milliseconds, WaitUtility() expects seconds */
+	res = WaitUtility(lsn, stmt->delay / 1000.0);
+
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+	do_text_output_oneline(tstate, res ? "t" : "f");
+	end_tup_output(tstate);
+	return res;
+}
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 7a1dfb6364..5f3a7edf6a 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -405,7 +405,6 @@ transformStmt(ParseState *pstate, Node *parseTree)
 			result = transformCallStmt(pstate,
 									   (CallStmt *) parseTree);
 			break;
-
 		default:
 
 			/*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index d631ac89a9..215dabfdb7 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -312,7 +312,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 		SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt
 		UnlistenStmt UpdateStmt VacuumStmt
 		VariableResetStmt VariableSetStmt VariableShowStmt
-		ViewStmt CheckPointStmt CreateConversionStmt
+		ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
 		DeallocateStmt PrepareStmt ExecuteStmt
 		DropOwnedStmt ReassignOwnedStmt
 		AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -644,6 +644,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <partboundspec> PartitionBoundSpec
 %type <list>		hash_partbound
 %type <defelt>		hash_partbound_elem
+%type <ival>		wait_time
+%type <node>		wait_for
 
 %type <node>	json_format_clause_opt
 				json_value_expr
@@ -1102,6 +1104,7 @@ stmt:
 			| VariableSetStmt
 			| VariableShowStmt
 			| ViewStmt
+			| WaitStmt
 			| /*EMPTY*/
 				{ $$ = NULL; }
 		;
@@ -10911,12 +10914,13 @@ TransactionStmt:
 					n->location = -1;
 					$$ = (Node *) n;
 				}
-			| START TRANSACTION transaction_mode_list_or_empty
+			| START TRANSACTION transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 
 					n->kind = TRANS_STMT_START;
 					n->options = $3;
+					n->wait = $4;
 					n->location = -1;
 					$$ = (Node *) n;
 				}
@@ -11015,12 +11019,13 @@ TransactionStmt:
 		;
 
 TransactionStmtLegacy:
-			BEGIN_P opt_transaction transaction_mode_list_or_empty
+			BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 
 					n->kind = TRANS_STMT_BEGIN;
 					n->options = $3;
+					n->wait = $4;
 					n->location = -1;
 					$$ = (Node *) n;
 				}
@@ -15854,6 +15859,37 @@ xml_passing_mech:
 			| BY VALUE_P
 		;
 
+/*****************************************************************************
+ *
+ *		QUERY:
+ *				AFTER LSN_value [WITHIN delay_milliseconds]
+ *		(standalone as WaitStmt, or as the optional wait_for clause)
+ *****************************************************************************/
+WaitStmt:
+			AFTER Sconst wait_time
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = $2;	/* LSN as a string constant */
+					n->delay = $3;	/* integer from wait_time */
+					$$ = (Node *)n;
+				}
+		;
+wait_for:
+			AFTER Sconst wait_time
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = $2;	/* LSN as a string constant */
+					n->delay = $3;	/* integer from wait_time */
+					$$ = (Node *)n;
+				}
+			| /* EMPTY */		{ $$ = NULL; }
+		;
+
+wait_time:
+			WITHIN Iconst		{ $$ = $2; }
+			| /* EMPTY */           { $$ = 0; }	/* 0: WaitUtility waits with no deadline */
+		;
+
 
 /*
  * Aggregate decoration clauses
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2225a4a6e6..0364f5d195 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -145,6 +146,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, AsyncShmemSize());
 	size = add_size(size, StatsShmemSize());
 	size = add_size(size, WaitEventExtensionShmemSize());
+	size = add_size(size, WaitShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -237,6 +239,11 @@ CreateSharedMemoryAndSemaphores(void)
 	/* Initialize subsystems */
 	CreateOrAttachShmemStructs();
 
+	/*
+	 * Init array of Latches in shared memory for wait lsn
+	 */
+	WaitShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 366a27ae8e..2abe394fad 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,6 +15,7 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include <float.h>
 
 #include "access/htup_details.h"
 #include "access/reloptions.h"
@@ -59,6 +60,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -72,6 +74,9 @@
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
 
 /* Hook for plugins to get control in ProcessUtility() */
 ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -272,6 +277,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
 		case T_LoadStmt:
 		case T_PrepareStmt:
 		case T_UnlistenStmt:
+		case T_WaitStmt:
 		case T_VariableSetStmt:
 			{
 				/*
@@ -612,6 +618,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 					case TRANS_STMT_START:
 						{
 							ListCell   *lc;
+							WaitStmt   *waitstmt = (WaitStmt *) stmt->wait;
+
+							/* If needed to WAIT FOR something but failed */
+							if (stmt->wait && WaitMain(waitstmt, dest) == 0)
+								break;
 
 							BeginTransactionBlock();
 							foreach(lc, stmt->options)
@@ -1069,6 +1080,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 				break;
 			}
 
+		case T_WaitStmt:
+			{
+				WaitStmt   *stmt = (WaitStmt *) parsetree;
+				WaitMain(stmt, dest);
+				break;
+			}
+
 		default:
 			/* All other statement types have event trigger support */
 			ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2847,6 +2865,10 @@ CreateCommandTag(Node *parsetree)
 			tag = CMDTAG_NOTIFY;
 			break;
 
+		case T_WaitStmt:
+			tag = CMDTAG_WAIT;
+			break;
+
 		case T_ListenStmt:
 			tag = CMDTAG_LISTEN;
 			break;
@@ -3495,6 +3517,10 @@ GetCommandLogLevel(Node *parsetree)
 			lev = LOGSTMT_ALL;
 			break;
 
+		case T_WaitStmt:
+			lev = LOGSTMT_ALL;
+			break;
+
 		case T_ListenStmt:
 			lev = LOGSTMT_ALL;
 			break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..0270160d44
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *	  prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+
+extern int WaitUtility(XLogRecPtr lsn, const float8 delay);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern float8 WaitTimeResolve(Const *time);
+extern int WaitMain(WaitStmt *stmt, DestReceiver *dest);
+
+#endif   /* WAIT_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index e494309da8..cc6ab3b73e 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3524,6 +3524,7 @@ typedef struct TransactionStmt
 	/* for two-phase-commit related commands */
 	char	   *gid pg_node_attr(query_jumble_ignore);
 	bool		chain;			/* AND CHAIN option */
+	Node		*wait;			/* wait lsn clause */
 	/* token location, or -1 if unknown */
 	int			location pg_node_attr(query_jumble_location);
 } TransactionStmt;
@@ -4075,4 +4076,16 @@ typedef struct DropSubscriptionStmt
 	DropBehavior behavior;		/* RESTRICT or CASCADE behavior */
 } DropSubscriptionStmt;
 
+/* ----------------------
+ *		AFTER Statement + AFTER clause of BEGIN / START TRANSACTION
+ * ----------------------
+ */
+
+typedef struct WaitStmt
+{
+	NodeTag		type;
+	char	   *lsn;		/* LSN to wait for, as text from the parser */
+	int			delay;		/* WITHIN value from the parser; 0 if omitted */
+} WaitStmt;
+
 #endif							/* PARSENODES_H */
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index 320ee91512..7fd69e1c7e 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -217,3 +217,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
 PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
 PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
 PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "AFTER", false, false, false)
diff --git a/src/test/recovery/t/040_begin_after.pl b/src/test/recovery/t/040_begin_after.pl
new file mode 100644
index 0000000000..b22e63603c
--- /dev/null
+++ b/src/test/recovery/t/040_begin_after.pl
@@ -0,0 +1,86 @@
+# Checks waiting for lsn on standby AFTER
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that AFTER works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that AFTER is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "BEGIN AFTER '$lsn1'");
+
+# Get the current LSN on standby and make sure it is not behind primary's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0, "standby reached primary's LSN AFTER");
+
+
+
+#===============================================================================
+# TODO: remove this test if we remove the standalone "AFTER" command
+#===============================================================================
+# We need to check that AFTER works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, AFTER which ensures a max value of 40.
+# Inside the transaction, AFTER that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ AFTER '$lsn3';
+	SELECT max(a) FROM wait_test;
+	BEGIN AFTER '$lsn4';
+	SELECT 'cmp=' || pg_lsn_cmp(pg_last_wal_replay_lsn(), '$lsn4'::pg_lsn);
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# The replay LSN may already be past it, so accept "equal" or "greater".
+like($standby_results, qr/^cmp=[01]$/m,
+	"AFTER works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after AFTER.
+# For that, count the result lines that consist of the older max value only.
+my $count = () = $standby_results =~ /^40$/mg;
+is($count, 2, "transaction isolation level doesn't get broken due to AFTER");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/test/recovery/t/041_after.pl b/src/test/recovery/t/041_after.pl
new file mode 100644
index 0000000000..480ba6b33b
--- /dev/null
+++ b/src/test/recovery/t/041_after.pl
@@ -0,0 +1,99 @@
+# Checks waiting for lsn on standby AFTER
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 2 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay        = 2;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that AFTER works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that AFTER is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "AFTER '$lsn1'");
+
+# Get the current LSN on standby and make sure it is not behind primary's LSN
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0, "standby reached primary's LSN AFTER");
+
+
+
+# Check that timeouts work as expected when waiting for LSN.  The standby
+# applies WAL with a delay of $delay seconds, so new WAL cannot be replayed
+# within the 1 millisecond given to WITHIN and AFTER has to report "f".
+my $timeout_ms = 1;
+
+# Generate new WAL on primary and wait for it on standby with that timeout
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"AFTER '$lsn2' WITHIN $timeout_ms");
+is($reached_lsn, 'f', "AFTER doesn't reach LSN if given too little wait time");
+
+
+
+# We need to check that AFTER works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, AFTER which ensures a max value of 40.
+# Inside the transaction, AFTER that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	AFTER '$lsn3';
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+	SELECT max(a) FROM wait_test;
+	AFTER '$lsn4';
+	SELECT 'cmp=' || pg_lsn_cmp(pg_last_wal_replay_lsn(), '$lsn4'::pg_lsn);
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# The replay LSN may already be past it, so accept "equal" or "greater".
+like($standby_results, qr/^cmp=[01]$/m,
+	"AFTER works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after AFTER.
+# For that, count the result lines that consist of the older max value only.
+my $count = () = $standby_results =~ /^40$/mg;
+is($count, 2, "transaction isolation level doesn't get broken due to AFTER");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 54b5f22d6e..18695e013e 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -188,6 +188,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY update             SYSTEM "update.sgml">
 <!ENTITY vacuum             SYSTEM "vacuum.sgml">
 <!ENTITY values             SYSTEM "values.sgml">
+<!ENTITY wait               SYSTEM "wait.sgml">
 
 <!-- applications and utilities -->
 <!ENTITY clusterdb          SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index 016b021487..b3af16c09f 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_for_event</replaceable> ]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+    WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>and <replaceable class="parameter">event</replaceable> is one of:</phrase>
+    LSN lsn_value
+    TIMEOUT number_of_milliseconds
+    timestamp
 </synopsis>
  </refsynopsisdiv>
 
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index 74ccd7e345..1b54ed2084 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,21 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] [ <replaceable class="parameter">wait_for_event</replaceable> ]
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_for_event</replaceable> is:</phrase>
+    WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>and <replaceable class="parameter">event</replaceable> is one of:</phrase>
+    LSN lsn_value
+    TIMEOUT number_of_milliseconds
+    timestamp
 </synopsis>
  </refsynopsisdiv>
 
diff --git a/doc/src/sgml/ref/wait.sgml b/doc/src/sgml/ref/wait.sgml
new file mode 100644
index 0000000000..26cae3ad85
--- /dev/null
+++ b/doc/src/sgml/ref/wait.sgml
@@ -0,0 +1,146 @@
+<!--
+doc/src/sgml/ref/wait.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="sql-wait">
+ <indexterm zone="sql-wait">
+  <primary>WAIT FOR</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle>WAIT FOR</refentrytitle>
+  <manvolnum>7</manvolnum>
+  <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>WAIT FOR</refname>
+  <refpurpose>wait for the target <acronym>LSN</acronym> to be replayed or for specified time to pass</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAIT FOR [ANY | ALL] <replaceable class="parameter">event</replaceable> [, ...]
+
+<phrase>where <replaceable class="parameter">event</replaceable> is one of:</phrase>
+    LSN value
+    TIMEOUT number_of_milliseconds
+    timestamp
+
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>'
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>
+WAIT FOR LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ALL LSN '<replaceable class="parameter">lsn_number</replaceable>' TIMEOUT <replaceable class="parameter">wait_timeout</replaceable>, TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+WAIT FOR ANY LSN '<replaceable class="parameter">lsn_number</replaceable>', TIMESTAMP <replaceable class="parameter">wait_time</replaceable>
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   <command>WAIT FOR</command> provides a simple interprocess communication
+   mechanism to wait for a timestamp, a timeout, or a target log sequence number
+   (<acronym>LSN</acronym>) on a standby in <productname>PostgreSQL</productname>
+   databases with primary-standby asynchronous replication. When run with the
+   <literal>LSN</literal> option, the <command>WAIT FOR</command>
+   command waits for the specified <acronym>LSN</acronym> to be replayed.
+   If no timestamp or timeout is specified, the wait time is unlimited.
+   Waiting can be interrupted using <literal>Ctrl+C</literal>, or
+   by shutting down the <literal>postgres</literal> server.
+  </para>
+
+ </refsect1>
+
+ <refsect1>
+  <title>Parameters</title>
+
+  <variablelist>
+   <varlistentry>
+    <term>LSN '<replaceable class="parameter">lsn_number</replaceable>'</term>
+    <listitem>
+     <para>
+      Specify the target log sequence number to wait for.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term>TIMEOUT <replaceable class="parameter">wait_timeout</replaceable></term>
+    <listitem>
+     <para>
+      Limit the time interval to wait for the LSN to be replayed.
+      The specified <replaceable>wait_timeout</replaceable> must be an integer
+      and is measured in milliseconds.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term>TIMESTAMP <replaceable class="parameter">wait_time</replaceable></term>
+    <listitem>
+     <para>
+      Limit the time to wait for the LSN to be replayed.
+      The specified <replaceable>wait_time</replaceable> must be a timestamp.
+     </para>
+    </listitem>
+   </varlistentry>
+
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   Run <literal>WAIT FOR</literal> from <application>psql</application>,
+   limiting wait time to 10000 milliseconds:
+
+<screen>
+WAIT FOR LSN '0/3F07A6B1' TIMEOUT 10000;
+NOTICE:  LSN is not reached. Try to increase wait time.
+LSN reached
+-------------
+ f
+(1 row)
+</screen>
+  </para>
+
+  <para>
+   Wait until the specified <acronym>LSN</acronym> is replayed:
+<screen>
+WAIT FOR LSN '0/3F07A611';
+LSN reached
+-------------
+ t
+(1 row)
+</screen>
+  </para>
+
+  <para>
+   Limit <acronym>LSN</acronym> wait time to 500000 milliseconds,
+   and then cancel the command if <acronym>LSN</acronym> was not reached:
+<screen>
+WAIT FOR LSN '0/3F0FF791' TIMEOUT 500000;
+^CCancel request sent
+NOTICE:  LSN is not reached. Try to increase wait time.
+ERROR:  canceling statement due to user request
+ LSN reached
+-------------
+ f
+(1 row)
+</screen>
+</para>
+ </refsect1>
+
+ <refsect1>
+  <title>Compatibility</title>
+
+  <para>
+   There is no <command>WAIT FOR</command> statement in the SQL
+   standard.
+  </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index e11b4b6130..a83ff4551e 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -216,6 +216,7 @@
    &update;
    &vacuum;
    &values;
+   &wait;
 
  </reference>
 
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index c61566666a..5cc2196927 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1780,6 +1781,13 @@ PerformWalRecovery(void)
 				break;
 			}
 
+			/*
+			 * If we replayed an LSN that someone was waiting for,
+			 * set latches in shared memory array to notify the waiter.
+			 */
+			if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWait())
+				WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
 	vacuum.o \
 	vacuumparallel.o \
 	variable.o \
-	view.o
+	view.o \
+	wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 42cced9ebe..ec6ab7722a 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'wait.c',
 )
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..994f023f64
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,403 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *	  Implements WAIT FOR, which allows waiting for events such as
+ *	  time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2020, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include <float.h>
+#include <math.h>
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void AddEvent(XLogRecPtr lsn_to_wait);
+static void DeleteEvent(void);
+
+/* Shared memory structure */
+typedef struct
+{
+	int			backend_maxid;
+	pg_atomic_uint64	min_lsn;
+	slock_t		mutex;
+	XLogRecPtr	waited_lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static volatile WaitState *state;
+
+/* Register lsn_to_wait for this backend; raise backend_maxid, lower min_lsn */
+static void
+AddEvent(XLogRecPtr lsn_to_wait)
+{
+	SpinLockAcquire(&state->mutex);
+	if (state->backend_maxid < MyBackendId)
+		state->backend_maxid = MyBackendId;
+	state->waited_lsn[MyBackendId] = lsn_to_wait;
+
+	/* min_lsn is read without the lock by GetMinWait(): use the atomics API */
+	if (lsn_to_wait < pg_atomic_read_u64(&state->min_lsn))
+		pg_atomic_write_u64(&state->min_lsn, lsn_to_wait);
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete event of the current backend from the shared memory array.
+ * The slot, min_lsn and backend_maxid are all updated under the spinlock.
+ *
+ * TODO: Consider state cleanup on backend failure.
+ * Check: 1) normal|smart|fast|immediate stop 2) SIGKILL and SIGTERM
+ */
+static void
+DeleteEvent(void)
+{
+	int			i;
+	XLogRecPtr	lsn_to_delete;
+	XLogRecPtr	new_min = PG_UINT64_MAX;
+
+	SpinLockAcquire(&state->mutex);
+	lsn_to_delete = state->waited_lsn[MyBackendId];
+	state->waited_lsn[MyBackendId] = InvalidXLogRecPtr;
+
+	/* If we need to choose the next min_lsn, update state->min_lsn */
+	if (pg_atomic_read_u64(&state->min_lsn) == lsn_to_delete)
+	{
+		for (i = 2; i <= state->backend_maxid; i++)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr &&
+				state->waited_lsn[i] < new_min)
+				new_min = state->waited_lsn[i];
+		pg_atomic_write_u64(&state->min_lsn, new_min);
+	}
+
+	if (state->backend_maxid == MyBackendId)
+		for (i = (MyBackendId); i >= 2; i--)
+			if (state->waited_lsn[i] != InvalidXLogRecPtr)
+			{
+				state->backend_maxid = i;
+				break;
+			}
+
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report the shared memory space needed for WaitState: the fixed header
+ * followed by MaxBackends + 1 entries of the waited_lsn array.
+ */
+Size
+WaitShmemSize(void)
+{
+	Size		header_bytes = offsetof(WaitState, waited_lsn);
+	Size		array_bytes = mul_size(MaxBackends + 1, sizeof(XLogRecPtr));
+
+	return add_size(header_bytes, array_bytes);
+}
+
+/* Create (or attach to) WaitState in shared memory; set it up on creation */
+void
+WaitShmemInit(void)
+{
+	bool		found;
+	int			i;
+
+	state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (!found)
+	{
+		SpinLockInit(&state->mutex);
+
+		for (i = 0; i < (MaxBackends + 1); i++)
+			state->waited_lsn[i] = InvalidXLogRecPtr;
+		state->backend_maxid = 0;
+		/* Atomic variables must be set up with their init function */
+		pg_atomic_init_u64(&state->min_lsn, PG_UINT64_MAX);
+	}
+}
+
+/* Set the latch of every waiting backend whose LSN is at or before cur_lsn */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	int			last_id;
+	int			id;
+
+	/* Read the highest registered backend ID under the lock */
+	SpinLockAcquire(&state->mutex);
+	last_id = state->backend_maxid;
+	SpinLockRelease(&state->mutex);
+
+	for (id = 2; id <= last_id; id++)
+	{
+		PGPROC	   *proc = BackendIdGetProc(id);
+
+		if (state->waited_lsn[id] == InvalidXLogRecPtr)
+			continue;			/* this backend is not waiting */
+		if (proc != NULL && state->waited_lsn[id] <= cur_lsn)
+			SetLatch(&proc->procLatch);
+	}
+}
+
+/* Atomically read min_lsn, the lowest waited-for LSN kept by AddEvent/DeleteEvent */
+XLogRecPtr
+GetMinWait(void)
+{
+	return pg_atomic_read_u64(&state->min_lsn);
+}
+
+/*
+ * Wait until replay reaches 'lsn' and/or until 'secs' seconds have passed.
+ * lsn:  target replay LSN, or InvalidXLogRecPtr for a pure time wait.
+ * secs: > 0 is a time limit in seconds, 0 waits for the LSN without limit,
+ *       < 0 only reports whether the LSN has already been replayed.
+ * Returns 1 if the LSN was replayed (or a pure time wait finished) and 0
+ * otherwise.  Raises an ERROR when the server is not in recovery.
+ */
+int
+WaitUtility(XLogRecPtr lsn, const float8 secs)
+{
+	XLogRecPtr	cur_lsn = GetXLogReplayRecPtr(NULL);
+	int			latch_events;
+	float8		endtime;
+	int			res = 0;
+
+	if (!RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("Work only in standby mode")));
+
+#define GetNowFloat()	((float8) GetCurrentTimestamp() / 1000000.0)
+	endtime = GetNowFloat() + secs;
+
+	latch_events = WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+	if (lsn != InvalidXLogRecPtr)
+	{
+		/* Just check if we reached */
+		if (lsn <= cur_lsn || secs < 0)
+			return (lsn <= cur_lsn);
+
+		latch_events |= WL_LATCH_SET;
+		AddEvent(lsn);
+		cur_lsn = GetXLogReplayRecPtr(NULL);	/* recheck after registering */
+	}
+	else if (!secs)
+		return 1;
+
+	for (;;)
+	{
+		int			rc;
+		float8		delay = 0;
+		long		delay_ms;
+
+		/* If LSN has been replayed */
+		if (lsn && lsn <= cur_lsn)
+			break;
+
+		if (secs > 0)
+			delay = endtime - GetNowFloat();
+		else if (secs == 0)
+			delay = 60;		/* no limit: still wake up once a minute */
+
+		if (delay > 0.0)
+			delay_ms = (long) ceil(delay * 1000.0);
+		else
+			break;
+
+		/* Unregister first, since ProcessInterrupts() may not return */
+		if (InterruptPending)
+		{
+			if (lsn != InvalidXLogRecPtr)
+				DeleteEvent();
+			ProcessInterrupts();
+			if (lsn != InvalidXLogRecPtr)
+			{
+				AddEvent(lsn);
+				cur_lsn = GetXLogReplayRecPtr(NULL);
+				if (lsn <= cur_lsn)
+					break;
+			}
+		}
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		rc = WaitLatch(MyLatch, latch_events, delay_ms,
+					   WAIT_EVENT_CLIENT_READ);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+		if (lsn != InvalidXLogRecPtr)
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+	}
+
+	if (lsn != InvalidXLogRecPtr)
+		DeleteEvent();
+	if (lsn != InvalidXLogRecPtr && lsn > cur_lsn)
+		elog(NOTICE,"LSN is not reached. Try to increase wait time.");
+	else
+		res = 1;
+
+	return res;
+}
+
+/*
+ * Get the amount of seconds left till the specified time, computed as the
+ * difference between the time/timetz/timestamp constant and now() via SPI.
+ */
+float8
+WaitTimeResolve(Const *time)
+{
+	const char *query;
+	int			ret;
+	float8		val;
+	Oid			types[] = { time->consttype };
+	Datum		values[] = { time->constvalue };
+	char		nulls[] = { " " };
+	Datum		result;
+	bool		isnull;
+
+	/* EXTRACT() returns numeric, so cast to float8 for DatumGetFloat8() */
+	if (time->consttype == TIMEOID)
+		query = "select extract (epoch from ($1 - now()::time))::float8";
+	else if (time->consttype == TIMETZOID)
+		query = "select extract (epoch from (timezone('UTC',$1)::time - timezone('UTC', now()::timetz)::time))::float8";
+	else
+		query = "select extract (epoch from ($1 - now()))::float8";
+
+	SPI_connect();
+	ret = SPI_execute_with_args(query, 1, types, values, nulls, true, 0);
+	if (ret != SPI_OK_SELECT || SPI_processed != 1)
+		elog(ERROR, "could not compute the time left to wait");
+
+	result = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc,
+						   1, &isnull);
+	Assert(!isnull);
+	val = DatumGetFloat8(result);
+
+	elog(DEBUG1, "time: %f", val);
+
+	SPI_finish();
+	return val;
+}
+
+/*
+ * Implementation of WAIT FOR.
+ *
+ * Collapses the event list into one target LSN and one time limit (the
+ * largest of each for ALL, the smallest for ANY), waits via WaitUtility()
+ * and sends a single-row, single-column result ("t" or "f") to 'dest'.
+ * Returns the value returned by WaitUtility().
+ */
+int
+WaitMain(WaitStmt *stmt, DestReceiver *dest)
+{
+	ListCell   *events;
+	TupleDesc	tupdesc;
+	TupOutputState *tstate;
+	float8		delay = 0;
+	float8		final_delay = 0;
+	XLogRecPtr	lsn = InvalidXLogRecPtr;
+	XLogRecPtr	final_lsn = InvalidXLogRecPtr;
+	bool		has_lsn = false;
+	bool		wait_forever = true;
+	int			res = 0;
+
+	/* Extract options from the statement node tree */
+	foreach(events, stmt->events)
+	{
+		WaitStmt   *event = (WaitStmt *) lfirst(events);
+
+		/* LSN to wait for */
+		if (event->lsn)
+		{
+			lsn = DatumGetLSN(
+						DirectFunctionCall1(pg_lsn_in,
+							CStringGetDatum(event->lsn)));
+
+			/*
+			 * The first LSN always becomes the candidate.  After that,
+			 * ALL keeps the max LSN and ANY keeps the min LSN.
+			 */
+			if (!has_lsn ||
+				(stmt->strategy == WAIT_FOR_ALL && final_lsn < lsn) ||
+				(stmt->strategy == WAIT_FOR_ANY && final_lsn > lsn))
+				final_lsn = lsn;
+			has_lsn = true;
+		}
+
+		/* Time delay to wait for */
+		if (event->time || event->delay)
+		{
+			if (event->delay)
+				delay = event->delay / 1000.0;
+
+			if (event->time)
+				delay = WaitTimeResolve((Const *) event->time);
+
+			/*
+			 * The first limit always becomes the candidate, even when it
+			 * is negative (a timestamp in the past).  After that, ALL
+			 * keeps the max delay and ANY keeps the min delay.
+			 */
+			if (wait_forever ||
+				(stmt->strategy == WAIT_FOR_ALL && final_delay < delay) ||
+				(stmt->strategy == WAIT_FOR_ANY && final_delay > delay))
+				final_delay = delay;
+			wait_forever = false;
+		}
+	}
+
+	/*
+	 * WaitUtility() reads 0 as "no time limit" and a negative value as
+	 * "only check", so an expired limit must not be passed on as 0.
+	 */
+	if (wait_forever)
+		final_delay = 0;
+	else if (final_delay <= 0)
+		final_delay = -1;
+
+	res = WaitUtility(final_lsn, final_delay);
+
+	/* Need a tuple descriptor representing a single TEXT column */
+	tupdesc = CreateTemplateTupleDesc(1);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "LSN reached", TEXTOID, -1, 0);
+	/* Prepare for projection of tuples */
+	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsMinimalTuple);
+
+	/* Send it */
+	do_text_output_oneline(tstate, res?"t":"f");
+	end_tup_output(tstate);
+	return res;
+}
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 7a1dfb6364..f1fe335bef 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -85,6 +85,7 @@ static Query *transformCreateTableAsStmt(ParseState *pstate,
 										 CreateTableAsStmt *stmt);
 static Query *transformCallStmt(ParseState *pstate,
 								CallStmt *stmt);
+static void transformWaitForStmt(ParseState *pstate, WaitStmt *stmt);
 static void transformLockingClause(ParseState *pstate, Query *qry,
 								   LockingClause *lc, bool pushedDown);
 #ifdef RAW_EXPRESSION_COVERAGE_TEST
@@ -405,7 +406,20 @@ transformStmt(ParseState *pstate, Node *parseTree)
 			result = transformCallStmt(pstate,
 									   (CallStmt *) parseTree);
 			break;
-
+		case T_WaitStmt:
+			transformWaitForStmt(pstate, (WaitStmt *) parseTree);
+			result = makeNode(Query);
+			result->commandType = CMD_UTILITY;
+			result->utilityStmt = (Node *) parseTree;
+			break;
+		case T_TransactionStmt:
+			{
+				TransactionStmt *stmt = (TransactionStmt *) parseTree;
+				if ((stmt->kind == TRANS_STMT_BEGIN ||
+						stmt->kind == TRANS_STMT_START) && stmt->wait)
+					transformWaitForStmt(pstate, (WaitStmt *) stmt->wait);
+			}
+			/* no break here - we want to fall through to the default */
 		default:
 
 			/*
@@ -3559,6 +3573,23 @@ applyLockingClause(Query *qry, Index rtindex,
 	qry->rowMarks = lappend(qry->rowMarks, rc);
 }
 
+/*
+ * transformWaitForStmt -
+ *	transform each event's time expression, for the WAIT FOR statement and
+ *	the WAIT FOR clause of BEGIN (TODO: trim this if only one form is kept)
+ */
+static void
+transformWaitForStmt(ParseState *pstate, WaitStmt *stmt)
+{
+	ListCell   *events;
+
+	foreach(events, stmt->events)
+	{
+		WaitStmt   *event = (WaitStmt *) lfirst(events);
+		event->time = transformExpr(pstate, event->time, EXPR_KIND_OTHER);
+	}
+}
+
 /*
  * Coverage testing for raw_expression_tree_walker().
  *
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index d631ac89a9..b5d86ff32b 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -312,7 +312,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 		SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt
 		UnlistenStmt UpdateStmt VacuumStmt
 		VariableResetStmt VariableSetStmt VariableShowStmt
-		ViewStmt CheckPointStmt CreateConversionStmt
+		ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
 		DeallocateStmt PrepareStmt ExecuteStmt
 		DropOwnedStmt ReassignOwnedStmt
 		AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -536,7 +536,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <node>	case_expr case_arg when_clause case_default
 %type <list>	when_clause_list
 %type <node>	opt_search_clause opt_cycle_clause
-%type <ival>	sub_type opt_materialized
+%type <ival>	sub_type wait_strategy opt_materialized
 %type <node>	NumericOnly
 %type <list>	NumericOnly_list
 %type <alias>	alias_clause opt_alias_clause opt_alias_clause_for_join_using
@@ -644,6 +644,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <partboundspec> PartitionBoundSpec
 %type <list>		hash_partbound
 %type <defelt>		hash_partbound_elem
+%type <list>		wait_list
+%type <node>		WaitEvent wait_for
 
 %type <node>	json_format_clause_opt
 				json_value_expr
@@ -729,7 +731,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
 	LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN
 
 	MAPPING MATCH MATCHED MATERIALIZED MAXVALUE MERGE METHOD
 	MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -763,7 +765,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P SYSTEM_USER
 
 	TABLE TABLES TABLESAMPLE TABLESPACE TEMP TEMPLATE TEMPORARY TEXT_P THEN
-	TIES TIME TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
+	TIES TIME TIMEOUT TIMESTAMP TO TRAILING TRANSACTION TRANSFORM
 	TREAT TRIGGER TRIM TRUE_P
 	TRUNCATE TRUSTED TYPE_P TYPES_P
 
@@ -773,7 +775,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
 	VERBOSE VERSION_P VIEW VIEWS VOLATILE
 
-	WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+	WAIT WHEN WHERE WHITESPACE_P WINDOW
+	WITH WITHIN WITHOUT WORK WRAPPER WRITE
 
 	XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
 	XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -1102,6 +1105,7 @@ stmt:
 			| VariableSetStmt
 			| VariableShowStmt
 			| ViewStmt
+			| WaitStmt
 			| /*EMPTY*/
 				{ $$ = NULL; }
 		;
@@ -10911,12 +10915,13 @@ TransactionStmt:
 					n->location = -1;
 					$$ = (Node *) n;
 				}
-			| START TRANSACTION transaction_mode_list_or_empty
+			| START TRANSACTION transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 
 					n->kind = TRANS_STMT_START;
 					n->options = $3;
+					n->wait = $4;
 					n->location = -1;
 					$$ = (Node *) n;
 				}
@@ -11015,12 +11020,13 @@ TransactionStmt:
 		;
 
 TransactionStmtLegacy:
-			BEGIN_P opt_transaction transaction_mode_list_or_empty
+			BEGIN_P opt_transaction transaction_mode_list_or_empty wait_for
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
 
 					n->kind = TRANS_STMT_BEGIN;
 					n->options = $3;
+					n->wait = $4;
 					n->location = -1;
 					$$ = (Node *) n;
 				}
@@ -15854,6 +15860,74 @@ xml_passing_mech:
 			| BY VALUE_P
 		;
 
+/*****************************************************************************
+ *
+ *		QUERY:
+ *				WAIT FOR [ANY | ALL] <event> [, <event> ...]
+ *				event is one of:
+ *					LSN value
+ *					TIMEOUT delay
+ *					timestamp
+ *
+ *****************************************************************************/
+WaitStmt:
+			WAIT FOR wait_strategy wait_list
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->strategy = $3;
+					n->events = $4;
+					$$ = (Node *)n;
+				}
+			;
+wait_for:
+			WAIT FOR wait_strategy wait_list
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->strategy = $3;
+					n->events = $4;
+					$$ = (Node *)n;
+				}
+			| /* EMPTY */		{ $$ = NULL; };
+
+wait_strategy:
+			ALL					{ $$ = WAIT_FOR_ALL; }
+			| ANY				{ $$ = WAIT_FOR_ANY; }
+			| /* EMPTY */		{ $$ = WAIT_FOR_ALL; }
+		;
+
+wait_list:
+			WaitEvent					{ $$ = list_make1($1); }
+			| wait_list ',' WaitEvent	{ $$ = lappend($1, $3); }
+			| wait_list WaitEvent		{ $$ = lappend($1, $2); }
+		;
+
+WaitEvent:
+			LSN Sconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = $2;
+					n->delay = 0;
+					n->time = NULL;
+					$$ = (Node *)n;
+				}
+			| TIMEOUT Iconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = NULL;
+					n->delay = $2;
+					n->time = NULL;
+					$$ = (Node *)n;
+				}
+			| ConstDatetime Sconst
+				{
+					WaitStmt *n = makeNode(WaitStmt);
+					n->lsn = NULL;
+					n->delay = 0;
+					n->time = makeStringConstCast($2, @2, $1);
+					$$ = (Node *)n;
+				}
+			;
+
 
 /*
  * Aggregate decoration clauses
@@ -17239,6 +17313,7 @@ unreserved_keyword:
 			| LOCK_P
 			| LOCKED
 			| LOGGED
+			| LSN
 			| MAPPING
 			| MATCH
 			| MATCHED
@@ -17371,6 +17446,7 @@ unreserved_keyword:
 			| TEMPORARY
 			| TEXT_P
 			| TIES
+			| TIMEOUT
 			| TRANSACTION
 			| TRANSFORM
 			| TRIGGER
@@ -17397,6 +17473,7 @@ unreserved_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAIT
 			| WHITESPACE_P
 			| WITHIN
 			| WITHOUT
@@ -17829,6 +17906,7 @@ bare_label_keyword:
 			| LOCK_P
 			| LOCKED
 			| LOGGED
+			| LSN
 			| MAPPING
 			| MATCH
 			| MATCHED
@@ -17991,6 +18069,7 @@ bare_label_keyword:
 			| THEN
 			| TIES
 			| TIME
+			| TIMEOUT
 			| TIMESTAMP
 			| TRAILING
 			| TRANSACTION
@@ -18028,6 +18107,7 @@ bare_label_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAIT
 			| WHEN
 			| WHITESPACE_P
 			| WORK
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2225a4a6e6..aaa2dc3388 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -145,6 +146,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, AsyncShmemSize());
 	size = add_size(size, StatsShmemSize());
 	size = add_size(size, WaitEventExtensionShmemSize());
+	size = add_size(size, WaitShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -345,6 +347,11 @@ CreateOrAttachShmemStructs(void)
 	AsyncShmemInit();
 	StatsShmemInit();
 	WaitEventExtensionShmemInit();
+
+	/*
+	 * Init array of Latches in shared memory for WAIT
+	 */
+	WaitShmemInit();
 }
 
 /*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 366a27ae8e..2abe394fad 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,6 +15,7 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include <float.h>
 
 #include "access/htup_details.h"
 #include "access/reloptions.h"
@@ -59,6 +60,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -72,6 +74,9 @@
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
 
 /* Hook for plugins to get control in ProcessUtility() */
 ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -272,6 +277,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
 		case T_LoadStmt:
 		case T_PrepareStmt:
 		case T_UnlistenStmt:
+		case T_WaitStmt:
 		case T_VariableSetStmt:
 			{
 				/*
@@ -612,6 +618,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 					case TRANS_STMT_START:
 						{
 							ListCell   *lc;
+							WaitStmt   *waitstmt = (WaitStmt *) stmt->wait;
+
+							/* If needed to WAIT FOR something but failed */
+							if (stmt->wait && WaitMain(waitstmt, dest) == 0)
+								break;
 
 							BeginTransactionBlock();
 							foreach(lc, stmt->options)
@@ -1069,6 +1080,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 				break;
 			}
 
+		case T_WaitStmt:
+			{
+				WaitStmt   *stmt = (WaitStmt *) parsetree;
+				WaitMain(stmt, dest);
+				break;
+			}
+
 		default:
 			/* All other statement types have event trigger support */
 			ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2847,6 +2865,10 @@ CreateCommandTag(Node *parsetree)
 			tag = CMDTAG_NOTIFY;
 			break;
 
+		case T_WaitStmt:
+			tag = CMDTAG_WAIT;
+			break;
+
 		case T_ListenStmt:
 			tag = CMDTAG_LISTEN;
 			break;
@@ -3495,6 +3517,10 @@ GetCommandLogLevel(Node *parsetree)
 			lev = LOGSTMT_ALL;
 			break;
 
+		case T_WaitStmt:
+			lev = LOGSTMT_ALL;
+			break;
+
 		case T_ListenStmt:
 			lev = LOGSTMT_ALL;
 			break;
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..0270160d44
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *	  prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+
+extern int WaitUtility(XLogRecPtr lsn, const float8 delay);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWait(void);
+extern float8 WaitTimeResolve(Const *time);
+extern int WaitMain(WaitStmt *stmt, DestReceiver *dest);
+
+#endif   /* WAIT_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index e494309da8..58e8b14a1b 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3524,6 +3524,7 @@ typedef struct TransactionStmt
 	/* for two-phase-commit related commands */
 	char	   *gid pg_node_attr(query_jumble_ignore);
 	bool		chain;			/* AND CHAIN option */
+	Node	   *wait;			/* WAIT clause: list of events to wait for */
 	/* token location, or -1 if unknown */
 	int			location pg_node_attr(query_jumble_location);
 } TransactionStmt;
@@ -4075,4 +4076,26 @@ typedef struct DropSubscriptionStmt
 	DropBehavior behavior;		/* RESTRICT or CASCADE behavior */
 } DropSubscriptionStmt;
 
+/* ----------------------
+ *		WAIT FOR Statement + WAIT FOR clause of BEGIN statement
+ *		TODO: if we only pick one, remove the other
+ * ----------------------
+ */
+
+typedef enum WaitForStrategy
+{
+	WAIT_FOR_ANY = 0,
+	WAIT_FOR_ALL
+} WaitForStrategy;
+
+typedef struct WaitStmt
+{
+	NodeTag		type;
+	WaitForStrategy strategy;	/* statement node: ANY or ALL */
+	List	   *events;		/* statement node: list of per-event WaitStmt nodes */
+	char	   *lsn;		/* event node: LSN string, or NULL */
+	int			delay;		/* event node: TIMEOUT in milliseconds, or 0 */
+	Node	   *time;		/* event node: TIMESTAMP or TIME expression, or NULL */
+} WaitStmt;
+
 #endif							/* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 5984dcfa4b..dadcd3b267 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -260,6 +260,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("lsn", LSN, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD, BARE_LABEL)
@@ -433,6 +434,7 @@ PG_KEYWORD("text", TEXT_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("then", THEN, RESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("ties", TIES, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("time", TIME, COL_NAME_KEYWORD, BARE_LABEL)
+PG_KEYWORD("timeout", TIMEOUT, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("timestamp", TIMESTAMP, COL_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("to", TO, RESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("trailing", TRAILING, RESERVED_KEYWORD, BARE_LABEL)
@@ -473,6 +475,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index 320ee91512..7bcfebde38 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -217,3 +217,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
 PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
 PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
 PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT FOR", false, false, false)
diff --git a/src/test/recovery/t/040_begin_wait.pl b/src/test/recovery/t/040_begin_wait.pl
new file mode 100644
index 0000000000..f1e5b5b23d
--- /dev/null
+++ b/src/test/recovery/t/040_begin_wait.pl
@@ -0,0 +1,146 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that WAIT FOR LSN works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that WAIT is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "BEGIN WAIT FOR LSN '$lsn1'");
+
+# The standby must be at or past primary's LSN (replay may have moved further)
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns >= 0, "standby reached the same LSN as primary after WAIT");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time();
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+	"WAIT FOR TIMEOUT $one_second, TIMESTAMP '$current_time'");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $one_second, "WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR LSN '$lsn2' TIMEOUT 1");
+ok($reached_lsn eq "f", "WAIT doesn't reach LSN if given too little wait time");
+
+
+#===============================================================================
+# TODO: remove this test if we remove the standalone "WAIT FOR" command
+#===============================================================================
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, wait for LSN which ensures a max value of 40.
+# Inside the transaction, wait for LSN that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ WAIT FOR LSN '$lsn3';
+	SELECT max(a) FROM wait_test;
+	BEGIN WAIT FOR LSN '$lsn4';
+	SELECT pg_last_wal_replay_lsn();
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $last_lsn_reached = $standby_results =~ /^\Q$lsn4\E$/m;
+ok($last_lsn_reached, "WAIT FOR LSN works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after WAIT.
+# Expect the old max twice (whole-line match: ignore "40" inside the LSN).
+my $count = () = $standby_results =~ /^40$/mg;
+is($count, 2, "transaction isolation level doesn't get broken due to WAIT");
+
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 800000))");
+my $lsn7 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR ANY works fine
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR ANY LSN '$lsn5' LSN '$lsn6' LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0,
+	"WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+cmp_ok($compare_lsns, '<', 0,
+	"WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR ALL works fine (replay may already be past the last LSN)
+$node_standby->safe_psql('postgres',
+	"BEGIN WAIT FOR ALL LSN '$lsn5', LSN '$lsn6', LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0,
+	"WAIT FOR ALL makes us reach the maximum LSN from the list");
+
+
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/test/recovery/t/041_wait.pl b/src/test/recovery/t/041_wait.pl
new file mode 100644
index 0000000000..6f9d549416
--- /dev/null
+++ b/src/test/recovery/t/041_wait.pl
@@ -0,0 +1,145 @@
+# Checks WAIT FOR
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+
+# Make sure that WAIT FOR LSN works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that WAIT is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '$lsn1'");
+
+# The standby must be at or past primary's LSN (replay may have moved further)
+my $lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn1'::pg_lsn)");
+ok($compare_lsns >= 0, "standby reached the same LSN as primary after WAIT");
+
+
+
+# Check that timeouts work on their own and let us wait for specified time (1s)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $one_second = 1000; # in milliseconds
+my $start_time = time();
+# While we're at it, also make sure that the syntax with commas works fine and
+# that by default we use WAIT FOR ALL strategy, which means waiting for max time
+$node_standby->safe_psql('postgres',
+	"WAIT FOR TIMEOUT $one_second, TIMESTAMP '$current_time'");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $one_second, "WAIT FOR TIMEOUT waits for enough time");
+
+# Now, check that timeouts work as expected when waiting for LSN
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"WAIT FOR LSN '$lsn2' TIMEOUT 1");
+ok($reached_lsn eq "f", "WAIT doesn't reach LSN if given too little wait time");
+
+
+
+# We need to check that WAIT works fine inside transactions. For that, let's
+# get two LSNs that will correspond to two different max values in our table.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn3 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Before starting transaction, wait for LSN which ensures a max value of 40.
+# Inside the transaction, wait for LSN that ensures a max value of 50.
+# Due to ISOLATION LEVEL REPEATABLE READ, we should NOT see the new max value.
+my $standby_results = $node_standby->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '$lsn3';
+	BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
+	SELECT max(a) FROM wait_test;
+	WAIT FOR LSN '$lsn4';
+	SELECT pg_last_wal_replay_lsn();
+	SELECT max(a) FROM wait_test;
+	COMMIT;
+]);
+
+# Make sure that we indeed reach primary's last LSN inside the transaction.
+# For that, check that calling pg_last_wal_replay_lsn returned that LSN.
+my $last_lsn_reached = $standby_results =~ /^\Q$lsn4\E$/m;
+ok($last_lsn_reached, "WAIT FOR LSN works inside a transaction");
+
+# Check that transaction doesn't break and show us the new max value after WAIT.
+# Expect the old max twice (whole-line match: ignore "40" inside the LSN).
+my $count = () = $standby_results =~ /^40$/mg;
+is($count, 2, "transaction isolation level doesn't get broken due to WAIT");
+
+
+
+# Get multiple LSNs for testing WAIT FOR ANY / WAIT FOR ALL
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn5 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 70000))");
+my $lsn6 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(61, 800000))");
+my $lsn7 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Check that WAIT FOR ANY works fine
+$node_standby->safe_psql('postgres',
+	"WAIT FOR ANY LSN '$lsn5' LSN '$lsn6' LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn5'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0,
+	"WAIT FOR ANY makes us reach at least the minimum LSN from the list");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+# TODO: Could this somehow fail due to the machine being very fast at applying LSN?
+cmp_ok($compare_lsns, '<', 0,
+	"WAIT FOR ANY didn't make us reach the maximum LSN from the list");
+
+# Check that WAIT FOR ALL works fine (replay may already be past the last LSN)
+$node_standby->safe_psql('postgres',
+	"WAIT FOR ALL LSN '$lsn5', LSN '$lsn6', LSN '$lsn7'");
+$lsn_standby = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+$compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$lsn_standby'::pg_lsn, '$lsn7'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0,
+	"WAIT FOR ALL makes us reach the maximum LSN from the list");
+
+
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index c61566666a..edbbe208cb 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/wait.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1780,6 +1781,15 @@ PerformWalRecovery(void)
 				break;
 			}
 
+			/*
+			 * If we replayed an LSN that someone was waiting for,
+			 * set latches in shared memory array to notify the waiter.
+			 */
+			if (XLogRecoveryCtl->lastReplayedEndRecPtr >= GetMinWaitedLSN())
+			{
+				WaitSetLatch(XLogRecoveryCtl->lastReplayedEndRecPtr);
+			}
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..d8f6965d8c 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
 	vacuum.o \
 	vacuumparallel.o \
 	variable.o \
-	view.o
+	view.o \
+	wait.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 42cced9ebe..ec6ab7722a 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'wait.c',
 )
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 0000000000..4994e5f4c3
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,286 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ *	  Implements wait lsn, which allows waiting for events such as
+ *	  LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2023, Regents of PostgresPro
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogdefs.h"
+#include "commands/wait.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/backendid.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+
+/* Add to shared memory array */
+static void AddWaitedLSN(XLogRecPtr lsn_to_wait);
+
+/* Shared memory structure */
+typedef struct
+{
+	int			backend_maxid;
+	pg_atomic_uint64 min_lsn; /* XLogRecPtr of minimal waited for LSN */
+	slock_t		mutex;
+	/* LSNs that different backends are waiting */
+	XLogRecPtr	lsn[FLEXIBLE_ARRAY_MEMBER];
+} WaitState;
+
+static WaitState *state;
+
+/*
+ * Register lsn_to_wait as the LSN awaited by the current backend (slot
+ * MyBackendId) and lower the shared minimum if this LSN is smaller.
+ */
+static void
+AddWaitedLSN(XLogRecPtr lsn_to_wait)
+{
+	SpinLockAcquire(&state->mutex);
+	if (state->backend_maxid < MyBackendId)
+		state->backend_maxid = MyBackendId;
+	state->lsn[MyBackendId] = lsn_to_wait;
+	/* min_lsn is read without the spinlock, so go through the atomics API */
+	if (lsn_to_wait < pg_atomic_read_u64(&state->min_lsn))
+		pg_atomic_write_u64(&state->min_lsn, lsn_to_wait);
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete wait event of the current backend from the shared memory array,
+ * then refresh the shared minimum awaited LSN and the used array length.
+ */
+void
+DeleteWaitedLSN(void)
+{
+	int			i;
+	XLogRecPtr	lsn_to_delete;
+
+	SpinLockAcquire(&state->mutex);
+	lsn_to_delete = state->lsn[MyBackendId];
+	state->lsn[MyBackendId] = InvalidXLogRecPtr;
+
+	/* If we are deleting the minimal LSN, then choose the next min_lsn */
+	if (lsn_to_delete != InvalidXLogRecPtr &&
+		lsn_to_delete == pg_atomic_read_u64(&state->min_lsn))
+	{
+		XLogRecPtr	new_min = PG_UINT64_MAX;
+
+		for (i = 2; i <= state->backend_maxid; i++)
+			if (state->lsn[i] != InvalidXLogRecPtr && state->lsn[i] < new_min)
+				new_min = state->lsn[i];
+		pg_atomic_write_u64(&state->min_lsn, new_min);	/* publish once */
+	}
+
+	/* If deleting from the end of the array, shorten the array's used part */
+	if (state->backend_maxid == MyBackendId)
+	{
+		for (i = MyBackendId - 1; i >= 2; i--)
+			if (state->lsn[i] != InvalidXLogRecPtr)
+				break;
+		state->backend_maxid = (i >= 2) ? i : 0;	/* 0: nobody is waiting */
+	}
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Report shared memory needed for WaitState: header + MaxBackends + 1 LSNs
+ */
+Size
+WaitShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitState, lsn);	/* fixed part, before the lsn[] array */
+	size = add_size(size, mul_size(MaxBackends + 1, sizeof(XLogRecPtr)));
+	return size;
+}
+
+/*
+ * Set up shared WaitState; initialize it unless ShmemInitStruct() found it
+ */
+void
+WaitShmemInit(void)
+{
+	bool		found;
+	int			i;
+
+	state = (WaitState *) ShmemInitStruct("pg_wait_lsn",
+										  WaitShmemSize(),
+										  &found);
+	if (!found)
+	{
+		SpinLockInit(&state->mutex);
+		/* Nobody is waiting yet: empty slots and the largest possible minimum */
+		for (i = 0; i < (MaxBackends + 1); i++)
+			state->lsn[i] = InvalidXLogRecPtr;
+
+		state->backend_maxid = 0;
+		pg_atomic_init_u64(&state->min_lsn, PG_UINT64_MAX);
+	}
+}
+
+/*
+ * Wake waiters whose LSN is replayed; spinlock is never held across SetLatch
+ */
+void
+WaitSetLatch(XLogRecPtr cur_lsn)
+{
+	int			i;
+	int			backend_maxid;
+	PGPROC	   *backend;
+	XLogRecPtr	waited_lsn;
+
+	SpinLockAcquire(&state->mutex);
+	backend_maxid = state->backend_maxid;
+	SpinLockRelease(&state->mutex);
+
+	for (i = 2; i <= backend_maxid; i++)
+	{
+		SpinLockAcquire(&state->mutex);	/* held only to copy one slot */
+		waited_lsn = state->lsn[i];
+		SpinLockRelease(&state->mutex);
+		if (waited_lsn != InvalidXLogRecPtr && waited_lsn <= cur_lsn &&
+			(backend = BackendIdGetProc(i)) != NULL)
+			SetLatch(&backend->procLatch);
+	}
+}
+
+/*
+ * Get minimal LSN that someone waits for; atomic read, no spinlock taken
+ */
+XLogRecPtr
+GetMinWaitedLSN(void)
+{
+	return pg_atomic_read_u64(&state->min_lsn);
+}
+
+/*
+ * Wait on our latch until target_lsn is replayed, the postmaster dies, or
+ * timeout_ms milliseconds pass (timeout_ms <= 0 means wait without limit).
+ * Returns true if LSN was reached; otherwise warns and returns false.
+ */
+bool
+WaitUtility(XLogRecPtr target_lsn, const int timeout_ms)
+{
+	XLogRecPtr	cur_lsn = GetXLogReplayRecPtr(NULL);
+	TransactionId	xmin = MyProc->xmin;
+	int			latch_events;
+	float8		endtime;
+	bool		res = false;
+	bool		wait_forever = (timeout_ms <= 0);
+
+	if (!RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("Work only in standby mode")));
+
+	/*
+	 * In transactions, that have isolation level repeatable read or higher
+	 * wait lsn creates a snapshot if called first in a block, which can
+	 * lead the transaction to working incorrectly
+	 */
+
+	if (IsTransactionBlock() && XactIsoLevel != XACT_READ_COMMITTED) {
+		ereport(WARNING,
+				errmsg("Waitlsn may work incorrectly in this isolation level"),
+				errhint("Call wait lsn before starting the transaction"));
+	}
+
+	endtime = GetNowFloat() + timeout_ms / 1000.0;
+
+	latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+	/* Check if we already reached the needed LSN */
+	if (cur_lsn >= target_lsn)
+		return true;
+
+	/* A little hack similar to SnapshotResetXmin to work out of snapshot */
+	MyProc->xmin = InvalidTransactionId;
+
+	AddWaitedLSN(target_lsn);
+	cur_lsn = GetXLogReplayRecPtr(NULL);	/* replay may have advanced meanwhile */
+	for (;;)
+	{
+		int			rc;
+		float8		time_left = 0;
+		long		time_left_ms = 0;
+
+		/* If LSN has been replayed */
+		if (target_lsn <= cur_lsn)
+			break;
+
+		time_left = endtime - GetNowFloat();
+
+		/* Sleep at most 100 ms at a time; not at all once the timeout expired */
+		if (wait_forever || time_left > 0.1)
+			time_left_ms = 100;
+		else
+			time_left_ms = (time_left > 0) ? (long) ceil(time_left * 1000.0) : 0;
+
+		/* If interrupt, LockErrorCleanup() will do DeleteWaitedLSN() for us */
+		CHECK_FOR_INTERRUPTS();
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		rc = WaitLatch(MyLatch, latch_events, time_left_ms,
+					   WAIT_EVENT_CLIENT_READ);
+
+		ResetLatch(MyLatch);
+
+		if (rc & WL_LATCH_SET)
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+
+		if (rc & WL_TIMEOUT)
+		{
+			cur_lsn = GetXLogReplayRecPtr(NULL);
+			/* If the time specified by user has passed, stop waiting */
+			time_left = endtime - GetNowFloat();
+			if (!wait_forever && time_left <= 0.0)
+				break;
+		}
+	}
+
+	MyProc->xmin = xmin;
+	DeleteWaitedLSN();
+
+	if (cur_lsn < target_lsn)
+		ereport(WARNING,
+				 errmsg("LSN was not reached"),
+				 errhint("Try to increase wait time."));
+	else
+		res = true;
+
+	return res;
+}
+
+Datum
+pg_wait_lsn(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr		trg_lsn = PG_GETARG_LSN(0);
+	int64			delay = PG_GETARG_INT64(1);	/* declared int8 in pg_proc.dat */
+	/* SQL wrapper: clamp delay (ms) into int range; values <= 0 wait forever */
+	PG_RETURN_BOOL(WaitUtility(trg_lsn, (int) Max(Min(delay, PG_INT32_MAX), 0)));
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2225a4a6e6..f6d2e5ba21 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -144,6 +145,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, SyncScanShmemSize());
 	size = add_size(size, AsyncShmemSize());
 	size = add_size(size, StatsShmemSize());
+	size = add_size(size, WaitShmemSize());
 	size = add_size(size, WaitEventExtensionShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
@@ -237,6 +239,11 @@ CreateSharedMemoryAndSemaphores(void)
 	/* Initialize subsystems */
 	CreateOrAttachShmemStructs();
 
+	/*
+	 * Init array of events for the wait clause in shared memory
+	 */
+	WaitShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index b6451d9d08..d4521e079f 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xlogutils.h"
+#include "commands/wait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -731,6 +732,9 @@ LockErrorCleanup(void)
 
 	AbortStrongLockAcquire();
 
+	/* If wait lsn was interrupted, then stop waiting for that LSN */
+	DeleteWaitedLSN();
+
 	/* Nothing to do if we weren't waiting for a lock */
 	if (lockAwaited == NULL)
 	{
diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c
index 5d78d6dc06..fbc96db198 100644
--- a/src/backend/utils/adt/misc.c
+++ b/src/backend/utils/adt/misc.c
@@ -384,8 +384,6 @@ pg_sleep(PG_FUNCTION_ARGS)
 	 * less than the specified time when WaitLatch is terminated early by a
 	 * non-query-canceling signal such as SIGHUP.
 	 */
-#define GetNowFloat()	((float8) GetCurrentTimestamp() / 1000000.0)
-
 	endtime = GetNowFloat() + secs;
 
 	for (;;)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fb58dee3bc..0859ec6682 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12092,6 +12092,11 @@
   prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
   prosrc => 'brin_minmax_multi_summary_send' },
 
+# NOTE(review): placeholder OID from the 8000-9999 development range; confirm unused
+{ oid => '8593', descr => 'wait for LSN until timeout',
+  proname => 'pg_wait_lsn', provolatile => 'v', prorettype => 'bool',
+  proargtypes => 'pg_lsn int8', proargnames => '{trg_lsn,delay}', prosrc => 'pg_wait_lsn' },
+
 { oid => '6291', descr => 'arbitrary value from among input values',
   proname => 'any_value', prokind => 'a', proisstrict => 'f',
   prorettype => 'anyelement', proargtypes => 'anyelement',
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 0000000000..bd52664de9
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ *	  prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2023, Regents of PostgresPro
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+#include "postgres.h"
+#include "tcop/dest.h"
+#include "nodes/parsenodes.h"
+
+extern bool WaitUtility(XLogRecPtr lsn, const int timeout_ms);
+extern Size WaitShmemSize(void);
+extern void WaitShmemInit(void);
+extern void WaitSetLatch(XLogRecPtr cur_lsn);
+extern XLogRecPtr GetMinWaitedLSN(void);
+extern void DeleteWaitedLSN(void);
+
+#endif							/* WAIT_H */
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index c4dd96c8c9..6d763d1175 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -144,4 +144,6 @@ extern int	date2isoyearday(int year, int mon, int mday);
 
 extern bool TimestampTimestampTzRequiresRewrite(void);
 
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+
 #endif							/* TIMESTAMP_H */
diff --git a/src/test/recovery/t/040_waitlsn.pl b/src/test/recovery/t/040_waitlsn.pl
new file mode 100644
index 0000000000..dc9e899671
--- /dev/null
+++ b/src/test/recovery/t/040_waitlsn.pl
@@ -0,0 +1,76 @@
+# Checks wait lsn
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Add some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Using the backup, create a streaming standby with a 1 second delay
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay        = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+# Check that timeouts make us wait for the specified time (2s here)
+my $current_time = $node_standby->safe_psql('postgres', "SELECT now()");
+my $two_seconds = 2000; # in milliseconds
+my $start_time = time();
+$node_standby->safe_psql('postgres',
+	"SELECT pg_wait_lsn('0/FFFFFFFF', $two_seconds)");
+my $time_waited = (time() - $start_time) * 1000; # convert to milliseconds
+ok($time_waited >= $two_seconds, "wait lsn waits for enough time");
+
+# Check that timeouts let us stop waiting right away, before reaching target LSN
+# (1 ms timeout, far shorter than the standby's 1 s apply delay)
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my ($ret, $out, $err) = $node_standby->psql('postgres',
+	"SELECT pg_wait_lsn('$lsn1', 1)");
+
+is($ret, 0, "zero return value when failed to wait lsn on standby");
+like($err, qr/WARNING:  LSN was not reached/,
+	"correct error message when failed to wait lsn on standby");
+is($out, "f", "if given too little wait time, WAIT doesn't reach target LSN");
+
+
+# Check that wait lsn works fine and reaches target LSN if given no timeout
+# Wait for infinite
+
+# Add data on primary, memorize primary's last LSN
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+
+# Wait for it to appear on replica, memorize replica's last LSN
+$node_standby->safe_psql('postgres',
+	"SELECT pg_wait_lsn('$lsn2', 0)");
+my $reached_lsn = $node_standby->safe_psql('postgres',
+	"SELECT pg_last_wal_replay_lsn()");
+
+# Make sure that replica is at or past primary's LSN after WAIT
+my $compare_lsns = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp('$reached_lsn'::pg_lsn, '$lsn2'::pg_lsn)");
+cmp_ok($compare_lsns, '>=', 0,
+	"standby reached the same LSN as primary before starting transaction");
+
+$node_standby->stop;
+$node_primary->stop;
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d659adbfd6..9cdcebce3c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3015,6 +3015,7 @@ WaitEventIPC
 WaitEventSet
 WaitEventTimeout
 WaitPMResult
+WaitState
 WalCloseMethod
 WalCompression
 WalLevel

Reply via email to