On Mon, Feb 28, 2022, at 9:18 PM, Euler Taveira wrote:
> Long time, no patch. Here it is. I will provide documentation in the next
> version. I would appreciate some feedback.
This patch has been broken since commit
705e20f8550c0e8e47c0b6b20b5f5ffd6ffd9e33, so I rebased it.

I added documentation that explains how this parameter works. I decided to
rename the parameter from apply_delay to min_apply_delay to use the same
terminology as physical replication. IMO the new name makes it clearer that
there is no guarantee that we are always exactly x ms behind the publisher.
Indeed, because of processing and transfer time, the actual delay might be
higher than the specified interval.
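
As a rough illustration of the "minimum" semantics described above (this is
not code from the patch), the wait can be thought of as the time remaining
until the publisher timestamp plus min_apply_delay. The helper name
remaining_delay_ms and the use of plain millisecond integers are assumptions
made for this sketch; the patch itself works with TimestampTz values.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical helper (not part of the patch). All values are in
 * milliseconds on a common clock. Returns how long the subscriber would
 * still have to wait before applying a transaction.
 */
int64_t
remaining_delay_ms(int64_t publisher_ts_ms, int64_t now_ms,
				   int64_t min_apply_delay_ms)
{
	int64_t		apply_at;

	assert(min_apply_delay_ms >= 0);

	/* Earliest moment at which the transaction may be applied. */
	apply_at = publisher_ts_ms + min_apply_delay_ms;

	/* Already late (decoding/transfer took longer): no extra wait. */
	if (now_ms >= apply_at)
		return 0;

	return apply_at - now_ms;
}
```

If decoding and transfer already took longer than min_apply_delay, the result
is zero, so the transaction ends up more than min_apply_delay behind the
publisher and no additional wait is added.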

I refactored the way the delay is applied. The previous patch covered only
regular transactions; this one also covers prepared transactions. The design
intercepts the transaction at its first change (at the time the worker would
start the transaction to apply the changes) and applies the delay before
actually starting the transaction. The previous patch used
begin_replication_step() as this point. However, to support prepared
transactions I changed the apply_delay() signature to accept a timestamp
parameter (because a different variable, prepare_time, is used to calculate
the delay for prepared transactions). Hence, the apply_delay() call moved to
other places: apply_handle_begin() and apply_handle_begin_prepare().

The new code does not apply the delay in two situations:

* STREAM START: streamed transactions might not have commit_time or
  prepare_time set. I'm afraid it is not possible to use those variables
  because at STREAM START time we don't have a transaction commit time. If the
  protocol provided a timestamp indicating when it started streaming the
  transaction, we could use it to apply the delay. Unfortunately, we don't
  have it. Therefore, this new patch does not apply the delay to streamed
  transactions.
* non-transactional messages: the delay could be applied to non-transactional
  messages too. Such a message is sent independently of the transaction that
  contains it. Since logical replication does not send messages to the
  subscriber, this is not an issue. However, consumers that use pgoutput and
  want to implement a delay will require it.
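
The cases above can be summarized in a small sketch. It is only an
illustration under stated assumptions: the MsgKind enum, the Msg struct and
delay_reference_time() are hypothetical names invented here, not identifiers
from the patch or from the logical replication protocol code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical message kinds, for illustration only. */
typedef enum
{
	MSG_BEGIN,					/* regular transaction */
	MSG_BEGIN_PREPARE,			/* prepared transaction */
	MSG_STREAM_START,			/* streamed transaction */
	MSG_MESSAGE					/* non-transactional message */
} MsgKind;

typedef struct
{
	MsgKind		kind;
	int64_t		commit_time;	/* meaningful for MSG_BEGIN only */
	int64_t		prepare_time;	/* meaningful for MSG_BEGIN_PREPARE only */
} Msg;

/*
 * Returns true and stores the timestamp the delay is measured from, or
 * returns false when no delay is applied for this kind of message.
 */
bool
delay_reference_time(const Msg *msg, int64_t *ts)
{
	assert(msg != NULL && ts != NULL);

	switch (msg->kind)
	{
		case MSG_BEGIN:
			*ts = msg->commit_time;
			return true;
		case MSG_BEGIN_PREPARE:
			*ts = msg->prepare_time;
			return true;
		case MSG_STREAM_START:	/* no timestamp available in the message */
		case MSG_MESSAGE:		/* not sent to the subscriber */
			return false;
	}
	return false;
}
```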

I'm still looking for a way to support streamed transactions without much
surgery into the logical replication protocol.


--
Euler Taveira
EDB   https://www.enterprisedb.com/
From 9ad783c1259aed9bef81877265c648aae84ed5d8 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.tave...@enterprisedb.com>
Date: Sat, 6 Nov 2021 11:31:10 -0300
Subject: [PATCH v2] Time-delayed logical replication subscriber

Similar to physical replication, a time-delayed copy of the data for
logical replication is useful for some scenarios (especially to fix
errors that might cause data loss).

If the subscriber sets the min_apply_delay parameter, the logical
replication worker will delay the transaction commit for min_apply_delay
milliseconds.

The delay is calculated between the WAL time stamp and the current time
on the subscriber.

The delay occurs only on WAL records for transaction begins. Regular and
prepared transactions are covered. Streamed transactions are not
delayed; support for them should be implemented in a future release.

Author: Euler Taveira
Discussion: https://postgr.es/m/CAB-JLwYOYwL=xtyaxkih5ctm_vm8kjkh7aaitckvmch4rzr...@mail.gmail.com
---
 doc/src/sgml/ref/alter_subscription.sgml   |   5 +-
 doc/src/sgml/ref/create_subscription.sgml  |  33 +++++
 src/backend/catalog/pg_subscription.c      |   1 +
 src/backend/catalog/system_views.sql       |   4 +-
 src/backend/commands/subscriptioncmds.c    |  46 ++++++-
 src/backend/replication/logical/worker.c   |  61 +++++++++
 src/backend/utils/adt/timestamp.c          |   8 ++
 src/bin/pg_dump/pg_dump.c                  |  13 +-
 src/bin/pg_dump/pg_dump.h                  |   1 +
 src/bin/psql/describe.c                    |  11 +-
 src/include/catalog/pg_subscription.h      |   2 +
 src/include/utils/timestamp.h              |   2 +
 src/test/regress/expected/subscription.out | 141 +++++++++++++--------
 src/test/regress/sql/subscription.sql      |  20 +++
 src/test/subscription/t/030_apply_delay.pl |  76 +++++++++++
 15 files changed, 358 insertions(+), 66 deletions(-)
 create mode 100644 src/test/subscription/t/030_apply_delay.pl

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 58b78a94ea..b764f8eba8 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -204,8 +204,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       information.  The parameters that can be altered
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
-      <literal>binary</literal>, <literal>streaming</literal>, and
-      <literal>disable_on_error</literal>.
+      <literal>binary</literal>, <literal>streaming</literal>,
+      <literal>disable_on_error</literal>, and
+      <literal>min_apply_delay</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index b701752fc9..2bc8deb132 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -302,6 +302,39 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>min_apply_delay</literal> (<type>integer</type>)</term>
+        <listitem>
+         <para>
+          By default, the subscriber applies changes as soon as possible. Similar
+          to the physical replication feature
+          (<xref linkend="guc-recovery-min-apply-delay"/>), it may be useful to
+          have a time-delayed copy of data for logical replication. This
+          parameter allows you to delay the application of changes by a
+          specified amount of time. If this value is specified without units,
+          it is taken as milliseconds. The default is zero, adding no delay.
+         </para>
+         <para>
+          The delay occurs only after the initial table synchronization. It is
+          possible that the replication delay between publisher and subscriber
+          exceeds the value of this parameter, in which case no delay is added.
+          Note that the delay is calculated between the WAL time stamp as
+          written on the publisher and the current time on the subscriber. Delays
+          in logical decoding and in transferring the transaction may reduce the
+          actual wait time. If the system clocks on publisher and subscriber
+          are not synchronized, this may lead to applying changes earlier than
+          expected. This is not a major issue because typical settings of this
+          parameter are much larger than typical time deviations between
+          servers.
+         </para>
+         <para>
+          The delay occurs only on WAL records for transaction begins. Streamed
+          transactions are not delayed. This should be implemented in a future
+          release.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
      </para>
 
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a6304f5f81..48e6d0af70 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -70,6 +70,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
+	sub->applydelay = subform->subapplydelay;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index bb1ac30cd1..d7c018b81f 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1261,8 +1261,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subtwophasestate, subdisableonerr, subslotname,
-              subsynccommit, subpublications)
+              substream, subtwophasestate, subdisableonerr, subapplydelay,
+              subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3922658bbc..4b9890e6e6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -46,6 +46,7 @@
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
+#include "utils/timestamp.h"
 
 /*
  * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
@@ -62,6 +63,7 @@
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
+#define SUBOPT_MIN_APPLY_DELAY		0x00000800
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -84,6 +86,7 @@ typedef struct SubOpts
 	bool		streaming;
 	bool		twophase;
 	bool		disableonerr;
+	int64		min_apply_delay;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -262,12 +265,35 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
 			opts->disableonerr = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_MIN_APPLY_DELAY) &&
+				 strcmp(defel->defname, "min_apply_delay") == 0)
+		{
+			char	   *val;
+			Interval   *interval;
+
+			if (IsSet(opts->specified_opts, SUBOPT_MIN_APPLY_DELAY))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_MIN_APPLY_DELAY;
+			val = defGetString(defel);
+
+			interval = DatumGetIntervalP(DirectFunctionCall3(interval_in,
+																	CStringGetDatum(val),
+																	ObjectIdGetDatum(InvalidOid),
+																	Int32GetDatum(-1)));
+			opts->min_apply_delay = interval_to_ms(interval);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
 	}
 
+	if (opts->min_apply_delay < 0)
+		ereport(ERROR,
+				errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				errmsg("option \"%s\" must not be negative", "min_apply_delay"));
+
 	/*
 	 * We've been explicitly asked to not connect, that requires some
 	 * additional processing.
@@ -404,7 +430,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_MIN_APPLY_DELAY);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -479,6 +505,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
 	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
+	values[Anum_pg_subscription_subapplydelay - 1] = Int64GetDatum(opts.min_apply_delay);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -935,6 +962,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subdisableonerr - 1]
 						= true;
 				}
+				if (IsSet(opts.specified_opts, SUBOPT_MIN_APPLY_DELAY))
+				{
+					values[Anum_pg_subscription_subapplydelay - 1] =
+						Int64GetDatum(opts.min_apply_delay);
+					replaces[Anum_pg_subscription_subapplydelay - 1] = true;
+				}
 
 				update_tuple = true;
 				break;
@@ -958,6 +991,17 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				if (opts.enabled)
 					ApplyLauncherWakeupAtCommit();
 
+				/*
+				 * If this subscription has been disabled and it has an apply
+				 * delay set, wake up the logical replication worker to finish
+				 * it as soon as possible.
+				 */
+				if (!opts.enabled && sub->applydelay > 0)
+				{
+					elog(DEBUG1, "subscription has been disabled");
+					logicalrep_worker_wakeup(sub->oid, InvalidOid);
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 03e069c7cd..241810c62d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -250,6 +250,7 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 
 Subscription *MySubscription = NULL;
 bool		MySubscriptionValid = false;
+TimestampTz	MySubscriptionMinApplyDelayUntil = 0;
 
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
@@ -307,6 +308,8 @@ static void maybe_reread_subscription(void);
 
 static void DisableSubscriptionAndExit(void);
 
+static void apply_delay(TimestampTz ts);
+
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
@@ -782,6 +785,58 @@ slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot,
 	ExecStoreVirtualTuple(slot);
 }
 
+/*
+ * Apply the configured delay for the transaction.
+ *
+ * A regular transaction uses the commit time to calculate the delay. A
+ * prepared transaction uses the prepare time to calculate the delay (the
+ * commit time is unknown at prepare time).
+ *
+ * FIXME A streamed transaction could be delayed too but there is no timestamp
+ * in the STREAM START protocol message. Hence, if it is a streamed (regular or
+ * prepared) transaction, no delay is applied.
+ */
+static void
+apply_delay(TimestampTz ts)
+{
+	/* nothing to do if no delay set */
+	if (MySubscription->applydelay <= 0)
+		return;
+
+	/* set apply delay */
+	MySubscriptionMinApplyDelayUntil = TimestampTzPlusMilliseconds(TimestampTzGetDatum(ts),
+												MySubscription->applydelay);
+
+	while (true)
+	{
+		int			diffms;
+
+		diffms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), MySubscriptionMinApplyDelayUntil);
+
+		elog(DEBUG2, "logical replication apply delay: %d ms", diffms);
+
+		/*
+		 * Exit without arming the latch if it's already past time to apply
+		 * this transaction.
+		 */
+		if (diffms <= 0)
+			break;
+
+		WaitLatch(MyLatch,
+				  WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+				  diffms,
+				  WAIT_EVENT_RECOVERY_APPLY_DELAY);
+		ResetLatch(MyLatch);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	/*
+	 * Delay applied. Reset state.
+	 */
+	MySubscriptionMinApplyDelayUntil = 0;
+}
+
 /*
  * Handle BEGIN message.
  */
@@ -793,6 +848,9 @@ apply_handle_begin(StringInfo s)
 	logicalrep_read_begin(s, &begin_data);
 	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
+	/* Should we delay the current transaction? */
+	apply_delay(begin_data.committime);
+
 	remote_final_lsn = begin_data.final_lsn;
 
 	in_remote_transaction = true;
@@ -845,6 +903,9 @@ apply_handle_begin_prepare(StringInfo s)
 	logicalrep_read_begin_prepare(s, &begin_data);
 	set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
 
+	/* Should we delay the current prepared transaction? */
+	apply_delay(begin_data.prepare_time);
+
 	remote_final_lsn = begin_data.prepare_lsn;
 
 	in_remote_transaction = true;
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index ae36ff3328..1eda3ed57c 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -2379,6 +2379,14 @@ interval_cmp_internal(Interval *interval1, Interval *interval2)
 	return int128_compare(span1, span2);
 }
 
+int64
+interval_to_ms(const Interval *interval)
+{
+	INT128		span = interval_cmp_value(interval) / 1000;
+
+	return span;
+}
+
 Datum
 interval_eq(PG_FUNCTION_ARGS)
 {
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 725cd2e4eb..c1cc5476cc 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4325,6 +4325,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_subapplydelay;
 	int			i,
 				ntups;
 
@@ -4369,11 +4370,13 @@ getSubscriptions(Archive *fout)
 	if (fout->remoteVersion >= 150000)
 		appendPQExpBufferStr(query,
 							 " s.subtwophasestate,\n"
-							 " s.subdisableonerr\n");
+							 " s.subdisableonerr,\n"
+							 " s.subapplydelay\n");
 	else
 		appendPQExpBuffer(query,
 						  " '%c' AS subtwophasestate,\n"
-						  " false AS subdisableonerr\n",
+						  " false AS subdisableonerr,\n"
+						  " 0 AS subapplydelay\n",
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
 	appendPQExpBufferStr(query,
@@ -4397,6 +4400,7 @@ getSubscriptions(Archive *fout)
 	i_substream = PQfnumber(res, "substream");
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
+	i_subapplydelay = PQfnumber(res, "subapplydelay");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4426,6 +4430,8 @@ getSubscriptions(Archive *fout)
 			pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
+		subinfo[i].subapplydelay =
+			strtoi64(PQgetvalue(res, i, i_subapplydelay), NULL, 10);
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4502,6 +4508,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
+	if (subinfo->subapplydelay > 0)
+		appendPQExpBuffer(query, ", min_apply_delay = '" INT64_FORMAT " ms'", subinfo->subapplydelay);
+
 	appendPQExpBufferStr(query, ");\n");
 
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 772dc0cf7a..15a05227f9 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -659,6 +659,7 @@ typedef struct _SubscriptionInfo
 	char	   *subtwophasestate;
 	char	   *subdisableonerr;
 	char	   *subsynccommit;
+	int64		subapplydelay;
 	char	   *subpublications;
 } SubscriptionInfo;
 
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 991bfc1546..cd387ec62c 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6139,13 +6139,18 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Binary"),
 							  gettext_noop("Streaming"));
 
-		/* Two_phase and disable_on_error are only supported in v15 and higher */
+		/*
+		 * two_phase, disable_on_error and min_apply_delay are only supported
+		 * in v15 and higher.
+		 */
 		if (pset.sversion >= 150000)
 			appendPQExpBuffer(&buf,
 							  ", subtwophasestate AS \"%s\"\n"
-							  ", subdisableonerr AS \"%s\"\n",
+							  ", subdisableonerr AS \"%s\"\n"
+							  ", subapplydelay AS \"%s\"\n",
 							  gettext_noop("Two phase commit"),
-							  gettext_noop("Disable on error"));
+							  gettext_noop("Disable on error"),
+							  gettext_noop("Apply delay"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index e2befaf351..e8840a7d14 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -69,6 +69,7 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		subdisableonerr;	/* True if a worker error should cause the
 									 * subscription to be disabled */
+	int64		subapplydelay;		/* Replication apply delay */
 
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
@@ -109,6 +110,7 @@ typedef struct Subscription
 	bool		disableonerr;	/* Indicates if the subscription should be
 								 * automatically disabled if a worker error
 								 * occurs */
+	int64		applydelay;		/* Replication apply delay */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index c1a74f8e2b..58a6e6b6da 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -78,6 +78,8 @@ extern bool TimestampDifferenceExceeds(TimestampTz start_time,
 									   TimestampTz stop_time,
 									   int msec);
 
+extern int64 interval_to_ms(const Interval *interval);
+
 extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
 extern pg_time_t timestamptz_to_time_t(TimestampTz t);
 
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index ad8003fae1..aa1fdc8a11 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -94,10 +94,10 @@ ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
 \dRs+
-                                                                                   List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2
+                                                                                          List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |           Conninfo           
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+-------------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                |           0 | off                | dbname=regress_doesnotexist2
 (1 row)
 
 BEGIN;
@@ -129,10 +129,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                     List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | local              | dbname=regress_doesnotexist2
+                                                                                            List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |           Conninfo           
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+-------------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                |           0 | local              | dbname=regress_doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
@@ -165,19 +165,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +188,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication already exists
@@ -215,10 +215,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication used more then once
@@ -233,10 +233,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +270,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | off                | dbname=regress_doesnotexist
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -282,10 +282,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +294,10 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -309,18 +309,47 @@ ERROR:  disable_on_error requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                |           0 | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                               List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | off                | dbname=regress_doesnotexist
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                |           0 | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail -- min_apply_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo);
+ERROR:  invalid input syntax for type interval: "foo"
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1);
+ERROR:  option "min_apply_delay" must not be negative
+-- success -- 123 (no unit) is read as seconds, stored as 123000 ms
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                |      123000 | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- success -- interval is converted into ms and stored as integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = '4h 27min 35s');
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+                                                                                      List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Apply delay | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+-------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                |    16055000 | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index a7c15b1daf..9b282b8263 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -243,6 +243,26 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail -- min_apply_delay must be a non-negative integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = foo);
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = -1);
+
+-- success -- 123 (no unit) is read as seconds, stored as 123000 ms
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = 123);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
+-- success -- interval is converted into ms and stored as integer
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, min_apply_delay = '4h 27min 35s');
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/030_apply_delay.pl b/src/test/subscription/t/030_apply_delay.pl
new file mode 100644
index 0000000000..25433c45e9
--- /dev/null
+++ b/src/test/subscription/t/030_apply_delay.pl
@@ -0,0 +1,76 @@
+
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+# Test replication apply delay
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	"log_min_messages = debug2");
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
+);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (min_apply_delay = '2s')"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM test_tab");
+is($result, qq(2|1|2), 'check initial data was copied to subscriber');
+
+# new row to trigger apply delay
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab VALUES (3, 'baz')");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM test_tab");
+is($result, qq(3|1|3), 'check the new row was applied to subscriber');
+
+my $logfile = slurp_file($node_subscriber->logfile());
+ok( $logfile =~
+	  qr/logical replication apply delay/,
+	'check if replication apply delay is triggered');
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();
-- 
2.30.2
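
For reference, the two non-zero "Apply delay" values in the expected output
above follow from plain unit arithmetic. The sketch below is only an
illustration of that arithmetic, not code from the patch: SimpleInterval and
simple_interval_to_ms() are hypothetical names, and the struct is a deliberate
simplification (hours, minutes, seconds only) rather than PostgreSQL's
Interval type.

```c
#include <stdint.h>

/*
 * Hypothetical, simplified interval: hours, minutes and seconds only.
 * This is NOT PostgreSQL's Interval struct and not code from the patch.
 */
typedef struct SimpleInterval
{
	int64_t		hours;
	int64_t		minutes;
	int64_t		seconds;
} SimpleInterval;

/*
 * Convert the simplified interval to milliseconds.  Returns -1 when the
 * total is negative, loosely mirroring the "must not be negative" error
 * exercised by the regression test.
 */
static int64_t
simple_interval_to_ms(SimpleInterval iv)
{
	int64_t		total_secs = (iv.hours * 60 + iv.minutes) * 60 + iv.seconds;

	if (total_secs < 0)
		return -1;
	return total_secs * 1000;
}
```

Under these assumptions, {0, 0, 123} yields 123000 (a bare 123 is treated as
seconds) and {4, 27, 35} yields 16055000, which are the values shown in the
two \dRs+ outputs.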
