From c4f254488222c811712d0005365cb116dea3268b Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Wed, 23 Jul 2025 14:41:37 +0800
Subject: [PATCH v52 3/3] Re-create the replication slot if the conflict
 retention duration is reduced

This patch allows the launcher to drop and re-create the invalidated conflict
detection slot once at least one apply worker has confirmed that the retention
duration is back within max_conflict_retention_duration.
---
 doc/src/sgml/config.sgml                   |  5 +-
 src/backend/replication/logical/launcher.c | 15 ++++-
 src/backend/replication/logical/worker.c   | 64 +++++++++++++++-------
 src/backend/utils/adt/pg_upgrade_support.c |  2 +-
 src/include/replication/logicallauncher.h  |  2 +-
 5 files changed, 63 insertions(+), 25 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 0d6616857e7..e616381e44a 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5425,7 +5425,10 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
         <literal>max_conflict_retention_duration</literal>. If the replication
         slot is invalidated, you can disable
         <literal>retain_dead_tuples</literal> and re-enable it after
-        confirming this replication slot has been dropped.
+        confirming this replication slot has been dropped. Alternatively, the
+        invalidated slot will be automatically dropped and re-created once the
+        apply worker confirms that the retention duration is within the
+        specified limit.
        </para>
        <para>
         This option is effective only if a subscription with
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index c0e785b1a8c..50d52cbe77f 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1252,7 +1252,7 @@ ApplyLauncherMain(Datum main_arg)
 				 * applying remote changes that occurred before the
 				 * subscription was enabled.
 				 */
-				CreateConflictDetectionSlot();
+				CreateConflictDetectionSlot(!sub->enabled);
 			}
 
 			if (!sub->enabled)
@@ -1289,6 +1289,12 @@ ApplyLauncherMain(Datum main_arg)
 			{
 				can_advance_xmin = false;
 				stop_retention = false;
+
+				/*
+				 * Re-create the slot if it has been invalidated, as retention
+				 * has now resumed.
+				 */
+				CreateConflictDetectionSlot(true);
 			}
 
 			/*
@@ -1515,10 +1521,15 @@ invalidate_conflict_slot(void)
  * conflict detection, if not yet.
  */
 void
-CreateConflictDetectionSlot(void)
+CreateConflictDetectionSlot(bool recreate_if_invalid)
 {
 	TransactionId xmin_horizon;
 
+	/* Drop the invalidated slot and re-create it if requested */
+	if (recreate_if_invalid && MyReplicationSlot &&
+		MyReplicationSlot->data.invalidated == RS_INVAL_CONFLICT_RETENTION_DURATION)
+		ReplicationSlotDropAcquired();
+
 	/* Exit early, if the replication slot is already created and acquired */
 	if (MyReplicationSlot)
 		return;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cc0309a6f3c..f5a9b046f05 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -586,6 +586,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
 									   LogicalRepTupleData *newtup,
 									   CmdType operation);
 
+static void apply_worker_exit(void);
+
 /* Functions for skipping changes */
 static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
 static void stop_skipping_changes(void);
@@ -4210,10 +4212,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
 	if (!MySubscription->retaindeadtuples)
 		return false;
 
-	/* No need to advance if we have already stopped retaining */
-	if (MyLogicalRepWorker->stop_conflict_info_retention)
-		return false;
-
 	return true;
 }
 
@@ -4495,6 +4493,25 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
 	if (last_flushpos < rdt_data->remote_lsn)
 		return;
 
+	/*
+	 * If conflict info retention was previously stopped due to a timeout, and
+	 * the time required to advance the non-removable transaction ID has now
+	 * decreased to within acceptable limits, log a message and exit. This
+	 * allows the launcher to recreate the replication slot prior to
+	 * restarting the worker.
+	 */
+	if (MyLogicalRepWorker->stop_conflict_info_retention)
+	{
+		ereport(LOG,
+				errmsg("logical replication worker for subscription \"%s\" will restart to resume retaining conflict information",
+					   MySubscription->name),
+				errdetail("The time spent applying changes up to LSN %X/%X is now within the maximum limit of %u ms.",
+						  LSN_FORMAT_ARGS(rdt_data->remote_lsn),
+						  max_conflict_retention_duration));
+
+		apply_worker_exit();
+	}
+
 	/*
 	 * Reaching here means the remote WAL position has been received, and all
 	 * transactions up to that position on the publisher have been applied and
@@ -4502,6 +4519,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
 	 */
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 	MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
+	MyLogicalRepWorker->stop_conflict_info_retention = false;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
 	elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u",
@@ -4544,9 +4562,8 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
  * LogicalRepWorker->stop_conflict_info_retention to true, notify the launcher to
  * invalidate the slot, and return true. Return false otherwise.
  *
- * Currently, the retention will not resume automatically unless user manually
- * disables retain_dead_tuples and re-enables it after confirming that the
- * replication slot has been dropped.
+ * The retention will resume automatically if the worker has confirmed that the
+ * retention duration is now within the max_conflict_retention_duration.
  */
 static bool
 should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
@@ -4576,19 +4593,26 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
 									rdt_data->table_sync_wait_time))
 		return false;
 
-	ereport(LOG,
-			errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information",
-				   MySubscription->name),
-			errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.",
-					  max_conflict_retention_duration));
-
-	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-	MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
-	MyLogicalRepWorker->stop_conflict_info_retention = true;
-	SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
-	/* Notify launcher to invalidate the conflict slot */
-	ApplyLauncherWakeup();
+	/*
+	 * Log a message and reset relevant data when the worker is about to stop
+	 * retaining conflict information.
+	 */
+	if (!MyLogicalRepWorker->stop_conflict_info_retention)
+	{
+		ereport(LOG,
+				errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information",
+					   MySubscription->name),
+				errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.",
+						  max_conflict_retention_duration));
+
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+		MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
+		MyLogicalRepWorker->stop_conflict_info_retention = true;
+		SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+		/* Notify launcher to invalidate the conflict slot */
+		ApplyLauncherWakeup();
+	}
 
 	reset_retention_data_fields(rdt_data);
 
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index a4f8b4faa90..e20fc44adda 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -423,7 +423,7 @@ binary_upgrade_create_conflict_detection_slot(PG_FUNCTION_ARGS)
 {
 	CHECK_IS_BINARY_UPGRADE;
 
-	CreateConflictDetectionSlot();
+	CreateConflictDetectionSlot(false);
 
 	ReplicationSlotRelease();
 
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 6e3007db5f0..5052c394c8f 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -29,7 +29,7 @@ extern void ApplyLauncherWakeupAtCommit(void);
 extern void ApplyLauncherWakeup(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
-extern void CreateConflictDetectionSlot(void);
+extern void CreateConflictDetectionSlot(bool recreate_if_invalid);
 
 extern bool IsLogicalLauncher(void);
 
-- 
2.50.1.windows.1

