From dc6733afbf27c4d1d9baab180e7fe4a60dd88028 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Fri, 30 May 2025 12:15:12 +0800
Subject: [PATCH] Improve initial slot synchronization in
 pg_sync_replication_slots()

During initial slot synchronization on a standby, the operation may fail if
required catalog rows or WAL have been removed or are at risk of removal. The
slotsync worker handles this by creating a temporary slot for the initial sync
and retaining it even in case of failure. It keeps retrying until the slot on
the primary has advanced to a position where all the required data is also
available on the standby. However, pg_sync_replication_slots() had no such
protection mechanism.

The SQL API would fail immediately when synchronization requirements weren't
met. This could lead to permanent failure as the standby might continue
removing the still-required data.

To address this, we now make pg_sync_replication_slots() wait for the primary
slot to advance to a suitable position before completing synchronization and
before removing the temporary slot. We deliberately avoid retaining the
temporary slot as the slotsync worker does, because we cannot predict when (or
if) the SQL function might be executed again, and the creating session might
persist after promotion. Without automatic cleanup, this could lead to
temporary slots being retained for a long time.
---
 src/backend/replication/logical/slotsync.c    | 189 +++++++++++++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 2 files changed, 180 insertions(+), 10 deletions(-)

diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 656e66e0ae0..43f2d6b579f 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -146,6 +146,7 @@ typedef struct RemoteSlot
 	ReplicationSlotInvalidationCause invalidated;
 } RemoteSlot;
 
+static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn);
 static void slotsync_failure_callback(int code, Datum arg);
 static void update_synced_slots_inactive_since(void);
 
@@ -549,6 +550,160 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
 	}
 }
 
+/*
+ * Wait for the remote slot to pass the locally reserved position.
+ *
+ * Polls the primary through wrconn until the remote slot's restart_lsn and
+ * catalog_xmin have caught up with those of MyReplicationSlot. On success,
+ * remote_slot is refreshed with the values last fetched from the primary.
+ *
+ * Returns true if the remote slot caught up. Returns false if the remote slot
+ * was not found or has been invalidated on the primary.
+ */
+static bool
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
+{
+#define WAIT_OUTPUT_COLUMN_COUNT 4
+
+	StringInfoData cmd;
+
+	Assert(!AmLogicalSlotSyncWorkerProcess());
+
+	ereport(LOG,
+			errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin"
+				   " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)",
+				   remote_slot->name,
+				   LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+				   remote_slot->catalog_xmin,
+				   LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+				   MyReplicationSlot->data.catalog_xmin));
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd,
+					 "SELECT invalidation_reason IS NOT NULL, restart_lsn,"
+					 " confirmed_flush_lsn, catalog_xmin"
+					 " FROM pg_catalog.pg_replication_slots"
+					 " WHERE slot_name = %s",
+					 quote_literal_cstr(remote_slot->name));
+
+	for (;;)
+	{
+		bool		new_invalidated;
+		XLogRecPtr	new_restart_lsn;
+		XLogRecPtr	new_confirmed_lsn;
+		TransactionId new_catalog_xmin;
+		WalRcvExecResult *res;
+		TupleTableSlot *tupslot;
+		Datum		d;
+		int			rc;
+		int			col = 0;
+		bool		isnull;
+		Oid			slotRow[WAIT_OUTPUT_COLUMN_COUNT] = {BOOLOID, LSNOID, LSNOID, XIDOID};
+
+		/* Handle any pending termination request */
+		ProcessSlotSyncInterrupts(wrconn);
+
+		res = walrcv_exec(wrconn, cmd.data, WAIT_OUTPUT_COLUMN_COUNT, slotRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					errmsg("could not fetch slot \"%s\" info from the"
+						   " primary server: %s",
+						   remote_slot->name, res->err));
+
+		tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
+		{
+			ereport(WARNING,
+					errmsg("aborting initial sync for slot \"%s\"",
+						   remote_slot->name),
+					errdetail("This slot was not found on the primary server."));
+
+			pfree(cmd.data);
+			ExecDropSingleTupleTableSlot(tupslot);
+			walrcv_clear_result(res);
+
+			return false;
+		}
+
+		/*
+		 * It is possible to get null value for restart_lsn if the slot is
+		 * invalidated on the primary server, so handle accordingly.
+		 */
+		new_invalidated = DatumGetBool(slot_getattr(tupslot, ++col, &isnull));
+		Assert(!isnull);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		new_restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
+		if (new_invalidated || XLogRecPtrIsInvalid(new_restart_lsn))
+		{
+			/*
+			 * The slot won't be persisted by the caller; it will be cleaned up
+			 * at the end of synchronization.
+			 */
+			ereport(WARNING,
+					errmsg("aborting initial sync for slot \"%s\"",
+						   remote_slot->name),
+					errdetail("This slot was invalidated on the primary server."));
+
+			pfree(cmd.data);
+			ExecDropSingleTupleTableSlot(tupslot);
+			walrcv_clear_result(res);
+
+			return false;
+		}
+
+		/*
+		 * confirmed_flush_lsn and catalog_xmin can be null if the slot was
+		 * just created on the primary with a valid restart_lsn and we fetched
+		 * it before the primary could set those fields.
+		 */
+		d = slot_getattr(tupslot, ++col, &isnull);
+		new_confirmed_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		new_catalog_xmin = isnull ? InvalidTransactionId : DatumGetTransactionId(d);
+
+		ExecDropSingleTupleTableSlot(tupslot);
+		walrcv_clear_result(res);
+
+		if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn &&
+			!XLogRecPtrIsInvalid(new_confirmed_lsn) &&
+			TransactionIdFollowsOrEquals(new_catalog_xmin,
+										 MyReplicationSlot->data.catalog_xmin))
+		{
+			/* Update new values in remote_slot */
+			remote_slot->restart_lsn = new_restart_lsn;
+			remote_slot->confirmed_lsn = new_confirmed_lsn;
+			remote_slot->catalog_xmin = new_catalog_xmin;
+
+			ereport(LOG,
+					errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)"
+						   " and catalog xmin (%u) has now passed local slot LSN"
+						   " (%X/%X) and catalog xmin (%u)",
+						   remote_slot->name,
+						   LSN_FORMAT_ARGS(new_restart_lsn),
+						   new_catalog_xmin,
+						   LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+						   MyReplicationSlot->data.catalog_xmin));
+
+			pfree(cmd.data);
+
+			return true;
+		}
+
+		/* Sleep before retrying. XXX: the 2 second interval is arbitrary. */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   2000L,
+					   WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+}
+
 /*
  * If the remote restart_lsn and catalog_xmin have caught up with the
  * local ones, then update the LSNs and persist the local synced slot for
@@ -558,7 +713,8 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
  * false.
  */
 static bool
-update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_and_persist_local_synced_slot(WalReceiverConn *wrconn,
+									 RemoteSlot *remote_slot, Oid remote_dbid)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
 	bool		found_consistent_snapshot = false;
@@ -577,12 +733,22 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		/*
 		 * The remote slot didn't catch up to locally reserved position.
 		 *
-		 * We do not drop the slot because the restart_lsn can be ahead of the
-		 * current location when recreating the slot in the next cycle. It may
-		 * take more time to create such a slot. Therefore, we keep this slot
-		 * and attempt the synchronization in the next cycle.
+		 * For the slotsync worker, we do not drop the slot because the
+		 * restart_lsn can be ahead of the current location when recreating the
+		 * slot in the next cycle. It may take more time to create such a slot.
+		 * Therefore, we keep this slot and attempt the synchronization in the
+		 * next cycle.
+		 *
+		 * For SQL API synchronization, we wait for the remote slot to catch up
+		 * rather than leaving temporary slots. This is because we could not
+		 * predict when (or if) the SQL function might be executed again, and
+		 * the creating session might persist after promotion. Without
+		 * automatic cleanup, this could lead to temporary slots being retained
+		 * for a longer time.
 		 */
-		return false;
+		if (AmLogicalSlotSyncWorkerProcess() ||
+			!wait_for_primary_slot_catchup(wrconn, remote_slot))
+			return false;
 	}
 
 	/*
@@ -622,7 +788,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
  * Returns TRUE if the local slot is updated.
  */
 static bool
-synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot,
+					 Oid remote_dbid)
 {
 	ReplicationSlot *slot;
 	XLogRecPtr	latestFlushPtr;
@@ -715,7 +882,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		/* Slot not ready yet, let's attempt to make it sync-ready now. */
 		if (slot->data.persistency == RS_TEMPORARY)
 		{
-			slot_updated = update_and_persist_local_synced_slot(remote_slot,
+			slot_updated = update_and_persist_local_synced_slot(wrconn,
+																remote_slot,
 																remote_dbid);
 		}
 
@@ -785,7 +953,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
 
-		update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+		update_and_persist_local_synced_slot(wrconn, remote_slot, remote_dbid);
 
 		slot_updated = true;
 	}
@@ -927,7 +1095,8 @@ synchronize_slots(WalReceiverConn *wrconn)
 		 */
 		LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 
-		some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
+		some_slot_updated |= synchronize_one_slot(wrconn, remote_slot,
+												  remote_dbid);
 
 		UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 	}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4da68312b5f..ba82cc186be 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -64,6 +64,7 @@ LOGICAL_PARALLEL_APPLY_MAIN	"Waiting in main loop of logical replication paralle
 RECOVERY_WAL_STREAM	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
 REPLICATION_SLOTSYNC_MAIN	"Waiting in main loop of slot sync worker."
 REPLICATION_SLOTSYNC_SHUTDOWN	"Waiting for slot sync worker to shut down."
+REPLICATION_SLOTSYNC_PRIMARY_CATCHUP	"Waiting for the primary to catch up."
 SYSLOGGER_MAIN	"Waiting in main loop of syslogger process."
 WAL_RECEIVER_MAIN	"Waiting in main loop of WAL receiver process."
 WAL_SENDER_MAIN	"Waiting in main loop of WAL sender process."
-- 
2.30.0.windows.2

