From f9d41b7158509d6cab21276f220dbebd85ca67e1 Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <akorotkov@postgresql.org>
Date: Tue, 5 Mar 2024 13:35:03 +0200
Subject: [PATCH v9] Implement AFTER clause for BEGIN command

The new clause is to be used on standby and specifies waiting for the specific
WAL location to be replayed before starting the transaction.   This option is
useful when the user makes some data changes on primary and needs a guarantee
to see these changes on standby.

The queue of waiters is stored in the shared memory array sorted by LSN.
During replay of WAL waiters whose LSNs are already replayed are deleted from
the shared memory array and woken up by setting of their latches.

Discussion: https://postgr.es/m/eb12f9b03851bb2583adab5df9579b4b%40postgrespro.ru
Author: Kartyshov Ivan, Alexander Korotkov
Reviewed-by: Michael Paquier, Peter Eisentraut, Dilip Kumar, Amit Kapila
---
 doc/src/sgml/ref/begin.sgml               |  44 +++-
 doc/src/sgml/ref/start_transaction.sgml   |   5 +-
 src/backend/access/transam/xlogrecovery.c |   7 +
 src/backend/commands/Makefile             |   3 +-
 src/backend/commands/meson.build          |   1 +
 src/backend/commands/waitlsn.c            | 305 ++++++++++++++++++++++
 src/backend/parser/gram.y                 |  32 +++
 src/backend/storage/ipc/ipci.c            |   7 +
 src/backend/storage/lmgr/proc.c           |   6 +
 src/backend/tcop/utility.c                |  12 +
 src/include/commands/waitlsn.h            |  25 ++
 src/include/nodes/parsenodes.h            |   2 +
 src/test/recovery/meson.build             |   1 +
 src/test/recovery/t/043_begin_after.pl    |  62 +++++
 src/tools/pgindent/typedefs.list          |   2 +
 15 files changed, 511 insertions(+), 3 deletions(-)
 create mode 100644 src/backend/commands/waitlsn.c
 create mode 100644 src/include/commands/waitlsn.h
 create mode 100644 src/test/recovery/t/043_begin_after.pl

diff --git a/doc/src/sgml/ref/begin.sgml b/doc/src/sgml/ref/begin.sgml
index 016b0214874..4ed7002683e 100644
--- a/doc/src/sgml/ref/begin.sgml
+++ b/doc/src/sgml/ref/begin.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] <replaceable class="parameter">wait_event</replaceable>
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+    AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN <replaceable class="parameter">number_of_milliseconds</replaceable> ]
 </synopsis>
  </refsynopsisdiv>
 
@@ -78,6 +81,36 @@ BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</
      </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term><literal>AFTER</literal> <replaceable class="parameter">lsn_value</replaceable></term>
+    <listitem>
+     <para>
+       <command>AFTER</command> clause is used on standby in
+       <link linkend="streaming-replication">physical streaming replication</link>
+       and specifies waiting for the specific WAL location (<acronym>LSN</acronym>)
+       to be replayed before starting the transaction.
+     </para>
+     <para>
+       This option is useful when the user makes some data changes on primary
+       and needs a guarantee to see these changes on standby.  The LSN to wait
+       could be obtained on the primary using
+       <link linkend="functions-admin-backup"><function>pg_current_wal_insert_lsn</function></link>
+       function after committing the relevant changes.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>WITHIN</literal> <replaceable class="parameter">number_of_milliseconds</replaceable></term>
+    <listitem>
+     <para>
+       Provides the timeout for the <command>AFTER</command> clause.
+       Especially helpful to prevent freezing on streaming replication
+       connection failures.
+     </para>
+    </listitem>
+   </varlistentry>
   </variablelist>
 
   <para>
@@ -123,6 +156,15 @@ BEGIN [ WORK | TRANSACTION ] [ <replaceable class="parameter">transaction_mode</
 <programlisting>
 BEGIN;
 </programlisting></para>
+
+  <para>
+   To begin a transaction block after replaying the given <acronym>LSN</acronym>.
+   The command will be canceled if the given <acronym>LSN</acronym> is not
+   reached within the timeout of one second.
+<programlisting>
+BEGIN AFTER '0/3F0FF791' WITHIN 1000;
+</programlisting></para>
+
  </refsect1>
 
  <refsect1>
diff --git a/doc/src/sgml/ref/start_transaction.sgml b/doc/src/sgml/ref/start_transaction.sgml
index 74ccd7e3456..46a3bcf1a80 100644
--- a/doc/src/sgml/ref/start_transaction.sgml
+++ b/doc/src/sgml/ref/start_transaction.sgml
@@ -21,13 +21,16 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ]
+START TRANSACTION [ <replaceable class="parameter">transaction_mode</replaceable> [, ...] ] <replaceable class="parameter">wait_event</replaceable>
 
 <phrase>where <replaceable class="parameter">transaction_mode</replaceable> is one of:</phrase>
 
     ISOLATION LEVEL { SERIALIZABLE | REPEATABLE READ | READ COMMITTED | READ UNCOMMITTED }
     READ WRITE | READ ONLY
     [ NOT ] DEFERRABLE
+
+<phrase>where <replaceable class="parameter">wait_event</replaceable> is:</phrase>
+    AFTER <replaceable class="parameter">lsn_value</replaceable> [ WITHIN number_of_milliseconds ]
 </synopsis>
  </refsynopsisdiv>
 
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 29c5bec0847..c211230f6b5 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/waitlsn.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1828,6 +1829,12 @@ PerformWalRecovery(void)
 				break;
 			}
 
+			/*
+			 * If we replayed an LSN that someone was waiting for, set latches
+			 * in shared memory array to notify the waiter.
+			 */
+			WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91c..cede90c3b98 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
 	vacuum.o \
 	vacuumparallel.o \
 	variable.o \
-	view.o
+	view.o \
+	waitlsn.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 6dd00a4abde..7549be5dc3b 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'waitlsn.c',
 )
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
new file mode 100644
index 00000000000..fd7c8f44ee7
--- /dev/null
+++ b/src/backend/commands/waitlsn.c
@@ -0,0 +1,305 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ *	  Implements WAIT FOR, which allows waiting for events such as
+ *	  time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include <float.h>
+#include <math.h>
+#include "postgres.h"
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/waitlsn.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/timestamp.h"
+#include "executor/spi.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void addLSNWaiter(XLogRecPtr lsn_to_wait);
+static void deleteLSNWaiter(void);
+
+/* Shared memory structure */
+typedef struct
+{
+	int			procnum;
+	XLogRecPtr	waitLSN;
+} WaitLSNProcInfo;
+
+typedef struct
+{
+	pg_atomic_uint64 minLSN;
+	slock_t		mutex;
+	int			numWaitedProcs;
+	WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
+} WaitLSNState;
+
+static WaitLSNState *state = NULL;
+static volatile sig_atomic_t haveShmemItem = false;
+
+/*
+ * Report the amount of shared memory space needed for WaitLSNState
+ */
+Size
+WaitLSNShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitLSNState, procInfos);
+	size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
+	return size;
+}
+
+/* Initialize the WaitLSNState in the shared memory */
+void
+WaitLSNShmemInit(void)
+{
+	bool		found;
+
+	state = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+											 WaitLSNShmemSize(),
+											 &found);
+	if (!found)
+	{
+		SpinLockInit(&state->mutex);
+		state->numWaitedProcs = 0;
+		pg_atomic_init_u64(&state->minLSN, PG_UINT64_MAX);
+	}
+}
+
+/*
+ * Add the information about the LSN waiter backend to the shared memory
+ * array.
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn)
+{
+	WaitLSNProcInfo cur;
+	int			i;
+
+	SpinLockAcquire(&state->mutex);
+
+	cur.procnum = MyProcNumber;
+	cur.waitLSN = lsn;
+
+	for (i = 0; i < state->numWaitedProcs; i++)
+	{
+		if (state->procInfos[i].waitLSN >= cur.waitLSN)
+		{
+			WaitLSNProcInfo tmp;
+
+			tmp = state->procInfos[i];
+			state->procInfos[i] = cur;
+			cur = tmp;
+		}
+	}
+	state->procInfos[i] = cur;
+	state->numWaitedProcs++;
+
+	pg_atomic_write_u64(&state->minLSN, state->procInfos[i].waitLSN);
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete the information about the LSN waiter backend from the shared memory
+ * array.
+ */
+static void
+deleteLSNWaiter(void)
+{
+	int			i;
+	bool		found = false;
+
+	SpinLockAcquire(&state->mutex);
+
+	for (i = 0; i < state->numWaitedProcs; i++)
+	{
+		if (state->procInfos[i].procnum == MyProcNumber)
+			found = true;
+
+		if (found && i < state->numWaitedProcs - 1)
+		{
+			state->procInfos[i] = state->procInfos[i + 1];
+		}
+	}
+
+	if (!found)
+	{
+		SpinLockRelease(&state->mutex);
+		return;
+	}
+	state->numWaitedProcs--;
+
+	if (state->numWaitedProcs != 0)
+		pg_atomic_write_u64(&state->minLSN, state->procInfos[i].waitLSN);
+	else
+		pg_atomic_write_u64(&state->minLSN, PG_UINT64_MAX);
+
+	SpinLockRelease(&state->mutex);
+}
+
+/* Set all latches in shared memory to signal that new LSN has been replayed */
+void
+WaitLSNSetLatches(XLogRecPtr curLSN)
+{
+	uint32		i,
+				numWakeUpProcs;
+
+	if (!state)
+		return;
+
+	/*
+	 * Fast check if we reached the LSN at least one process is waiting for.
+	 * This is important because saves us from acquiring the spinlock for
+	 * every WAL record replayed.
+	 */
+	if (pg_atomic_read_u64(&state->minLSN) > curLSN)
+		return;
+
+	SpinLockAcquire(&state->mutex);
+
+	/*
+	 * Set latches for process, whose waited LSNs are already replayed.
+	 */
+	for (i = 0; i < state->numWaitedProcs; i++)
+	{
+		PGPROC	   *backend;
+
+		if (state->procInfos[i].waitLSN > curLSN)
+			break;
+
+		backend = GetPGProcByNumber(state->procInfos[i].procnum);
+		SetLatch(&backend->procLatch);
+	}
+
+	/*
+	 * Immediately remove those processes from the shmem array.  Otherwise,
+	 * shmem array items will be here till corresponding processes wake up and
+	 * delete themselves.
+	 */
+	numWakeUpProcs = i;
+	for (i = 0; i < state->numWaitedProcs - numWakeUpProcs; i++)
+		state->procInfos[i] = state->procInfos[i + numWakeUpProcs];
+	state->numWaitedProcs -= numWakeUpProcs;
+
+	if (state->numWaitedProcs != 0)
+		pg_atomic_write_u64(&state->minLSN, state->procInfos[i].waitLSN);
+	else
+		pg_atomic_write_u64(&state->minLSN, PG_UINT64_MAX);
+
+	SpinLockRelease(&state->mutex);
+}
+
+/*
+ * Delete our item from shmem array if any.
+ */
+void
+WaitLSNCleanup(void)
+{
+	if (haveShmemItem)
+		deleteLSNWaiter();
+}
+
+/*
+ * On WAIT use MyLatch to wait till LSN is replayed, postmaster dies or
+ * timeout happens.
+ */
+void
+WaitForLSN(XLogRecPtr lsn, int millisecs)
+{
+	XLogRecPtr	curLSN;
+	int			latch_events;
+	TimestampTz endtime;
+
+	/* Shouldn't be called when shmem isn't initialized */
+	Assert(state);
+
+	/* Should be only called by a backend */
+	Assert(MyBackendType == B_BACKEND);
+
+	if (!RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("recovery is not in progress"),
+				 errhint("Waiting for LSN can only be executed during recovery.")));
+
+	endtime = GetCurrentTimestamp() + millisecs * 1000;
+
+	latch_events = WL_TIMEOUT | WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+	addLSNWaiter(lsn);
+	haveShmemItem = true;
+
+	for (;;)
+	{
+		int			rc;
+		long		delay_ms;
+
+		/* Check if the waited LSN has been replayed */
+		curLSN = GetXLogReplayRecPtr(NULL);
+		if (lsn <= curLSN)
+			break;
+
+		if (millisecs > 0)
+			delay_ms = (endtime - GetCurrentTimestamp()) / 1000;
+		else
+
+			/* If no timeout is set then wake up in 1 minute for interupts */
+			delay_ms = 60000;
+
+		if (delay_ms <= 0)
+			break;
+
+		/*
+		 * If received an interruption from CHECK_FOR_INTERRUPTS, then delete
+		 * the current event from array.
+		 */
+		CHECK_FOR_INTERRUPTS();
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		rc = WaitLatch(MyLatch, latch_events, delay_ms,
+					   WAIT_EVENT_CLIENT_READ);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	if (lsn > curLSN)
+	{
+		deleteLSNWaiter();
+		haveShmemItem = false;
+		ereport(ERROR,
+				(errcode(ERRCODE_QUERY_CANCELED),
+				 errmsg("canceling waiting for LSN due to timeout")));
+	}
+	else
+	{
+		haveShmemItem = false;
+	}
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c6e2f679fd5..be97f5a8700 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -647,6 +647,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <list>		hash_partbound
 %type <defelt>		hash_partbound_elem
 
+%type <ival>		after_lsn_timeout
+
 %type <node>	json_format_clause
 				json_format_clause_opt
 				json_value_expr
@@ -3193,6 +3195,14 @@ hash_partbound:
 			}
 		;
 
+/*
+ * WITHIN timeout optional clause for BEGIN AFTER lsn
+ */
+after_lsn_timeout:
+			WITHIN Iconst		{ $$ = $2; }
+			| /* EMPTY */           { $$ = 0; }
+		;
+
 /*****************************************************************************
  *
  *	ALTER TYPE
@@ -10949,6 +10959,17 @@ TransactionStmt:
 					n->location = -1;
 					$$ = (Node *) n;
 				}
+			| START TRANSACTION transaction_mode_list_or_empty AFTER Sconst after_lsn_timeout
+				{
+					TransactionStmt *n = makeNode(TransactionStmt);
+
+					n->kind = TRANS_STMT_START;
+					n->options = $3;
+					n->afterLsn = $5;
+					n->afterLsnTimeout = $6;
+					n->location = -1;
+					$$ = (Node *) n;
+				}
 			| COMMIT opt_transaction opt_transaction_chain
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
@@ -11053,6 +11074,17 @@ TransactionStmtLegacy:
 					n->location = -1;
 					$$ = (Node *) n;
 				}
+			| BEGIN_P opt_transaction transaction_mode_list_or_empty AFTER Sconst after_lsn_timeout
+				{
+					TransactionStmt *n = makeNode(TransactionStmt);
+
+					n->kind = TRANS_STMT_BEGIN;
+					n->options = $3;
+					n->afterLsn = $5;
+					n->afterLsnTimeout = $6;
+					n->location = -1;
+					$$ = (Node *) n;
+				}
 			| END_P opt_transaction opt_transaction_chain
 				{
 					TransactionStmt *n = makeNode(TransactionStmt);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 521ed5418cc..5aed90c9355 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, WaitEventExtensionShmemSize());
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
+	size = add_size(size, WaitLSNShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -244,6 +246,11 @@ CreateSharedMemoryAndSemaphores(void)
 	/* Initialize subsystems */
 	CreateOrAttachShmemStructs();
 
+	/*
+	 * Init array of Latches in shared memory for wait lsn
+	 */
+	WaitLSNShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 162b1f919db..4b830dc3c85 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xlogutils.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -862,6 +863,11 @@ ProcKill(int code, Datum arg)
 	 */
 	LWLockReleaseAll();
 
+	/*
+	 * Cleanup waiting for LSN if any.
+	 */
+	WaitLSNCleanup();
+
 	/* Cancel any pending condition variable sleep, too */
 	ConditionVariableCancelSleep();
 
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 83f86a42f79..bf9a685a9ae 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -56,6 +56,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -63,8 +64,10 @@
 #include "storage/fd.h"
 #include "tcop/utility.h"
 #include "utils/acl.h"
+#include "utils/fmgrprotos.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
 
 /* Hook for plugins to get control in ProcessUtility() */
 ProcessUtility_hook_type ProcessUtility_hook = NULL;
@@ -606,6 +609,15 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 						{
 							ListCell   *lc;
 
+							if (stmt->afterLsn)
+							{
+								XLogRecPtr	lsn = InvalidXLogRecPtr;
+
+								lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+																	  CStringGetDatum(stmt->afterLsn)));
+								WaitForLSN(lsn, stmt->afterLsnTimeout);
+							}
+
 							BeginTransactionBlock();
 							foreach(lc, stmt->options)
 							{
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
new file mode 100644
index 00000000000..bfc90a4102d
--- /dev/null
+++ b/src/include/commands/waitlsn.h
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ *	  prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_LSN_H
+#define WAIT_LSN_H
+
+#include "postgres.h"
+#include "tcop/dest.h"
+
+extern void WaitForLSN(XLogRecPtr lsn, int millisecs);
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNSetLatches(XLogRecPtr cur_lsn);
+extern void WaitLSNCleanup(void);
+
+#endif							/* WAIT_LSN_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index aadaf67f574..c2077b5252e 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3527,6 +3527,8 @@ typedef struct TransactionStmt
 	/* for two-phase-commit related commands */
 	char	   *gid pg_node_attr(query_jumble_ignore);
 	bool		chain;			/* AND CHAIN option */
+	char	   *afterLsn;		/* target LSN to wait */
+	int			afterLsnTimeout;	/* LSN waiting timeout */
 	/* token location, or -1 if unknown */
 	int			location pg_node_attr(query_jumble_location);
 } TransactionStmt;
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec1..a1c2a0b13d2 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -51,6 +51,7 @@ tests += {
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
+      't/043_begin_after.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/043_begin_after.pl b/src/test/recovery/t/043_begin_after.pl
new file mode 100644
index 00000000000..f7b43f5db1e
--- /dev/null
+++ b/src/test/recovery/t/043_begin_after.pl
@@ -0,0 +1,62 @@
+# Checks waiting for lsn on standby AFTER
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf(
+	'postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Make sure that AFTER works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that AFTER is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $output = $node_standby->safe_psql(
+	'postgres', qq[
+	BEGIN AFTER '${lsn1}';
+	SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
+]);
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+ok($output eq 0, "standby reached the same LSN as primary AFTER");
+
+my $lsn2 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn() + 1");
+my $stderr;
+$node_standby->safe_psql('postgres', "BEGIN AFTER '$lsn1' WITHIN 1");
+$node_standby->psql(
+	'postgres',
+	"BEGIN AFTER '$lsn2' WITHIN 1",
+	stderr => \$stderr);
+ok( $stderr =~ /canceling waiting for LSN due to timeout/,
+	"get timeout on waiting for unreachable LSN");
+
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index aa7a25b8f8c..7a2ac178bc3 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3035,6 +3035,8 @@ WaitEventIO
 WaitEventIPC
 WaitEventSet
 WaitEventTimeout
+WaitLSNProcInfo
+WaitLSNState
 WaitPMResult
 WalCloseMethod
 WalCompression
-- 
2.39.3 (Apple Git-145)

