From e1049a0fc58ebd837ebdd7745a52d5f47acd6bb3 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Thu, 13 Nov 2025 11:23:36 +0800
Subject: [PATCH v3PG16] Fix a race condition of updating
 procArray->replication_slot_xmin.

Previously, ReplicationSlotsComputeRequiredXmin() computed the oldest
xmin across all slots while not holding ProcArrayLock if
already_locked is false, and acquires the ProcArrayLock just before
updating the replication slot xmin. Therefore, if a process calls
ReplicationSlotsComputeRequiredXmin() with already_locked being false
and another process updates the replication slot xmin before the
process acquiring the lock, the slot xmin was overwritten with an old
value.

In the reported failure, a walsender for an apply worker computes
InvalidTransaction as the oldest xmin and overwrote a valid
replication slot xmin value computed by a walsender for a tablesync
worker with this value. Then the walsender for a tablesync worker
ended up computing the transaction id by
GetOldestSafeDecodingTransactionId() without considering replication
slot xmin. That led to an error ""cannot build an initial slot
snapshot as oldest safe xid %u follows snapshot's xmin %u", which was
an assertion failure prior to 240e0dbacd3.

This commit changes ReplicationSlotsComputeRequiredXmin() so that it
computes the oldest xmin while holding ProcArrayLock in exclusive
mode. We keep already_locked parameter in
ProcArraySetReplicationSlotXmin() on backbranches to not break ABI
compatibility.
---
 src/backend/replication/logical/logical.c | 12 +++++----
 src/backend/replication/slot.c            | 32 +++++++++++++++++++----
 2 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 6e1879d8149..057f62a8d8f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -404,11 +404,11 @@ CreateInitDecodingContext(const char *plugin,
 	 * without further interlock its return value might immediately be out of
 	 * date.
 	 *
-	 * So we have to acquire the ProcArrayLock to prevent computation of new
-	 * xmin horizons by other backends, get the safe decoding xid, and inform
-	 * the slot machinery about the new limit. Once that's done the
-	 * ProcArrayLock can be released as the slot machinery now is
-	 * protecting against vacuum.
+	 * So we have to acquire both the ReplicationSlotControlLock and the
+	 * ProcArrayLock to prevent concurrent computation and update of new xmin
+	 * horizons by other backends, get the safe decoding xid, and inform the
+	 * slot machinery about the new limit. Once that's done the both locks
+	 * can be released as the slot machinery now is protecting against vacuum.
 	 *
 	 * Note that, temporarily, the data, not just the catalog, xmin has to be
 	 * reserved if a data snapshot is to be exported.  Otherwise the initial
@@ -421,6 +421,7 @@ CreateInitDecodingContext(const char *plugin,
 	 *
 	 * ----
 	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
@@ -435,6 +436,7 @@ CreateInitDecodingContext(const char *plugin,
 	ReplicationSlotsComputeRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d03a0556b5a..637b49ee2c4 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -858,8 +858,11 @@ ReplicationSlotPersist(void)
 /*
  * Compute the oldest xmin across all slots and store it in the ProcArray.
  *
- * If already_locked is true, ProcArrayLock has already been acquired
- * exclusively.
+ * If already_locked is true, both the ReplicationSlotControlLock and
+ * the ProcArrayLock have already been acquired exclusively.
+ *
+ * Note that the ReplicationSlotControlLock must be locked first to avoid
+ * deadlocks.
  */
 void
 ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -869,8 +872,26 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 	TransactionId agg_catalog_xmin = InvalidTransactionId;
 
 	Assert(ReplicationSlotCtl != NULL);
+	Assert(!already_locked ||
+		   (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
+			LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
 
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	/*
+	 * Hold the ReplicationSlotControlLock exclusive until after updating the
+	 * slot xmin values, so no backend can compute and update the new value
+	 * concurrently.
+	 *
+	 * One might think that we can hold the ProcArrayLock exclusively, compute
+	 * the xmin values while holding the ReplicationSlotControlLock in shared
+	 * mode, and update the slot xmin values, but it could increase lock
+	 * contention on the ProcArrayLock, which is not great since this function
+	 * can be called at non-negligible frequency.
+	 *
+	 * We instead increase lock contention on the ReplicationSlotControlLock
+	 * but it would be less harmful.
+	 */
+	if (!already_locked)
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -905,9 +926,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 			agg_catalog_xmin = effective_catalog_xmin;
 	}
 
-	LWLockRelease(ReplicationSlotControlLock);
-
 	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
+
+	if (!already_locked)
+		LWLockRelease(ReplicationSlotControlLock);
 }
 
 /*
-- 
2.51.1.windows.1

