From 732109c42e2be45de154102e78c617904492c35c Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <akorotkov@postgresql.org>
Date: Thu, 28 Mar 2024 02:19:50 +0200
Subject: [PATCH v12] Implement pg_wait_for_wal_replay_lsn() stored procedure

pg_wait_for_wal_replay_lsn is to be used on standby and specifies waiting for
the specific WAL location to be replayed before starting the transaction.
This option is useful when the user makes some data changes on primary and
needs a guarantee to see these changes on standby.

The queue of waiters is stored in the shared memory array sorted by LSN.
During replay of WAL waiters whose LSNs are already replayed are deleted from
the shared memory array and woken up by setting of their latches.

Discussion: https://postgr.es/m/eb12f9b03851bb2583adab5df9579b4b%40postgrespro.ru
Author: Kartyshov Ivan, Alexander Korotkov
Reviewed-by: Michael Paquier, Peter Eisentraut, Dilip Kumar, Amit Kapila
Reviewed-by: Alexander Lakhin, Bharath Rupireddy
---
 doc/src/sgml/func.sgml                    |  96 +++++++
 src/backend/access/transam/xlogrecovery.c |  10 +
 src/backend/catalog/system_functions.sql  |   3 +
 src/backend/commands/Makefile             |   3 +-
 src/backend/commands/meson.build          |   1 +
 src/backend/commands/waitlsn.c            | 303 ++++++++++++++++++++++
 src/backend/storage/ipc/ipci.c            |   7 +
 src/backend/storage/lmgr/proc.c           |   6 +
 src/include/catalog/pg_proc.dat           |   5 +
 src/include/commands/waitlsn.h            |  43 +++
 src/test/recovery/meson.build             |   1 +
 src/test/recovery/t/043_wait_lsn.pl       |  77 ++++++
 src/tools/pgindent/typedefs.list          |   2 +
 13 files changed, 556 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/commands/waitlsn.c
 create mode 100644 src/include/commands/waitlsn.h
 create mode 100644 src/test/recovery/t/043_wait_lsn.pl

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 8ecc02f2b90..69dc53f24c4 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -28227,6 +28227,102 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
     the pause, the rate of WAL generation and available disk space.
    </para>
 
+   <para>
+    There are also procedures to control the progress of recovery.
+    They are shown in <xref linkend="procedures-recovery-control-table"/>.
+    These procedures may be executed only during recovery.
+   </para>
+
+   <table id="procedures-recovery-control-table">
+    <title>Recovery Control Procedures</title>
+    <tgroup cols="1">
+     <thead>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        Procedure
+       </para>
+       <para>
+        Description
+       </para></entry>
+      </row>
+     </thead>
+
+     <tbody>
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_wait_for_wal_replay_lsn</primary>
+        </indexterm>
+        <function>pg_wait_for_wal_replay_lsn</function> (
+          <parameter>target_lsn</parameter> <type>pg_lsn</type>,
+          <parameter>timeout</parameter> <type>float8</type> <literal>DEFAULT</literal> <literal>0</literal>)
+        <returnvalue>void</returnvalue>
+       </para>
+       <para>
+        Throws an ERROR if the target <acronym>lsn</acronym> was not replayed
+        on standby within given timeout.  Parameter <parameter>timeout</parameter>
+        is the time in seconds to wait for the<parameter>target_lsn</parameter>  replay.
+        When <parameter>timeout</parameter> value equals to zero no timeout
+        is applied.
+       </para></entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
+   <para>
+    <function>pg_wait_for_wal_replay_lsn</function> waits till
+    <parameter>target_lsn</parameter> to be replayed on standby.
+    That is, after this function execution, the value returned by
+    <function>pg_last_wal_replay_lsn</function> should be greater or equal
+    to the <parameter>target_lsn</parameter> value.  This is useful to achieve
+    read-your-writes-consistency, while using async replica for reads and
+    primary for writes.  In that case <acronym>lsn</acronym> of the last
+    modification should be stored on the client application side or the
+    connection pooler side.
+   </para>
+
+   <para>
+    You can use <function>pg_wait_for_wal_replay_lsn</function> to wait the <type>pg_lsn</type>
+    value.  For example, an application could update the
+    <literal>movie</literal> table and get the <acronym>lsn</acronym> of
+    changes just made.  This example uses <function>pg_current_wal_insert_lsn</function>
+    to get the <acronym>lsn</acronym> given that <varname>synchronous_commit</varname>
+    could be set to <literal>off</literal>.
+
+   <programlisting>
+postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
+UPDATE 100
+postgres=# SELECT pg_current_wal_insert_lsn();
+pg_current_wal_insert_lsn
+--------------------
+0/306EE20
+(1 row)
+   </programlisting>
+
+   Then an application could run <function>pg_wait_for_wal_replay_lsn</function>
+   with the <acronym>lsn</acronym> obtained from primary.  After that the
+   changes made of primary should be guaranteed to be visible on replica.
+
+   <programlisting>
+postgres=# CALL pg_wait_for_wal_replay_lsn('0/306EE20');
+CALL
+postgres=# SELECT * FROM movie WHERE genre = 'Drama';
+ genre
+-------
+(0 rows)
+   </programlisting>
+
+   It may also happen that target <acronym>lsn</acronym> is not achieved
+   within the timeout.  In that case the error is thrown.
+
+   <programlisting>
+postgres=# CALL pg_wait_lsn('0/306EE20', 0.1);
+ERROR:  canceling waiting for LSN due to timeout
+   </programlisting>
+
+   </para>
+
   </sect2>
 
   <sect2 id="functions-snapshot-synchronization">
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 29c5bec0847..509616c2749 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/waitlsn.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1828,6 +1829,15 @@ PerformWalRecovery(void)
 				break;
 			}
 
+			/*
+			 * If we replayed an LSN that someone was waiting for, set latches
+			 * in shared memory array to notify the waiter.
+			 */
+			if (waitLSN &&
+				(XLogRecoveryCtl->lastReplayedEndRecPtr >=
+				 pg_atomic_read_u64(&waitLSN->minLSN)))
+				WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index fe2bb50f46d..6ca29c28fa1 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -414,6 +414,9 @@ CREATE OR REPLACE FUNCTION
   json_populate_recordset(base anyelement, from_json json, use_json_as_text boolean DEFAULT false)
   RETURNS SETOF anyelement LANGUAGE internal STABLE ROWS 100  AS 'json_populate_recordset' PARALLEL SAFE;
 
+CREATE OR REPLACE PROCEDURE pg_wait_for_wal_replay_lsn(target_lsn pg_lsn, timeout float8 DEFAULT 0)
+  LANGUAGE internal AS 'pg_wait_for_wal_replay_lsn';
+
 CREATE OR REPLACE FUNCTION pg_logical_slot_get_changes(
     IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}',
     OUT lsn pg_lsn, OUT xid xid, OUT data text)
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91c..cede90c3b98 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
 	vacuum.o \
 	vacuumparallel.o \
 	variable.o \
-	view.o
+	view.o \
+	waitlsn.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 6dd00a4abde..7549be5dc3b 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'waitlsn.c',
 )
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
new file mode 100644
index 00000000000..80ac9fc268c
--- /dev/null
+++ b/src/backend/commands/waitlsn.c
@@ -0,0 +1,303 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ *	  Implements waiting for the given LSN, which is used in
+ *	  CALL pg_wait_lsn(wait_lsn pg_lsn, timeout int).
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+#include <math.h>
+
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/waitlsn.h"
+#include "executor/spi.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+#include "utils/timestamp.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void addLSNWaiter(XLogRecPtr lsn);
+static void deleteLSNWaiter(void);
+
+struct WaitLSNState *waitLSN = NULL;
+static volatile sig_atomic_t haveShmemItem = false;
+
+/*
+ * Report the amount of shared memory space needed for WaitLSNState
+ */
+Size
+WaitLSNShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitLSNState, procInfos);
+	size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
+	return size;
+}
+
+/* Initialize the WaitLSNState in the shared memory */
+void
+WaitLSNShmemInit(void)
+{
+	bool		found;
+
+	waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+											   WaitLSNShmemSize(),
+											   &found);
+	if (!found)
+	{
+		SpinLockInit(&waitLSN->mutex);
+		waitLSN->numWaitedProcs = 0;
+		pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+	}
+}
+
+/*
+ * Add the information about the LSN waiter backend to the shared memory
+ * array.
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn)
+{
+	WaitLSNProcInfo cur;
+	int			i;
+
+	SpinLockAcquire(&waitLSN->mutex);
+
+	cur.procnum = MyProcNumber;
+	cur.waitLSN = lsn;
+
+	for (i = 0; i < waitLSN->numWaitedProcs; i++)
+	{
+		if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN)
+		{
+			WaitLSNProcInfo tmp;
+
+			tmp = waitLSN->procInfos[i];
+			waitLSN->procInfos[i] = cur;
+			cur = tmp;
+		}
+	}
+	waitLSN->procInfos[i] = cur;
+	waitLSN->numWaitedProcs++;
+
+	pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
+	SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Delete the information about the LSN waiter backend from the shared memory
+ * array.
+ */
+static void
+deleteLSNWaiter(void)
+{
+	int			i;
+	bool		found = false;
+
+	SpinLockAcquire(&waitLSN->mutex);
+
+	for (i = 0; i < waitLSN->numWaitedProcs; i++)
+	{
+		if (waitLSN->procInfos[i].procnum == MyProcNumber)
+			found = true;
+
+		if (found && i < waitLSN->numWaitedProcs - 1)
+		{
+			waitLSN->procInfos[i] = waitLSN->procInfos[i + 1];
+		}
+	}
+
+	if (!found)
+	{
+		SpinLockRelease(&waitLSN->mutex);
+		return;
+	}
+	waitLSN->numWaitedProcs--;
+
+	if (waitLSN->numWaitedProcs != 0)
+		pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
+	else
+		pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+
+	SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Set all latches in shared memory to signal that new LSN has been replayed
+*/
+void
+WaitLSNSetLatches(XLogRecPtr curLSN)
+{
+	uint32		i,
+				numWakeUpProcs;
+
+	SpinLockAcquire(&waitLSN->mutex);
+
+	/*
+	 * Set latches for process, whose waited LSNs are already replayed.
+	 */
+	for (i = 0; i < waitLSN->numWaitedProcs; i++)
+	{
+		PGPROC	   *backend;
+
+		if (waitLSN->procInfos[i].waitLSN > curLSN)
+			break;
+
+		backend = GetPGProcByNumber(waitLSN->procInfos[i].procnum);
+		SetLatch(&backend->procLatch);
+	}
+
+	/*
+	 * Immediately remove those processes from the shmem array.  Otherwise,
+	 * shmem array items will be here till corresponding processes wake up and
+	 * delete themselves.
+	 */
+	numWakeUpProcs = i;
+	for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++)
+		waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs];
+	waitLSN->numWaitedProcs -= numWakeUpProcs;
+
+	if (waitLSN->numWaitedProcs != 0)
+		pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
+	else
+		pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+
+	SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Delete our item from shmem array if any.
+ */
+void
+WaitLSNCleanup(void)
+{
+	if (haveShmemItem)
+		deleteLSNWaiter();
+}
+
+/*
+ * Wait using MyLatch to wait till the given LSN is replayed, the postmaster dies or
+ * timeout happens.
+ */
+void
+WaitForLSN(XLogRecPtr lsn, float8 timeout)
+{
+	XLogRecPtr	curLSN;
+	int			latch_events;
+	TimestampTz endtime;
+
+	/* Shouldn't be called when shmem isn't initialized */
+	Assert(waitLSN);
+
+	/* Should be only called by a backend */
+	Assert(MyBackendType == B_BACKEND);
+
+	if (!RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("recovery is not in progress"),
+				 errhint("Waiting for LSN can only be executed during recovery.")));
+
+	endtime = TimestampTzPlusSeconds(GetCurrentTimestamp(), timeout);
+
+	latch_events = WL_TIMEOUT | WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+	addLSNWaiter(lsn);
+	haveShmemItem = true;
+
+	for (;;)
+	{
+		int			rc;
+		long		delay_ms;
+
+		/* Check if the waited LSN has been replayed */
+		curLSN = GetXLogReplayRecPtr(NULL);
+		if (lsn <= curLSN)
+			break;
+
+		if (timeout > 0.0)
+			delay_ms = (endtime - GetCurrentTimestamp()) / 1000;
+		else
+			/* If no timeout is set then wake up in 1 minute for interrupts */
+			delay_ms = 60000;
+
+		if (delay_ms <= 0)
+			break;
+
+		/*
+		 * If received an interruption from CHECK_FOR_INTERRUPTS, then delete
+		 * the current event from array.
+		 */
+		CHECK_FOR_INTERRUPTS();
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		rc = WaitLatch(MyLatch, latch_events, delay_ms,
+					   WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	if (lsn > curLSN)
+	{
+		deleteLSNWaiter();
+		haveShmemItem = false;
+		ereport(ERROR,
+				(errcode(ERRCODE_QUERY_CANCELED),
+				 errmsg("canceling waiting for LSN due to timeout")));
+	}
+	else
+	{
+		haveShmemItem = false;
+	}
+}
+
+Datum
+pg_wait_for_wal_replay_lsn(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	target_lsn = PG_GETARG_LSN(0);
+	float8		timeout = PG_GETARG_FLOAT8(1);
+	CallContext *context = (CallContext *) fcinfo->context;
+
+	if (context->atomic)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("pg_wait_lsn() must be only called in non-atomic context")));
+
+	if (ActiveSnapshotSet())
+		PopActiveSnapshot();
+	Assert(!ActiveSnapshotSet());
+
+	(void) WaitForLSN(target_lsn, timeout);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 521ed5418cc..5aed90c9355 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, WaitEventExtensionShmemSize());
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
+	size = add_size(size, WaitLSNShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -244,6 +246,11 @@ CreateSharedMemoryAndSemaphores(void)
 	/* Initialize subsystems */
 	CreateOrAttachShmemStructs();
 
+	/*
+	 * Init array of Latches in shared memory for wait lsn
+	 */
+	WaitLSNShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 162b1f919db..4b830dc3c85 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xlogutils.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -862,6 +863,11 @@ ProcKill(int code, Datum arg)
 	 */
 	LWLockReleaseAll();
 
+	/*
+	 * Cleanup waiting for LSN if any.
+	 */
+	WaitLSNCleanup();
+
 	/* Cancel any pending condition variable sleep, too */
 	ConditionVariableCancelSleep();
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 2f7cfc02c6d..e611e21762b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12138,6 +12138,11 @@
   prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
   prosrc => 'brin_minmax_multi_summary_send' },
 
+{ oid => '16387', descr => 'wait for LSN with timeout',
+  proname => 'pg_wait_for_wal_replay_lsn', prokind => 'p', prorettype => 'void',
+  proargtypes => 'pg_lsn float8', proargnames => '{target_lsn,timeout}',
+  prosrc => 'pg_wait_for_wal_replay_lsn' },
+
 { oid => '6291', descr => 'arbitrary value from among input values',
   proname => 'any_value', prokind => 'a', proisstrict => 'f',
   prorettype => 'anyelement', proargtypes => 'anyelement',
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
new file mode 100644
index 00000000000..8c1d8838348
--- /dev/null
+++ b/src/include/commands/waitlsn.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ *	  Declarations for LSN waiting routines.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_LSN_H
+#define WAIT_LSN_H
+
+#include "postgres.h"
+#include "port/atomics.h"
+#include "storage/spin.h"
+#include "tcop/dest.h"
+
+/* Shared memory structures */
+typedef struct WaitLSNProcInfo
+{
+	int			procnum;
+	XLogRecPtr	waitLSN;
+} WaitLSNProcInfo;
+
+typedef struct WaitLSNState
+{
+	pg_atomic_uint64 minLSN;
+	slock_t		mutex;
+	int			numWaitedProcs;
+	WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
+} WaitLSNState;
+
+extern PGDLLIMPORT struct WaitLSNState *waitLSN;
+
+extern void WaitForLSN(XLogRecPtr lsn, float8 sec);
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNSetLatches(XLogRecPtr curLSN);
+extern void WaitLSNCleanup(void);
+
+#endif							/* WAIT_LSN_H */
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec1..bc47c93902c 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -51,6 +51,7 @@ tests += {
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
+      't/043_wait_lsn.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/043_wait_lsn.pl b/src/test/recovery/t/043_wait_lsn.pl
new file mode 100644
index 00000000000..2b5b05b34ee
--- /dev/null
+++ b/src/test/recovery/t/043_wait_lsn.pl
@@ -0,0 +1,77 @@
+# Checks waiting for the lsn replay on standby using
+# pg_wait_for_wal_replay_lsn() procedure.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 3;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf(
+	'postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Make sure that pg_wait_for_wal_replay_lsn() works: add new content to
+# primary and memorize primary's insert LSN, then wait for that LSN to be
+# replayed on standby. 
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+my $output = $node_standby->safe_psql(
+	'postgres', qq[
+	CALL pg_wait_for_wal_replay_lsn('${lsn1}', 1000);
+	SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
+]);
+
+# Make sure the current LSN on standby and is the same as primary's LSN
+ok($output eq 0, "standby reached the same LSN as primary after pg_wait_for_wal_replay_lsn()");
+
+# Check that waiting for unreachable LSN triggers the timeout.
+my $lsn2 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn() + 1");
+my $stderr;
+$node_standby->safe_psql('postgres', "CALL pg_wait_for_wal_replay_lsn('${lsn1}', 0.01);");
+$node_standby->psql(
+	'postgres',
+	"CALL pg_wait_for_wal_replay_lsn('${lsn2}', 1);",
+	stderr => \$stderr);
+ok( $stderr =~ /canceling waiting for LSN due to timeout/,
+	"get timeout on waiting for unreachable LSN");
+
+# Check that new data is visible after calling pg_wait_for_wal_replay_lsn()
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn3 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+	'postgres', qq[
+	CALL pg_wait_for_wal_replay_lsn('${lsn3}');
+	SELECT count(*) FROM wait_test;
+]);
+
+# Make sure the current LSN on standby and is the same as primary's LSN
+ok($output eq 30, "standby reached the same LSN as primary");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cfa9d5aaeac..f35ea6212b1 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3053,6 +3053,8 @@ WaitEventIO
 WaitEventIPC
 WaitEventSet
 WaitEventTimeout
+WaitLSNProcInfo
+WaitLSNState
 WaitPMResult
 WalCloseMethod
 WalCompression
-- 
2.39.3 (Apple Git-145)

