From ce199c10e965b75986fed450eedfba3f090099c4 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Thu, 13 Nov 2025 10:04:30 +0800
Subject: [PATCH v3] Fix race conditions causing invalidation of newly created
 slots

This commit addresses some race conditions that could lead to the removal of WALs
reserved by a slot, potentially resulting in slot invalidation:

1) Currently, there is a race condition where ReplicationSlotReserveWal selects a
   WAL position to be reserved but has not yet updated restart_lsn, while the
   checkpoint process might select a later WAL position as the minimum, causing
   premature removal of WAL needed by the slot.

This commit fixes it by taking exclusive lock on ReplicationSlotAllocationLock
when reserving WAL to serialize the minimum restart_lsn computation in checkpoint
process and WAL reservation, ensuring that:

* If the WAL reservation occurs first, the checkpoint must wait for the
restart_lsn to be updated before proceeding with WAL removal. This guarantees
that the most recent restart_lsn position is detected.

* If the checkpoint calls CheckPointReplicationSlots() first, then any
subsequent WAL reservation must take a position later than the redo pointer.

2) If a backend advancing a slot's restart_lsn has reached
   XLogSetReplicationSlotMinimumLSN but has not yet updated the minimum LSN,
   and another backend creates a new slot, the minimum LSN can be set to a more
   recent position than the WAL reserved by the new slot, potentially
   invalidating it in the next checkpoint.

The commit resolves this by acquiring an exclusive ReplicationSlotControlLock,
replacing the original shared level lock, to prevent concurrent updates to
restart_lsn.

3) During slotsync, the remote LSN queried from the publisher is used as the
   initial restart_lsn for newly synced slots. As these slots are synced
   asynchronously, the synced restart_lsn may lag behind the standby's redo
   pointer. This creates a race condition where checkpoints might remove
   required WALs and invalidate the newly synced slots.

Unlike in ReplicationSlotReserveWal(), holding only the
ReplicationSlotAllocationLock does not safeguard a newly synced slot if a
checkpoint occurs before WAL reservation during slotsync, as the initial
restart_lsn provided could be prior to the redo pointer.

To address this issue, this commit changes the WAL reservation to compare the
remote restart_lsn with both minimum slot LSN and redo pointer. If either local
LSN is less than the remote restart_lsn, we update the local slot with the
remote value. Otherwise, we use the minimum of the two local LSNs.

Additionally, we acquire both ReplicationSlotAllocationLock and
ReplicationSlotControlLock to prevent the checkpoint from updating the redo
pointer and other backend processes from updating the minimum slot LSN.
---
 src/backend/access/transam/xlog.c          |   3 +-
 src/backend/replication/logical/slotsync.c |  97 ++++++++++---------
 src/backend/replication/slot.c             | 104 +++++++++++----------
 src/include/access/xlog.h                  |   1 +
 4 files changed, 109 insertions(+), 96 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 22d0a2e8c3a..17324259cff 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -668,7 +668,6 @@ static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn,
 												  TimeLineID newTLI);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
-static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
 static void AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli,
 								  bool opportunistic);
@@ -2678,7 +2677,7 @@ XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
  * Return the oldest LSN we must retain to satisfy the needs of some
  * replication slot.
  */
-static XLogRecPtr
+XLogRecPtr
 XLogGetReplicationSlotMinimumLSN(void)
 {
 	XLogRecPtr	retval;
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 8b4afd87dc9..b4ced23193e 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -49,6 +49,7 @@
 
 #include <time.h>
 
+#include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogrecovery.h"
 #include "catalog/pg_database.h"
@@ -482,70 +483,74 @@ drop_local_obsolete_slots(List *remote_slot_list)
  * Reserve WAL for the currently active local slot using the specified WAL
  * location (restart_lsn).
  *
- * If the given WAL location has been removed, reserve WAL using the oldest
- * existing WAL segment.
+ * If the given WAL location has been removed or is at risk of being removed,
+ * reserve WAL using the oldest segment that is non-removable.
  */
 static void
 reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
 {
-	XLogSegNo	oldest_segno;
+	XLogRecPtr	slot_min_lsn;
+	XLogRecPtr	min_safe_lsn;
 	XLogSegNo	segno;
 	ReplicationSlot *slot = MyReplicationSlot;
 
 	Assert(slot != NULL);
 	Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
 
-	while (true)
-	{
-		SpinLockAcquire(&slot->mutex);
-		slot->data.restart_lsn = restart_lsn;
-		SpinLockRelease(&slot->mutex);
+	/*
+	 * Acquire an exclusive lock to prevent the checkpoint process from
+	 * concurrently calculating the minimum slot LSN (see
+	 * CheckPointReplicationSlots), ensuring that if WAL reservation occurs
+	 * first, the checkpoint must wait for the restart_lsn update before
+	 * calculating the minimum LSN.
+	 *
+	 * Note: Unlike in ReplicationSlotReserveWal(), this lock does not protect
+	 * a newly synced slot if a checkpoint occurs before the WAL reservation
+	 * here. The initial restart_lsn passed here may be outdated, preceding
+	 * the redo pointer. Therefore, when initializing a new slot below, we
+	 * might use the redo pointer or the minimum slot LSN instead of relying
+	 * solely on the passed value.
+	 */
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
 
-		/* Prevent WAL removal as fast as possible */
-		ReplicationSlotsComputeRequiredLSN();
+	/*
+	 * Acquire a lock to prevent other backends from updating the minimum
+	 * slot LSN concurrently, ensuring that WALs prior to the minimum LSN are
+	 * not removable during this lock period.
+	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
-		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+	/*
+	 * Select the safe non-removable LSN by evaluating the redo pointer
+	 * alongside the minimum slot LSN.
+	 */
+	min_safe_lsn = GetRedoRecPtr();
+	slot_min_lsn = XLogGetReplicationSlotMinimumLSN();
 
-		/*
-		 * Find the oldest existing WAL segment file.
-		 *
-		 * Normally, we can determine it by using the last removed segment
-		 * number. However, if no WAL segment files have been removed by a
-		 * checkpoint since startup, we need to search for the oldest segment
-		 * file from the current timeline existing in XLOGDIR.
-		 *
-		 * XXX: Currently, we are searching for the oldest segment in the
-		 * current timeline as there is less chance of the slot's restart_lsn
-		 * from being some prior timeline, and even if it happens, in the
-		 * worst case, we will wait to sync till the slot's restart_lsn moved
-		 * to the current timeline.
-		 */
-		oldest_segno = XLogGetLastRemovedSegno() + 1;
+	if (XLogRecPtrIsValid(slot_min_lsn) && min_safe_lsn > slot_min_lsn)
+		min_safe_lsn = slot_min_lsn;
 
-		if (oldest_segno == 1)
-		{
-			TimeLineID	cur_timeline;
+	/*
+	 * If the minimum safe LSN is less than the given restart_lsn, use it as
+	 * the initial restart_lsn for the newly synced slot.
+	 */
+	if (min_safe_lsn < restart_lsn)
+		restart_lsn = min_safe_lsn;
 
-			GetWalRcvFlushRecPtr(NULL, &cur_timeline);
-			oldest_segno = XLogGetOldestSegno(cur_timeline);
-		}
+	SpinLockAcquire(&slot->mutex);
+	slot->data.restart_lsn = restart_lsn;
+	SpinLockRelease(&slot->mutex);
 
-		elog(DEBUG1, "segno: " UINT64_FORMAT " of purposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available",
-			 segno, oldest_segno);
+	LWLockRelease(ReplicationSlotControlLock);
 
-		/*
-		 * If all required WAL is still there, great, otherwise retry. The
-		 * slot should prevent further removal of WAL, unless there's a
-		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
-		 * the new restart_lsn above, so normally we should never need to loop
-		 * more than twice.
-		 */
-		if (segno >= oldest_segno)
-			break;
+	ReplicationSlotsComputeRequiredLSN();
 
-		/* Retry using the location of the oldest wal segment */
-		XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
-	}
+	XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+	if (XLogGetLastRemovedSegno() >= segno)
+		elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
+			 NameStr(slot->data.name));
+
+	LWLockRelease(ReplicationSlotAllocationLock);
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1ec1e997b27..5f121df7a0b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1237,7 +1237,15 @@ ReplicationSlotsComputeRequiredLSN(void)
 
 	Assert(ReplicationSlotCtl != NULL);
 
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	/*
+	 * Hold the ReplicationSlotControlLock exclusive until after updating the
+	 * slot minimum LSN value, so no backend can compute and update the new
+	 * value concurrently. This prevents overwriting the minimum LSN with a
+	 * position more recent than the WAL position reserved by another newly
+	 * created slot, ensuring the WALs required by the new slot are not
+	 * prematurely removed during checkpoint.
+	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	for (i = 0; i < max_replication_slots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
@@ -1282,9 +1290,10 @@ ReplicationSlotsComputeRequiredLSN(void)
 			 restart_lsn < min_required))
 			min_required = restart_lsn;
 	}
-	LWLockRelease(ReplicationSlotControlLock);
 
 	XLogSetReplicationSlotMinimumLSN(min_required);
+
+	LWLockRelease(ReplicationSlotControlLock);
 }
 
 /*
@@ -1571,63 +1580,62 @@ void
 ReplicationSlotReserveWal(void)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
+	XLogSegNo	segno;
+	XLogRecPtr	restart_lsn;
 
 	Assert(slot != NULL);
 	Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
 	Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));
 
 	/*
-	 * The replication slot mechanism is used to prevent removal of required
-	 * WAL. As there is no interlock between this routine and checkpoints, WAL
-	 * segments could concurrently be removed when a now stale return value of
-	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
-	 * this happens we'll just retry.
+	 * Acquire an exclusive lock to prevent the checkpoint process from
+	 * concurrently calculating the minimum slot LSN (see
+	 * CheckPointReplicationSlots), ensuring that the reserved WAL cannot be
+	 * removed during a checkpoint.
+	 *
+	 * The mechanism is reliable because if WAL reservation occurs first, the
+	 * checkpoint must wait for the restart_lsn update before determining the
+	 * minimum non-removable LSN. On the other hand, if the checkpoint occurs
+	 * first, subsequent WAL reservations must choose positions beyond or
+	 * equal to the redo pointer of checkpoint.
 	 */
-	while (true)
-	{
-		XLogSegNo	segno;
-		XLogRecPtr	restart_lsn;
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
 
-		/*
-		 * For logical slots log a standby snapshot and start logical decoding
-		 * at exactly that position. That allows the slot to start up more
-		 * quickly. But on a standby we cannot do WAL writes, so just use the
-		 * replay pointer; effectively, an attempt to create a logical slot on
-		 * standby will cause it to wait for an xl_running_xact record to be
-		 * logged independently on the primary, so that a snapshot can be
-		 * built using the record.
-		 *
-		 * None of this is needed (or indeed helpful) for physical slots as
-		 * they'll start replay at the last logged checkpoint anyway. Instead
-		 * return the location of the last redo LSN. While that slightly
-		 * increases the chance that we have to retry, it's where a base
-		 * backup has to start replay at.
-		 */
-		if (SlotIsPhysical(slot))
-			restart_lsn = GetRedoRecPtr();
-		else if (RecoveryInProgress())
-			restart_lsn = GetXLogReplayRecPtr(NULL);
-		else
-			restart_lsn = GetXLogInsertRecPtr();
+	/*
+	 * For logical slots log a standby snapshot and start logical decoding at
+	 * exactly that position. That allows the slot to start up more quickly.
+	 * But on a standby we cannot do WAL writes, so just use the replay
+	 * pointer; effectively, an attempt to create a logical slot on standby
+	 * will cause it to wait for an xl_running_xact record to be logged
+	 * independently on the primary, so that a snapshot can be built using the
+	 * record.
+	 *
+	 * None of this is needed (or indeed helpful) for physical slots as
+	 * they'll start replay at the last logged checkpoint anyway. Instead
+	 * return the location of the last redo LSN. While that slightly increases
+	 * the chance that we have to retry, it's where a base backup has to start
+	 * replay at.
+	 */
+	if (SlotIsPhysical(slot))
+		restart_lsn = GetRedoRecPtr();
+	else if (RecoveryInProgress())
+		restart_lsn = GetXLogReplayRecPtr(NULL);
+	else
+		restart_lsn = GetXLogInsertRecPtr();
 
-		SpinLockAcquire(&slot->mutex);
-		slot->data.restart_lsn = restart_lsn;
-		SpinLockRelease(&slot->mutex);
+	SpinLockAcquire(&slot->mutex);
+	slot->data.restart_lsn = restart_lsn;
+	SpinLockRelease(&slot->mutex);
 
-		/* prevent WAL removal as fast as possible */
-		ReplicationSlotsComputeRequiredLSN();
+	/* prevent WAL removal as fast as possible */
+	ReplicationSlotsComputeRequiredLSN();
 
-		/*
-		 * If all required WAL is still there, great, otherwise retry. The
-		 * slot should prevent further removal of WAL, unless there's a
-		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
-		 * the new restart_lsn above, so normally we should never need to loop
-		 * more than twice.
-		 */
-		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
-		if (XLogGetLastRemovedSegno() < segno)
-			break;
-	}
+	XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+	if (XLogGetLastRemovedSegno() >= segno)
+		elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
+			 NameStr(slot->data.name));
+
+	LWLockRelease(ReplicationSlotAllocationLock);
 
 	if (!RecoveryInProgress() && SlotIsLogical(slot))
 	{
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 605280ed8fb..935d3df8758 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -215,6 +215,7 @@ extern XLogSegNo XLogGetLastRemovedSegno(void);
 extern XLogSegNo XLogGetOldestSegno(TimeLineID tli);
 extern void XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN);
 extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn);
+extern XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
 extern void xlog_redo(struct XLogReaderState *record);
 extern void xlog_desc(StringInfo buf, struct XLogReaderState *record);
-- 
2.31.1

