Thank you for your feedback.

On 2024-03-20 12:11, Alexander Korotkov wrote:
On Wed, Mar 20, 2024 at 12:34 AM Kartyshov Ivan
<i.kartys...@postgrespro.ru> wrote:
> 4.2 With an unreasonably high future LSN, BEGIN command waits
> unboundedly, shouldn't we check if the specified LSN is more than
> pg_last_wal_receive_lsn() error out?

I think limiting wait lsn by current received lsn would destroy the
whole value of this feature.  The value is to wait till given LSN is
replayed, whether it's already received or not.

Ok sounds reasonable, I`ll rollback the changes.

But I don't see a problem here. On the replica, it's out of our
control to check which lsn is good and which is not.  We can't check
whether the lsn, which is in future for the replica, is already issued
by primary.

For the case of wrong lsn, which could cause potentially infinite
wait, there is the timeout and the manual query cancel.

Fully agree with this take.

> 4.3 With an unreasonably high wait time, BEGIN command waits
> unboundedly, shouldn't we restrict the wait time to some max
value,
> say a day or so?
> SELECT pg_last_wal_receive_lsn() + 1 AS future_receive_lsn \gset
> BEGIN AFTER :'future_receive_lsn' WITHIN 100000;

Good idea, I put it 1 day. But this limit we should to discuss.

Do you think that specifying timeout in milliseconds is suitable?  I
would prefer to switch to seconds (with ability to specify fraction of
second).  This was expressed before by Alexander Lakhin.

It sounds like an interesting idea. Please review the result.

> https://github.com/macdice/redo-bench or similar tools?

Ivan, could you do this?

Yes, test redo-bench/crash-recovery.sh
This patch on master
91.327, 1.973
105.907, 3.338
98.412, 4.579
95.818, 4.19

REL_13-STABLE
116.645, 3.005
113.212, 2.568
117.644, 3.183
111.411, 2.782

master
124.712, 2.047
117.012, 1.736
116.328, 2.035
115.662, 1.797

Strange behavior, patched version is faster then REL_13-STABLE and master.

I don't see this change in the patch.  Normally if a process gets a
signal, that causes WaitLatch() to exit immediately.  It also exists
immediately on query cancel.  IIRC, this 1 minute timeout is needed to
handle some extreme cases when an interrupt is missing.  Other places
have it equal to 1 minute. I don't see why we should have it
different.

Ok, I`ll rollback my changes.

4) added and expanded sections in the documentation

I don't see this in the patch.  I see only a short description in
func.sgml, which is definitely not sufficient.  We need at least
everything we have in the docs before to be adjusted with the current
approach of procedure.

I didn't find another section where to add the description of pg_wait_lsn().
So I extend description on the bottom of the table.

5) add default variant of timeout
pg_wait_lsn(trg_lsn pg_lsn, delay int8 DEFAULT 0)
example: pg_wait_lsn('0/31B1B60') equal pg_wait_lsn('0/31B1B60', 0)

Does zero here mean no timeout?  I think this should be documented.
Also, I would prefer to see the timeout by default.  Probably one
minute would be good for default.

Lets discuss this point. Loop in function WaitForLSN is made that way,
if we choose delay=0, only then we can wait infinitely to wait LSN
without timeout. So default must be 0.

Please take one more look on the patch.

--
Ivan Kartyshov
Postgres Professional: www.postgrespro.com
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 5b225ccf4f..6e1f69962d 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27861,10 +27861,54 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         extension.
        </para></entry>
       </row>
+
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_wait_lsn</primary>
+        </indexterm>
+        <function>pg_wait_lsn</function> (trg_lsn pg_lsn, delay int8 DEFAULT 0)
+       </para>
+       <para>
+        Returns ERROR if target LSN was not replayed on standby.
+        Parameter <parameter>delay</parameter> sets seconds, the time to wait to the LSN.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
 
+   <para>
+    <function>pg_wait_lsn</function> waits till <parameter>wait_lsn</parameter>
+    to be replayed on standby to achieve read-your-writes-consistency, while
+    using async replica for reads and primary for writes. In that case lsn of
+    last modification is stored inside application.
+   </para>
+
+   <para>
+    You can use <function>pg_wait_lsn</function> to wait the <type>pg_lsn</type>
+    value. For example:
+   <programlisting>
+   Replica:
+   postgresql.conf
+     recovery_min_apply_delay = 10s;
+
+   Primary: Application update table and get lsn of changes
+   postgres=# UPDATE films SET kind = 'Dramatic' WHERE kind = 'Drama';
+   postgres=# SELECT pg_current_wal_lsn();
+   pg_current_wal_lsn
+   --------------------
+   0/306EE20
+   (1 row)
+
+   Replica: Wait and read updated data
+   postgres=# CALL pg_wait_lsn('0/306EE20', 100); // Timeout 100ms is insufficent
+   ERROR:  canceling waiting for LSN due to timeout
+   postgres=# CALL pg_wait_lsn('0/306EE20');
+   postgres=# SELECT * FROM films WHERE kind = 'Drama';
+   </programlisting>
+   </para>
+
    <para>
     The functions shown in <xref
     linkend="functions-recovery-control-table"/> control the progress of recovery.
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 29c5bec084..0b783dc733 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -43,6 +43,7 @@
 #include "backup/basebackup.h"
 #include "catalog/pg_control.h"
 #include "commands/tablespace.h"
+#include "commands/waitlsn.h"
 #include "common/file_utils.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1828,6 +1829,14 @@ PerformWalRecovery(void)
 				break;
 			}
 
+			/*
+			 * If we replayed an LSN that someone was waiting for, set latches
+			 * in shared memory array to notify the waiter.
+			 */
+			if (waitLSN &&
+				(XLogRecoveryCtl->lastReplayedEndRecPtr >= pg_atomic_read_u64(&waitLSN->minLSN)))
+				WaitLSNSetLatches(XLogRecoveryCtl->lastReplayedEndRecPtr);
+
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
 		} while (record != NULL);
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index fe2bb50f46..5c4f9c78a1 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -414,6 +414,9 @@ CREATE OR REPLACE FUNCTION
   json_populate_recordset(base anyelement, from_json json, use_json_as_text boolean DEFAULT false)
   RETURNS SETOF anyelement LANGUAGE internal STABLE ROWS 100  AS 'json_populate_recordset' PARALLEL SAFE;
 
+CREATE OR REPLACE PROCEDURE pg_wait_lsn(trg_lsn pg_lsn, delay float8 DEFAULT 0)
+  LANGUAGE internal AS 'pg_wait_lsn';
+
 CREATE OR REPLACE FUNCTION pg_logical_slot_get_changes(
     IN slot_name name, IN upto_lsn pg_lsn, IN upto_nchanges int, VARIADIC options text[] DEFAULT '{}',
     OUT lsn pg_lsn, OUT xid xid, OUT data text)
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 48f7348f91..cede90c3b9 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -61,6 +61,7 @@ OBJS = \
 	vacuum.o \
 	vacuumparallel.o \
 	variable.o \
-	view.o
+	view.o \
+	waitlsn.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index 6dd00a4abd..7549be5dc3 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -50,4 +50,5 @@ backend_sources += files(
   'vacuumparallel.c',
   'variable.c',
   'view.c',
+  'waitlsn.c',
 )
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
new file mode 100644
index 0000000000..bd9ea6b7db
--- /dev/null
+++ b/src/backend/commands/waitlsn.c
@@ -0,0 +1,303 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ *	  Implements waiting for the given LSN, which is used in
+ *	  CALL pg_wait_lsn(wait_lsn pg_lsn, timeout int).
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+#include <math.h>
+
+#include "pgstat.h"
+#include "fmgr.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogrecovery.h"
+#include "catalog/pg_type.h"
+#include "commands/waitlsn.h"
+#include "executor/spi.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+#include "utils/timestamp.h"
+#include "utils/fmgrprotos.h"
+
+/* Add to / delete from shared memory array */
+static void addLSNWaiter(XLogRecPtr lsn);
+static void deleteLSNWaiter(void);
+
+struct WaitLSNState *waitLSN = NULL;
+static volatile sig_atomic_t haveShmemItem = false;
+
+/*
+ * Report the amount of shared memory space needed for WaitLSNState
+ */
+Size
+WaitLSNShmemSize(void)
+{
+	Size		size;
+
+	size = offsetof(WaitLSNState, procInfos);
+	size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo)));
+	return size;
+}
+
+/* Initialize the WaitLSNState in the shared memory */
+void
+WaitLSNShmemInit(void)
+{
+	bool		found;
+
+	waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+											   WaitLSNShmemSize(),
+											   &found);
+	if (!found)
+	{
+		SpinLockInit(&waitLSN->mutex);
+		waitLSN->numWaitedProcs = 0;
+		pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+	}
+}
+
+/*
+ * Add the information about the LSN waiter backend to the shared memory
+ * array.
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn)
+{
+	WaitLSNProcInfo cur;
+	int			i;
+
+	SpinLockAcquire(&waitLSN->mutex);
+
+	cur.procnum = MyProcNumber;
+	cur.waitLSN = lsn;
+
+	for (i = 0; i < waitLSN->numWaitedProcs; i++)
+	{
+		if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN)
+		{
+			WaitLSNProcInfo tmp;
+
+			tmp = waitLSN->procInfos[i];
+			waitLSN->procInfos[i] = cur;
+			cur = tmp;
+		}
+	}
+	waitLSN->procInfos[i] = cur;
+	waitLSN->numWaitedProcs++;
+
+	pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
+	SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Delete the information about the LSN waiter backend from the shared memory
+ * array.
+ */
+static void
+deleteLSNWaiter(void)
+{
+	int			i;
+	bool		found = false;
+
+	SpinLockAcquire(&waitLSN->mutex);
+
+	for (i = 0; i < waitLSN->numWaitedProcs; i++)
+	{
+		if (waitLSN->procInfos[i].procnum == MyProcNumber)
+			found = true;
+
+		if (found && i < waitLSN->numWaitedProcs - 1)
+		{
+			waitLSN->procInfos[i] = waitLSN->procInfos[i + 1];
+		}
+	}
+
+	if (!found)
+	{
+		SpinLockRelease(&waitLSN->mutex);
+		return;
+	}
+	waitLSN->numWaitedProcs--;
+
+	if (waitLSN->numWaitedProcs != 0)
+		pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
+	else
+		pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+
+	SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Set all latches in shared memory to signal that new LSN has been replayed
+*/
+void
+WaitLSNSetLatches(XLogRecPtr curLSN)
+{
+	uint32		i,
+				numWakeUpProcs;
+
+	SpinLockAcquire(&waitLSN->mutex);
+
+	/*
+	 * Set latches for process, whose waited LSNs are already replayed.
+	 */
+	for (i = 0; i < waitLSN->numWaitedProcs; i++)
+	{
+		PGPROC	   *backend;
+
+		if (waitLSN->procInfos[i].waitLSN > curLSN)
+			break;
+
+		backend = GetPGProcByNumber(waitLSN->procInfos[i].procnum);
+		SetLatch(&backend->procLatch);
+	}
+
+	/*
+	 * Immediately remove those processes from the shmem array.  Otherwise,
+	 * shmem array items will be here till corresponding processes wake up and
+	 * delete themselves.
+	 */
+	numWakeUpProcs = i;
+	for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++)
+		waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs];
+	waitLSN->numWaitedProcs -= numWakeUpProcs;
+
+	if (waitLSN->numWaitedProcs != 0)
+		pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN);
+	else
+		pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);
+
+	SpinLockRelease(&waitLSN->mutex);
+}
+
+/*
+ * Delete our item from shmem array if any.
+ */
+void
+WaitLSNCleanup(void)
+{
+	if (haveShmemItem)
+		deleteLSNWaiter();
+}
+
+/*
+ * Wait using MyLatch to wait till the given LSN is replayed, the postmaster dies or
+ * timeout happens.
+ */
+void
+WaitForLSN(XLogRecPtr lsn, float8 sec)
+{
+	XLogRecPtr	curLSN;
+	int			latch_events;
+	TimestampTz endtime;
+
+	/* Shouldn't be called when shmem isn't initialized */
+	Assert(waitLSN);
+
+	/* Should be only called by a backend */
+	Assert(MyBackendType == B_BACKEND);
+
+	if (!RecoveryInProgress())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("recovery is not in progress"),
+				 errhint("Waiting for LSN can only be executed during recovery.")));
+
+	endtime = GetCurrentTimestamp() + (int64)(sec * 1000 * 1000);
+
+	latch_events = WL_TIMEOUT | WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
+	addLSNWaiter(lsn);
+	haveShmemItem = true;
+
+	for (;;)
+	{
+		int			rc;
+		long		delay_ms;
+
+		/* Check if the waited LSN has been replayed */
+		curLSN = GetXLogReplayRecPtr(NULL);
+		if (lsn <= curLSN)
+			break;
+
+		if (sec >= 1)
+			delay_ms = (endtime - GetCurrentTimestamp()) / 1000;
+		else
+			/* If no timeout is set then wake up in 1 minute for interrupts */
+			delay_ms = 60000;
+
+		if (delay_ms <= 0)
+			break;
+
+		/*
+		 * If received an interruption from CHECK_FOR_INTERRUPTS, then delete
+		 * the current event from array.
+		 */
+		CHECK_FOR_INTERRUPTS();
+
+		/* If postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		rc = WaitLatch(MyLatch, latch_events, delay_ms,
+					   WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	if (lsn > curLSN)
+	{
+		deleteLSNWaiter();
+		haveShmemItem = false;
+		ereport(ERROR,
+				(errcode(ERRCODE_QUERY_CANCELED),
+				 errmsg("canceling waiting for LSN due to timeout")));
+	}
+	else
+	{
+		haveShmemItem = false;
+	}
+}
+
+Datum
+pg_wait_lsn(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	trg_lsn = PG_GETARG_LSN(0);
+	float8		delay = PG_GETARG_FLOAT8(1);
+	CallContext *context = (CallContext *) fcinfo->context;
+
+	if (context->atomic)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("pg_wait_lsn() must be only called in non-atomic context")));
+
+	if (ActiveSnapshotSet())
+		PopActiveSnapshot();
+	Assert(!ActiveSnapshotSet());
+
+	(void) WaitForLSN(trg_lsn, delay);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 521ed5418c..5aed90c935 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -25,6 +25,7 @@
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "commands/async.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -152,6 +153,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, WaitEventExtensionShmemSize());
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
+	size = add_size(size, WaitLSNShmemSize());
 #ifdef EXEC_BACKEND
 	size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -244,6 +246,11 @@ CreateSharedMemoryAndSemaphores(void)
 	/* Initialize subsystems */
 	CreateOrAttachShmemStructs();
 
+	/*
+	 * Init array of Latches in shared memory for wait lsn
+	 */
+	WaitLSNShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 162b1f919d..4b830dc3c8 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xlogutils.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -862,6 +863,11 @@ ProcKill(int code, Datum arg)
 	 */
 	LWLockReleaseAll();
 
+	/*
+	 * Cleanup waiting for LSN if any.
+	 */
+	WaitLSNCleanup();
+
 	/* Cancel any pending condition variable sleep, too */
 	ConditionVariableCancelSleep();
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 177d81a891..0d9c012e52 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12135,6 +12135,11 @@
   prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
   prosrc => 'brin_minmax_multi_summary_send' },
 
+{ oid => '16387', descr => 'wait for LSN until timeout',
+  proname => 'pg_wait_lsn', prokind => 'p', prorettype => 'void',
+  proargtypes => 'pg_lsn float8', proargnames => '{trg_lsn,delay}',
+  prosrc => 'pg_wait_lsn' },
+
 { oid => '6291', descr => 'arbitrary value from among input values',
   proname => 'any_value', prokind => 'a', proisstrict => 'f',
   prorettype => 'anyelement', proargtypes => 'anyelement',
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
new file mode 100644
index 0000000000..53ab9b7b6b
--- /dev/null
+++ b/src/include/commands/waitlsn.h
@@ -0,0 +1,43 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ *	  Declarations for LSN waiting routines.
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_LSN_H
+#define WAIT_LSN_H
+
+#include "postgres.h"
+#include "port/atomics.h"
+#include "storage/spin.h"
+#include "tcop/dest.h"
+
+/* Shared memory structure */
+typedef struct WaitLSNProcInfo
+{
+	int			procnum;
+	XLogRecPtr	waitLSN;
+}			WaitLSNProcInfo;
+
+typedef struct WaitLSNState
+{
+	pg_atomic_uint64 minLSN;
+	slock_t		mutex;
+	int			numWaitedProcs;
+	WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
+}			WaitLSNState;
+
+extern PGDLLIMPORT struct WaitLSNState *waitLSN;
+
+extern void WaitForLSN(XLogRecPtr lsn, float8 sec);
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNSetLatches(XLogRecPtr curLSN);
+extern void WaitLSNCleanup(void);
+
+#endif							/* WAIT_LSN_H */
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec..bc47c93902 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -51,6 +51,7 @@ tests += {
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
+      't/043_wait_lsn.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/043_wait_lsn.pl b/src/test/recovery/t/043_wait_lsn.pl
new file mode 100644
index 0000000000..1875702c93
--- /dev/null
+++ b/src/test/recovery/t/043_wait_lsn.pl
@@ -0,0 +1,77 @@
+# Checks waiting for lsn on standby pg_wait_lsn()
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 3;
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+$node_standby->append_conf(
+	'postgresql.conf', qq[
+	recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+
+# Make sure that pg_wait_lsn() works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that pg_wait_lsn() is
+# able to setup an infinite waiting loop and exit it if given no wait timeout.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $output = $node_standby->safe_psql(
+	'postgres', qq[
+	CALL pg_wait_lsn('${lsn1}', 1000000);
+	SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
+]);
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+ok($output eq 0, "standby reached the same LSN as primary pg_wait_lsn()");
+
+my $lsn2 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn() + 1");
+my $stderr;
+$node_standby->safe_psql('postgres', "CALL pg_wait_lsn('${lsn1}', 1);");
+$node_standby->psql(
+	'postgres',
+	"CALL pg_wait_lsn('${lsn2}', 1);",
+	stderr => \$stderr);
+ok( $stderr =~ /canceling waiting for LSN due to timeout/,
+	"get timeout on waiting for unreachable LSN");
+
+# Make sure that pg_wait_lsn() works: add new content to primary and memorize
+# primary's new LSN, then wait for primary's LSN on standby. Prove that pg_wait_lsn() is
+# able to setup an infinite waiting loop and exit it if current LSN replayed.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn3 =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$output = $node_standby->safe_psql(
+	'postgres', qq[
+	CALL pg_wait_lsn('${lsn3}');
+	SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn3}'::pg_lsn);
+]);
+
+# Get the current LSN on standby and make sure it's the same as primary's LSN
+ok($output eq 0, "standby reached the same LSN as primary");
+
+$node_standby->stop;
+$node_primary->stop;
+done_testing();

Reply via email to