From 47f0fb869551667ccb2de3e9155a6adba9c11fa1 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Tue, 9 Sep 2025 11:03:36 +0800
Subject: [PATCH v72] Allow conflict-relevant data retention to resume

This commit enables automatic recovery of conflict-relevant data retention for a
subscription. If the retention duration for a subscription previously exceeded
the max_retention_duration and caused retention to stop, the retention can
resume once the duration falls within the acceptable limits.
---
 doc/src/sgml/ref/create_subscription.sgml  |   9 +-
 src/backend/replication/logical/launcher.c |  49 +++-
 src/backend/replication/logical/worker.c   | 262 ++++++++++++++++++---
 src/include/replication/worker_internal.h  |   6 +
 src/test/subscription/t/035_conflicts.pl   |  27 +++
 5 files changed, 302 insertions(+), 51 deletions(-)

diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index fc314437311..ed82cf1809e 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -538,10 +538,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           <literal>retain_dead_tuples</literal> is enabled, confirm that the
           retention duration has exceeded the
           <literal>max_retention_duration</literal> set within the corresponding
-          subscription. The retention will not be automatically resumed unless a
-          new subscription is created with <literal>retain_dead_tuples =
-          true</literal>, or the user manually re-enables
-          <literal>retain_dead_tuples</literal>.
+          subscription. The retention will automatically resume when at least one
+          apply worker confirms that the retention duration is within the
+          specified limit, or when a new subscription is created with
+          <literal>retain_dead_tuples = true</literal>. Alternatively, retention
+          can be manually resumed by re-enabling <literal>retain_dead_tuples</literal>.
          </para>
          <para>
           Note that overall retention will not stop if other subscriptions that
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index add2e2e066c..7bccc2ee796 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -101,7 +101,9 @@ static int	logicalrep_pa_worker_count(Oid subid);
 static void logicalrep_launcher_attach_dshmem(void);
 static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
-static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
+static void compute_min_nonremovable_xid(LogicalRepWorker *worker,
+										 bool can_update_xmin,
+										 TransactionId *xmin);
 static bool acquire_conflict_slot_if_exists(void);
 static void update_conflict_slot_xmin(TransactionId new_xmin);
 static void init_conflict_slot_xmin(void);
@@ -468,6 +470,7 @@ retry:
 	worker->oldest_nonremovable_xid = retain_dead_tuples
 		? MyReplicationSlot->data.xmin
 		: InvalidTransactionId;
+	worker->wait_for_initial_xid = false;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -1270,10 +1273,8 @@ ApplyLauncherMain(Datum main_arg)
 				 * required for conflict detection among all running apply
 				 * workers.
 				 */
-				if (sub->retaindeadtuples &&
-					sub->retentionactive &&
-					can_update_xmin)
-					compute_min_nonremovable_xid(w, &xmin);
+				if (sub->retaindeadtuples && sub->retentionactive)
+					compute_min_nonremovable_xid(w, can_update_xmin, &xmin);
 
 				/* worker is running already */
 				continue;
@@ -1382,11 +1383,16 @@ ApplyLauncherMain(Datum main_arg)
  * Determine the minimum non-removable transaction ID across all apply workers
  * for subscriptions that have retain_dead_tuples enabled. Store the result
  * in *xmin.
+ *
+ * Additionally, if an apply worker has an invalid XID and is requesting to
+ * resume retention, assign the slot's xmin value to it.
  */
 static void
-compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
+compute_min_nonremovable_xid(LogicalRepWorker *worker, bool can_update_xmin,
+							 TransactionId *xmin)
 {
 	TransactionId nonremovable_xid;
+	bool		wait_for_xid;
 
 	Assert(worker != NULL);
 
@@ -1398,16 +1404,43 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
 
 	SpinLockAcquire(&worker->relmutex);
 	nonremovable_xid = worker->oldest_nonremovable_xid;
+	wait_for_xid = worker->wait_for_initial_xid;
 	SpinLockRelease(&worker->relmutex);
 
 	/*
-	 * Return if the apply worker has stopped retention concurrently.
+	 * Assign slot.xmin to the apply worker's oldest_nonremovable_xid if
+	 * requested. This ensures the apply worker continues to maintain the
+	 * oldest_nonremovable_xid (see resume_conflict_info_retention).
+	 */
+	if (wait_for_xid)
+	{
+		Assert(!TransactionIdIsValid(nonremovable_xid) &&
+			   TransactionIdIsValid(MyReplicationSlot->data.xmin));
+
+		nonremovable_xid = MyReplicationSlot->data.xmin;
+
+		SpinLockAcquire(&worker->relmutex);
+		worker->oldest_nonremovable_xid = nonremovable_xid;
+		SpinLockRelease(&worker->relmutex);
+
+		/* Notify the apply worker to start the next cycle of management */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		logicalrep_worker_wakeup_ptr(worker);
+		LWLockRelease(LogicalRepWorkerLock);
+	}
+
+	/*
+	 * Return if the apply worker has stopped retention concurrently and has
+	 * not yet resumed.
 	 *
 	 * Although this function is invoked only when retentionactive is true,
 	 * the apply worker might stop retention after the launcher fetches the
 	 * retentionactive flag.
 	 */
-	if (!TransactionIdIsValid(nonremovable_xid))
+	else if (!TransactionIdIsValid(nonremovable_xid))
+		return;
+
+	if (!can_update_xmin)
 		return;
 
 	if (!TransactionIdIsValid(*xmin) ||
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ee6ac22329f..c5e9b1bd4a1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -181,6 +181,19 @@
  *   pg_subscription.subretentionactive is updated to false within a new
  *   transaction, and oldest_nonremovable_xid is set to InvalidTransactionId.
  *
+ * - RDT_RESUME_CONFLICT_INFO_RETENTION:
+ *   This phase is required only when max_retention_duration is defined. We
+ *   enter this phase if the retention was previously stopped, and the time
+ *   required to advance the non-removable transaction ID in the
+ *   RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
+ *   (or if max_retention_duration is set to 0). During this phase,
+ *   pg_subscription.subretentionactive is updated to true within a new
+ *   transaction, and we wait for the launcher to initialize the
+ *   oldest_nonremovable_xid before proceeding to RDT_GET_CANDIDATE_XID phase.
+ *   Note that the state could transition to RDT_RESUME_CONFLICT_INFO_RETENTION
+ *   at any phase if the retention has been stopped, but max_retention_duration
+ *   is now set to 0.
+ *
  * The overall state progression is: GET_CANDIDATE_XID ->
  * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
  * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
@@ -381,7 +394,8 @@ typedef enum
 	RDT_REQUEST_PUBLISHER_STATUS,
 	RDT_WAIT_FOR_PUBLISHER_STATUS,
 	RDT_WAIT_FOR_LOCAL_FLUSH,
-	RDT_STOP_CONFLICT_INFO_RETENTION
+	RDT_STOP_CONFLICT_INFO_RETENTION,
+	RDT_RESUME_CONFLICT_INFO_RETENTION,
 } RetainDeadTuplesPhase;
 
 /*
@@ -568,6 +582,10 @@ static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
 static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
 static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
 static void stop_conflict_info_retention(RetainDeadTuplesData *rdt_data);
+static bool should_resume_retention_immediately(RetainDeadTuplesData *rdt_data,
+												bool status_received);
+static void resume_conflict_info_retention(RetainDeadTuplesData *rdt_data);
+static bool update_retention_status(bool active);
 static void reset_retention_data_fields(RetainDeadTuplesData *rdt_data);
 static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
 										bool new_xid_found);
@@ -4345,6 +4363,13 @@ maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
 	if (!can_advance_nonremovable_xid(rdt_data))
 		return;
 
+	/*
+	 * Resume retention immediately if required. (See
+	 * should_resume_retention_immediately() for details).
+	 */
+	if (should_resume_retention_immediately(rdt_data, status_received))
+		rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
+
 	process_rdt_phase_transition(rdt_data, status_received);
 }
 
@@ -4367,10 +4392,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
 	if (!MySubscription->retaindeadtuples)
 		return false;
 
-	/* No need to advance if we have already stopped retaining */
-	if (!MySubscription->retentionactive)
-		return false;
-
 	return true;
 }
 
@@ -4399,6 +4420,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
 		case RDT_STOP_CONFLICT_INFO_RETENTION:
 			stop_conflict_info_retention(rdt_data);
 			break;
+		case RDT_RESUME_CONFLICT_INFO_RETENTION:
+			resume_conflict_info_retention(rdt_data);
+			break;
 	}
 }
 
@@ -4672,6 +4696,22 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
 	if (last_flushpos < rdt_data->remote_lsn)
 		return;
 
+	/*
+	 * Reaching this point implies should_stop_conflict_info_retention()
+	 * returned false earlier, indicating that the most recent duration for
+	 * advancing the non-removable transaction ID is within the
+	 * max_retention_duration.
+	 *
+	 * Therefore, if conflict info retention was previously halted due to a
+	 * timeout, proceed to resume retention now.
+	 */
+	if (!MySubscription->retentionactive)
+	{
+		rdt_data->phase = RDT_RESUME_CONFLICT_INFO_RETENTION;
+		process_rdt_phase_transition(rdt_data, false);
+		return;
+	}
+
 	/*
 	 * Reaching here means the remote WAL position has been received, and all
 	 * transactions up to that position on the publisher have been applied and
@@ -4701,10 +4741,6 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
  * If retention should be stopped, transition to the
  * RDT_STOP_CONFLICT_INFO_RETENTION phase and return true. Otherwise, return
  * false.
- *
- * Note: Retention won't be resumed automatically. The user must manually
- * disable retain_dead_tuples and re-enable it after confirming that the
- * replication slot maintained by the launcher has been dropped.
  */
 static bool
 should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
@@ -4735,10 +4771,20 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
 									rdt_data->table_sync_wait_time))
 		return false;
 
-	rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
+	/* Stop retention if not yet */
+	if (MySubscription->retentionactive)
+	{
+		rdt_data->phase = RDT_STOP_CONFLICT_INFO_RETENTION;
+		process_rdt_phase_transition(rdt_data, false);
+		return true;
+	}
 
-	/* process the next phase */
-	process_rdt_phase_transition(rdt_data, false);
+	/*
+	 * If retention has been stopped, reset to the initial phase to retry all
+	 * phases. This is required to recalculate the current wait time and
+	 * resume retention if the time falls within max_retention_duration.
+	 */
+	reset_retention_data_fields(rdt_data);
 
 	return true;
 }
@@ -4748,6 +4794,146 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
  */
 static void
 stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
+{
+	/*
+	 * Return if unable to update subretentionactive (see
+	 * update_retention_status).
+	 */
+	if (!update_retention_status(false))
+		return;
+
+	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+	MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
+	SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+	ereport(LOG,
+			errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
+				   MySubscription->name),
+			errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.",
+					  MySubscription->maxretention));
+
+	reset_retention_data_fields(rdt_data);
+}
+
+/*
+ * Check whether retention should be resumed immediately if it has been
+ * previously stopped, but max_retention_duration is now set to 0.
+ */
+static bool
+should_resume_retention_immediately(RetainDeadTuplesData *rdt_data, bool status_received)
+{
+	/* Return false if retention is already being resumed */
+	if (rdt_data->phase == RDT_RESUME_CONFLICT_INFO_RETENTION)
+		return false;
+
+	/* Return false if max_retention_duration is not 0 */
+	if (MySubscription->maxretention)
+		return false;
+
+	/*
+	 * Do not resume when waiting for publisher status, as doing so may result
+	 * in the message being processed after the data and phase have been
+	 * reset, potentially causing it to be mistakenly identified as a new
+	 * message. This could lead to the premature advancement of
+	 * oldest_nonremovable_xid.
+	 */
+	if (rdt_data->phase == RDT_WAIT_FOR_PUBLISHER_STATUS &&
+		!status_received)
+		return false;
+
+	/*
+	 * Resume retention if we are in the process of stopping or have already
+	 * stopped retention.
+	 */
+	return rdt_data->phase == RDT_STOP_CONFLICT_INFO_RETENTION ||
+		!MySubscription->retentionactive;
+}
+
+/*
+ * Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase.
+ */
+static void
+resume_conflict_info_retention(RetainDeadTuplesData *rdt_data)
+{
+	TransactionId nonremovable_xid;
+
+	/* Update the pg_subscription.retentionactive if not yet */
+	if (!MySubscription->retentionactive)
+	{
+		/*
+		 * Return if unable to update subretentionactive (see
+		 * update_retention_status).
+		 */
+		if (!update_retention_status(true))
+			return;
+
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+		MyLogicalRepWorker->wait_for_initial_xid = true;
+		SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+		ereport(LOG,
+				errmsg("logical replication worker for subscription \"%s\" will resume retaining the information for detecting conflicts",
+					   MySubscription->name),
+				MySubscription->maxretention
+				? errdetail("Retention of information used for conflict detection is now within the max_retention_duration of %u ms.",
+							MySubscription->maxretention)
+				: errdetail("Retention of information used for conflict detection is now indefinite."));
+	}
+
+	Assert(MyLogicalRepWorker->wait_for_initial_xid);
+
+	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+	nonremovable_xid = MyLogicalRepWorker->oldest_nonremovable_xid;
+	SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+	/*
+	 * Return if the launcher has not initialized oldest_nonremovable_xid.
+	 *
+	 * It might seem feasible to directly check the conflict detection
+	 * slot.xmin instead of relying on the launcher to assign the worker's
+	 * oldest_nonremovable_xid; however, that could lead to a race condition
+	 * where slot.xmin is set to InvalidTransactionId immediately after the
+	 * check. In such cases, oldest_nonremovable_xid would no longer be
+	 * protected by a replication slot and could become unreliable if a
+	 * wraparound occurs.
+	 *
+	 * XXX An alternative could be directly restarting the worker to ensure
+	 * the launcher initializes oldest_nonremovable_xid prior to starting.
+	 * However, restarting may not be preferable if initialization can be
+	 * managed on-the-fly.
+	 */
+	if (!TransactionIdIsValid(nonremovable_xid))
+		return;
+
+	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+	MyLogicalRepWorker->wait_for_initial_xid = false;
+	SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+	/*
+	 * Proceed to the next phase if either the launcher has initialized
+	 * slot.xmin and assigned it to oldest_nonremovable_xid, or retention has
+	 * not been stopped yet. The latter situation arises when transitioning
+	 * from the RDT_STOP_CONFLICT_INFO_RETENTION phase but subretentionactive
+	 * has not been updated due to the inability to start a new transaction
+	 * (see stop_conflict_info_retention).
+	 */
+	Assert(MySubscription->retentionactive);
+
+	reset_retention_data_fields(rdt_data);
+
+	/* process the next phase */
+	process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Update pg_subscription.subretentionactive to the given value within a new
+ * transaction.
+ *
+ * Returns true upon successful update; however, if currently within an active
+ * transaction, skip the update and return false.
+ */
+static bool
+update_retention_status(bool active)
 {
 	/*
 	 * Do not update the catalog during an active transaction. The transaction
@@ -4755,7 +4941,7 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
 	 * rollback of catalog updates if the application fails subsequently.
 	 */
 	if (IsTransactionState())
-		return;
+		return false;
 
 	StartTransactionCommand();
 
@@ -4765,26 +4951,18 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
 	 */
 	PushActiveSnapshot(GetTransactionSnapshot());
 
-	/* Set pg_subscription.subretentionactive to false */
-	UpdateDeadTupleRetentionStatus(MySubscription->oid, false);
+	/* Update pg_subscription.subretentionactive */
+	UpdateDeadTupleRetentionStatus(MySubscription->oid, active);
 
 	PopActiveSnapshot();
 	CommitTransactionCommand();
 
-	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-	MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
-	SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
-	ereport(LOG,
-			errmsg("logical replication worker for subscription \"%s\" has stopped retaining the information for detecting conflicts",
-				   MySubscription->name),
-			errdetail("Retention of information used for conflict detection has exceeded max_retention_duration of %u ms.",
-					  MySubscription->maxretention));
-
-	/* Notify launcher to update the conflict slot */
+	/* Notify launcher to update the xmin of the conflict slot */
 	ApplyLauncherWakeup();
 
-	reset_retention_data_fields(rdt_data);
+	MySubscription->retentionactive = active;
+
+	return true;
 }
 
 /*
@@ -4809,19 +4987,20 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
 /*
  * Adjust the interval for advancing non-removable transaction IDs.
  *
- * If there is no activity on the node, we progressively double the interval
- * used to advance non-removable transaction ID. This helps conserve CPU
- * and network resources when there's little benefit to frequent updates.
+ * If there is no activity on the node or retention has been stopped, we
+ * progressively double the interval used to advance non-removable transaction
+ * ID. This helps conserve CPU and network resources when there's little benefit
+ * to frequent updates.
  *
  * The interval is capped by the lowest of the following:
  * - wal_receiver_status_interval (if set),
  * - a default maximum of 3 minutes,
- * - max_retention_duration.
+ * - max_retention_duration (if retention is active).
  *
- * This ensures the interval never exceeds the retention boundary, even if
- * other limits are higher. Once activity resumes on the node, the interval
- * is reset to lesser of 100ms and max_retention_duration, allowing timely
- * advancement of non-removable transaction ID.
+ * This ensures the interval never exceeds the retention boundary, even if other
+ * limits are higher. Once activity resumes on the node and the retention is
+ * active, the interval is reset to lesser of 100ms and max_retention_duration,
+ * allowing timely advancement of non-removable transaction ID.
  *
  * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
  * consider the other interval or a separate GUC if the need arises.
@@ -4829,7 +5008,8 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
 static void
 adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
 {
-	if (!new_xid_found && rdt_data->xid_advance_interval)
+	if (rdt_data->xid_advance_interval &&
+		(!new_xid_found || !MySubscription->retentionactive))
 	{
 		int			max_interval = wal_receiver_status_interval
 			? wal_receiver_status_interval * 1000
@@ -4851,9 +5031,13 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
 		rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
 	}
 
-	/* Ensure the wait time remains within the maximum limit */
-	rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
-										 MySubscription->maxretention);
+	/*
+	 * Ensure the wait time remains within the maximum limit when retention is
+	 * active.
+	 */
+	if (MySubscription->retentionactive)
+		rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval,
+											 MySubscription->maxretention);
 }
 
 /*
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index de003802612..d776949a04e 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -100,6 +100,12 @@ typedef struct LogicalRepWorker
 	 */
 	TransactionId oldest_nonremovable_xid;
 
+	/*
+	 * Indicates whether the apply worker is resuming retention and is waiting
+	 * for the launcher to initialize oldest_nonremovable_xid.
+	 */
+	bool		wait_for_initial_xid;
+
 	/* Stats. */
 	XLogRecPtr	last_lsn;
 	TimestampTz last_send_time;
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index db0d5b464e8..947ea131c4d 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -628,6 +628,33 @@ $node_B->safe_psql('postgres',
 $node_B->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
 $node_B->reload;
 
+###############################################################################
+# Check that dead tuple retention resumes when the max_retention_duration is set
+# 0.
+###############################################################################
+
+$log_offset = -s $node_A->logfile;
+
+# Set max_retention_duration to 0
+$node_A->safe_psql('postgres',
+	"ALTER SUBSCRIPTION $subname_AB SET (max_retention_duration = 0);");
+
+# Confirm that the retention resumes
+$node_A->wait_for_log(
+	qr/logical replication worker for subscription "tap_sub_a_b" will resume retaining the information for detecting conflicts
+.*DETAIL:.* Retention of information used for conflict detection is now indefinite.*/,
+	$log_offset);
+
+ok( $node_A->poll_query_until(
+		'postgres',
+		"SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+	),
+	"the xmin value of slot 'pg_conflict_detection' is valid on Node A");
+
+$result = $node_A->safe_psql('postgres',
+	"SELECT subretentionactive FROM pg_subscription WHERE subname='$subname_AB';");
+is($result, qq(t), 'retention is active');
+
 ###############################################################################
 # Check that the replication slot pg_conflict_detection is dropped after
 # removing all the subscriptions.
-- 
2.51.0.windows.1

