From 661fcb4ced929bed2dd4d90e126c0a1cb39114c0 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Thu, 31 Jul 2025 05:33:42 -0400
Subject: [PATCH v3] Improve initial slot synchronization in
 pg_sync_replication_slots()

During initial slot synchronization on a standby, the operation may fail if
required catalog rows or WALs have been removed or are at risk of removal. The
slotsync worker handles this by creating a temporary slot for initial sync and
retain it even in case of failure. It will keep retrying until the slot on the
primary has been advanced to a position where all the required data are also
available on the standby. However, pg_sync_replication_slots() had
no such protection mechanism.

The SQL API would fail immediately if synchronization requirements weren't
met. This could lead to permanent failure as the standby might continue
removing the still-required data.

To address this, we now make pg_sync_replication_slots() wait for the primary
slot to advance to a suitable position before completing synchronization and
before removing the temporary slot. Once the slot advances to a suitable
position, we retry synchronization. Additionally, if a promotion occurs on
the standby during this wait, the process exits gracefully and the
temporary slot is removed.
---
 doc/src/sgml/func.sgml                          |   4 +-
 doc/src/sgml/logicaldecoding.sgml               |  40 ++---
 src/backend/replication/logical/slotsync.c      | 220 ++++++++++++++++++++++--
 src/backend/utils/activity/wait_event_names.txt |   1 +
 4 files changed, 225 insertions(+), 40 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 74a16af..4092677 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -30034,9 +30034,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         standby server. Temporary synced slots, if any, cannot be used for
         logical decoding and must be dropped after promotion. See
         <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
-        Note that this function is primarily intended for testing and
-        debugging purposes and should be used with caution. Additionally,
-        this function cannot be executed if
+        Note that this function cannot be executed if
         <link linkend="guc-sync-replication-slots"><varname>
         sync_replication_slots</varname></link> is enabled and the slotsync
         worker is already running to perform the synchronization of slots.
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 593f784..edad0e9 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -364,18 +364,23 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
    <sect2 id="logicaldecoding-replication-slots-synchronization">
     <title>Replication Slot Synchronization</title>
     <para>
-     The logical replication slots on the primary can be synchronized to
-     the hot standby by using the <literal>failover</literal> parameter of
+     The logical replication slots on the primary can be enabled for
+     synchronization to the hot standby by using the
+     <literal>failover</literal> parameter of
      <link linkend="pg-create-logical-replication-slot">
      <function>pg_create_logical_replication_slot</function></link>, or by
      using the <link linkend="sql-createsubscription-params-with-failover">
      <literal>failover</literal></link> option of
-     <command>CREATE SUBSCRIPTION</command> during slot creation.
-     Additionally, enabling <link linkend="guc-sync-replication-slots">
-     <varname>sync_replication_slots</varname></link> on the standby
-     is required. By enabling <link linkend="guc-sync-replication-slots">
-     <varname>sync_replication_slots</varname></link>
-     on the standby, the failover slots can be synchronized periodically in
+     <command>CREATE SUBSCRIPTION</command> during slot creation. After that,
+     synchronization can be be performed either manually by calling
+     <link linkend="pg-sync-replication-slots">
+     <function>pg_sync_replication_slots</function></link>
+     on the standby, or automatically by enabling
+     <link linkend="guc-sync-replication-slots">
+     <varname>sync_replication_slots</varname></link> on the standby.
+     When <link linkend="guc-sync-replication-slots">
+     <varname>sync_replication_slots</varname></link> is enabled
+     on the standby, the failover slots are periodically synchronized by
      the slotsync worker. For the synchronization to work, it is mandatory to
      have a physical replication slot between the primary and the standby (i.e.,
      <link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
@@ -398,25 +403,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      receiving the WAL up to the latest flushed position on the primary server.
     </para>
 
-    <note>
-     <para>
-      While enabling <link linkend="guc-sync-replication-slots">
-      <varname>sync_replication_slots</varname></link> allows for automatic
-      periodic synchronization of failover slots, they can also be manually
-      synchronized using the <link linkend="pg-sync-replication-slots">
-      <function>pg_sync_replication_slots</function></link> function on the standby.
-      However, this function is primarily intended for testing and debugging and
-      should be used with caution. Unlike automatic synchronization, it does not
-      include cyclic retries, making it more prone to synchronization failures,
-      particularly during initial sync scenarios where the required WAL files
-      or catalog rows for the slot may have already been removed or are at risk
-      of being removed on the standby. In contrast, automatic synchronization
-      via <varname>sync_replication_slots</varname> provides continuous slot
-      updates, enabling seamless failover and supporting high availability.
-      Therefore, it is the recommended method for synchronizing slots.
-     </para>
-    </note>
-
     <para>
      When slot synchronization is configured as recommended,
      and the initial synchronization is performed either automatically or
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 2f0c08b..3308772 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -146,6 +146,7 @@ typedef struct RemoteSlot
 	ReplicationSlotInvalidationCause invalidated;
 } RemoteSlot;
 
+static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn);
 static void slotsync_failure_callback(int code, Datum arg);
 static void update_synced_slots_inactive_since(void);
 
@@ -550,6 +551,185 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
 }
 
 /*
+ * Wait for remote slot to pass locally reserved position.
+ *
+ * Return true if remote_slot could catch up with the locally reserved
+ * position. Return false in all other cases.
+ */
+static bool
+wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
+{
+#define SLOT_QUERY_COLUMN_COUNT 4
+
+	StringInfoData cmd;
+	int			   wait_iterations = 0;
+
+	Assert(!AmLogicalSlotSyncWorkerProcess());
+
+	ereport(LOG,
+			errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin"
+				   " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)",
+				   remote_slot->name,
+				   LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+				   remote_slot->catalog_xmin,
+				   LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+				   MyReplicationSlot->data.catalog_xmin));
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd,
+					 "SELECT invalidation_reason IS NOT NULL, restart_lsn,"
+					 " confirmed_flush_lsn, catalog_xmin"
+					 " FROM pg_catalog.pg_replication_slots"
+					 " WHERE slot_name = %s",
+					 quote_literal_cstr(remote_slot->name));
+
+	for (;;)
+	{
+		bool		new_invalidated;
+		XLogRecPtr	new_restart_lsn;
+		XLogRecPtr	new_confirmed_lsn;
+		TransactionId new_catalog_xmin;
+		WalRcvExecResult *res;
+		TupleTableSlot *tupslot;
+		Datum		d;
+		int			rc;
+		int			col = 0;
+		bool		isnull;
+		Oid			slotRow[SLOT_QUERY_COLUMN_COUNT] = {BOOLOID, LSNOID, LSNOID, XIDOID};
+
+		/* Handle any termination request if any */
+		ProcessSlotSyncInterrupts(wrconn);
+
+		res = walrcv_exec(wrconn, cmd.data, SLOT_QUERY_COLUMN_COUNT, slotRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					errmsg("could not fetch slot \"%s\" info from the"
+						   " primary server: %s",
+						   remote_slot->name, res->err));
+
+		tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
+		{
+			ereport(WARNING,
+					errmsg("aborting initial sync for slot \"%s\"",
+						   remote_slot->name),
+					errdetail("This slot was not found on the primary server."));
+
+			pfree(cmd.data);
+			walrcv_clear_result(res);
+
+			return false;
+		}
+
+		/*
+		 * It is possible that the slot was invalidated on the primary, if so
+		 * handle accordingly.
+		 */
+		new_invalidated = DatumGetBool(slot_getattr(tupslot, ++col, &isnull));
+		Assert(!isnull);
+
+		if (new_invalidated)
+		{
+			/*
+			 * The slot won't be persisted by the caller; it will be cleaned
+			 * up at the end of synchronization.
+			 */
+			ereport(WARNING,
+					errmsg("aborting initial sync for slot \"%s\"",
+						   remote_slot->name),
+					errdetail("This slot was invalidated on the primary server."));
+
+			pfree(cmd.data);
+			ExecClearTuple(tupslot);
+			walrcv_clear_result(res);
+
+			return false;
+		}
+
+		/* Any slot with NULL in these fields should not have made it this far */
+		d = slot_getattr(tupslot, ++col, &isnull);
+		Assert(!isnull);
+		new_restart_lsn = DatumGetLSN(d);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		Assert(!isnull);
+		new_confirmed_lsn = DatumGetLSN(d);
+
+		d = slot_getattr(tupslot, ++col, &isnull);
+		Assert(!isnull);
+		new_catalog_xmin = DatumGetTransactionId(d);
+
+		ExecClearTuple(tupslot);
+		walrcv_clear_result(res);
+
+		if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn &&
+			TransactionIdFollowsOrEquals(new_catalog_xmin,
+										 MyReplicationSlot->data.catalog_xmin))
+		{
+			/* Update new values in remote_slot */
+			remote_slot->restart_lsn = new_restart_lsn;
+			remote_slot->confirmed_lsn = new_confirmed_lsn;
+			remote_slot->catalog_xmin = new_catalog_xmin;
+
+			ereport(LOG,
+					errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)"
+						   " and catalog xmin (%u) has now passed local slot LSN"
+						   " (%X/%X) and catalog xmin (%u)",
+						   remote_slot->name,
+						   LSN_FORMAT_ARGS(new_restart_lsn),
+						   new_catalog_xmin,
+						   LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+						   MyReplicationSlot->data.catalog_xmin));
+
+			pfree(cmd.data);
+
+			return true;
+		}
+
+		/*
+		 * If we've been promoted, then no point continuing.
+		 */
+		if (SlotSyncCtx->stopSignaled)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("cannot synchronize replication slots when"
+							" standby promotion is ongoing")));
+			pfree(cmd.data);
+
+			return false;
+		}
+
+		/*
+		 * XXX: Is waiting for 2 seconds before retrying enough or more or
+		 * less?
+		 */
+		rc = WaitLatch(MyLatch,
+					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+					   2000L,
+					   WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP);
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+
+		/* log a message every ten seconds */
+		wait_iterations++;
+		if (wait_iterations % 5 == 0)
+		{
+			ereport(LOG,
+					errmsg("continuing to wait for remote slot \"%s\" LSN (%X/%X) and catalog xmin"
+						   " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)",
+						   remote_slot->name,
+						   LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+						   remote_slot->catalog_xmin,
+						   LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+						   MyReplicationSlot->data.catalog_xmin));
+		}
+	}
+}
+
+/*
  * If the remote restart_lsn and catalog_xmin have caught up with the
  * local ones, then update the LSNs and persist the local synced slot for
  * future synchronization; otherwise, do nothing.
@@ -558,7 +738,8 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
  * false.
  */
 static bool
-update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_and_persist_local_synced_slot(WalReceiverConn *wrconn,
+									 RemoteSlot *remote_slot, Oid remote_dbid)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
 	bool		found_consistent_snapshot = false;
@@ -577,12 +758,28 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		/*
 		 * The remote slot didn't catch up to locally reserved position.
 		 *
-		 * We do not drop the slot because the restart_lsn can be ahead of the
-		 * current location when recreating the slot in the next cycle. It may
-		 * take more time to create such a slot. Therefore, we keep this slot
-		 * and attempt the synchronization in the next cycle.
+		 * If we're in the slotsync worker, we retain the slot and retry in the
+		 * next cycle. The restart_lsn might advance by then, allowing the slot
+		 * to be created successfully later.
 		 */
-		return false;
+		if (AmLogicalSlotSyncWorkerProcess())
+			return false;
+
+		/*
+		 * For SQL API synchronization, we wait for the remote slot to catch up
+		 * here, since we can't assume the SQL API will be called again soon.
+		 * We will retry the sync once the slot catches up.
+		 *
+		 * Note: This will return false if a promotion is triggered on the
+		 * standby while waiting, in which case we stop syncing and drop the
+		 * temporary slot.
+		 */
+		if (!wait_for_primary_slot_catchup(wrconn, remote_slot))
+			return false;
+		else
+			update_local_synced_slot(remote_slot, remote_dbid,
+									 &found_consistent_snapshot,
+									 &remote_slot_precedes);
 	}
 
 	/*
@@ -622,7 +819,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
  * Returns TRUE if the local slot is updated.
  */
 static bool
-synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot,
+					 Oid remote_dbid)
 {
 	ReplicationSlot *slot;
 	XLogRecPtr	latestFlushPtr;
@@ -715,7 +913,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		/* Slot not ready yet, let's attempt to make it sync-ready now. */
 		if (slot->data.persistency == RS_TEMPORARY)
 		{
-			slot_updated = update_and_persist_local_synced_slot(remote_slot,
+			slot_updated = update_and_persist_local_synced_slot(wrconn,
+																remote_slot,
 																remote_dbid);
 		}
 
@@ -785,7 +984,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
 
-		update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+		update_and_persist_local_synced_slot(wrconn, remote_slot, remote_dbid);
 
 		slot_updated = true;
 	}
@@ -927,7 +1126,8 @@ synchronize_slots(WalReceiverConn *wrconn)
 		 */
 		LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 
-		some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
+		some_slot_updated |= synchronize_one_slot(wrconn, remote_slot,
+												  remote_dbid);
 
 		UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 	}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 0be307d..9fa36ab 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -64,6 +64,7 @@ LOGICAL_PARALLEL_APPLY_MAIN	"Waiting in main loop of logical replication paralle
 RECOVERY_WAL_STREAM	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
 REPLICATION_SLOTSYNC_MAIN	"Waiting in main loop of slot sync worker."
 REPLICATION_SLOTSYNC_SHUTDOWN	"Waiting for slot sync worker to shut down."
+REPLICATION_SLOTSYNC_PRIMARY_CATCHUP	"Waiting for the primary to catch-up."
 SYSLOGGER_MAIN	"Waiting in main loop of syslogger process."
 WAL_RECEIVER_MAIN	"Waiting in main loop of WAL receiver process."
 WAL_SENDER_MAIN	"Waiting in main loop of WAL sender process."
-- 
1.8.3.1

