From b10a655ed8000457da9160c13cc7832c46154347 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Fri, 19 Dec 2025 10:51:20 +0800
Subject: [PATCH] Fix a race condition in updating
 procArray->replication_slot_xmin.

Previously, ReplicationSlotsComputeRequiredXmin() computed the oldest
xmin across all slots without holding ProcArrayLock (when
already_locked is false), acquiring the lock just before updating the
replication slot xmin.

This could lead to a race condition: if a backend creates a new slot
and updates the global replication slot xmin, another backend
concurrently running ReplicationSlotsComputeRequiredXmin() could
overwrite that update with an invalid or stale value. This happens
because the concurrent backend might have computed the aggregate xmin
before the new slot was accounted for, but applied the update after
the new slot had already updated the global value.
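
To illustrate, here is a hypothetical interleaving consistent with the
above (B1 creates a slot, B2 concurrently runs
ReplicationSlotsComputeRequiredXmin()):

    B2: scans all slots and computes the aggregate xmin; B1's slot has
        no xmin set yet, so the result is InvalidTransactionId
    B1: acquires ProcArrayLock, sets the new slot's xmin, and updates
        procArray->replication_slot_xmin
    B2: acquires ProcArrayLock and overwrites
        procArray->replication_slot_xmin with its stale result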

In the reported failure, a walsender for an apply worker computed
InvalidTransactionId as the oldest xmin and overwrote a valid
replication slot xmin value computed by a walsender for a tablesync
worker. Consequently, the tablesync worker computed a transaction ID
via GetOldestSafeDecodingTransactionId() effectively without
considering the replication slot xmin. This led to the error "cannot
build an initial slot snapshot as oldest safe xid %u follows
snapshot's xmin %u", which was an assertion failure prior to commit
240e0dbacd3.

To fix this, we acquire ReplicationSlotControlLock in exclusive mode
during slot creation to perform the initial update of the slot
xmin. In ReplicationSlotsComputeRequiredXmin(), we hold
ReplicationSlotControlLock in shared mode until the global slot xmin
is updated in ProcArraySetReplicationSlotXmin(). This prevents
concurrent computations and updates of the global xmin by other
backends during the initial slot xmin update process, while still
permitting concurrent calls to ReplicationSlotsComputeRequiredXmin().
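
In condensed form, the resulting locking protocol looks as follows (a
sketch distilled from the diff below; slot spinlock handling and error
paths are omitted):

    /* Slot creation (writer side), e.g. CreateInitDecodingContext(): */
    LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
    /* ... publish xmin_horizon in the new slot, under its spinlock ... */
    ReplicationSlotsComputeRequiredXmin(true);	/* already_locked */
    LWLockRelease(ProcArrayLock);
    LWLockRelease(ReplicationSlotControlLock);

    /* Recomputation, ReplicationSlotsComputeRequiredXmin(false): */
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    /* ... scan all slots, compute agg_xmin and agg_catalog_xmin ... */
    ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, false);
    LWLockRelease(ReplicationSlotControlLock);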

Backpatch to all supported versions.

Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Pradeep Kumar <spradeepkumar29@gmail.com>
Reviewed-by: Hayato Kuroda (Fujitsu) <kuroda.hayato@fujitsu.com>
Reviewed-by: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CAA4eK1L8wYcyTPxNzPGkhuO52WBGoOZbT0A73Le=ZUWYAYmdfw@mail.gmail.com
Backpatch-through: 13
---
 src/backend/replication/logical/launcher.c |  2 ++
 src/backend/replication/logical/logical.c  | 12 ++++---
 src/backend/replication/logical/slotsync.c |  2 ++
 src/backend/replication/slot.c             | 39 +++++++++++++++++++---
 4 files changed, 45 insertions(+), 10 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3991e1495d4..b9780c7bc99 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1540,6 +1540,7 @@ init_conflict_slot_xmin(void)
 	Assert(MyReplicationSlot &&
 		   !TransactionIdIsValid(MyReplicationSlot->data.xmin));
 
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(false);
@@ -1552,6 +1553,7 @@ init_conflict_slot_xmin(void)
 	ReplicationSlotsComputeRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	/* Write this slot to disk */
 	ReplicationSlotMarkDirty();
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index c8858e06616..ed117536c7c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -394,11 +394,11 @@ CreateInitDecodingContext(const char *plugin,
 	 * without further interlock its return value might immediately be out of
 	 * date.
 	 *
-	 * So we have to acquire the ProcArrayLock to prevent computation of new
-	 * xmin horizons by other backends, get the safe decoding xid, and inform
-	 * the slot machinery about the new limit. Once that's done the
-	 * ProcArrayLock can be released as the slot machinery now is
-	 * protecting against vacuum.
+	 * So we have to acquire both the ReplicationSlotControlLock and the
+	 * ProcArrayLock to prevent concurrent computation and update of new xmin
+	 * horizons by other backends, get the safe decoding xid, and inform the
+	 * slot machinery about the new limit. Once that's done, both locks can
+	 * be released as the slot machinery is now protecting against vacuum.
 	 *
 	 * Note that, temporarily, the data, not just the catalog, xmin has to be
 	 * reserved if a data snapshot is to be exported.  Otherwise the initial
@@ -411,6 +411,7 @@ CreateInitDecodingContext(const char *plugin,
 	 *
 	 * ----
 	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
@@ -425,6 +426,7 @@ CreateInitDecodingContext(const char *plugin,
 	ReplicationSlotsComputeRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 2aea776352d..0feebffd431 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -857,6 +857,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 
 		reserve_wal_for_local_slot(remote_slot->restart_lsn);
 
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 		xmin_horizon = GetOldestSafeDecodingTransactionId(true);
 		SpinLockAcquire(&slot->mutex);
@@ -865,6 +866,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 		SpinLockRelease(&slot->mutex);
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
+		LWLockRelease(ReplicationSlotControlLock);
 
 		/*
 		 * Make sure that concerned WAL is received and flushed before syncing
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 58c41d45516..75967580550 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1201,8 +1201,11 @@ ReplicationSlotPersist(void)
 /*
  * Compute the oldest xmin across all slots and store it in the ProcArray.
  *
- * If already_locked is true, ProcArrayLock has already been acquired
- * exclusively.
+ * If already_locked is true, both the ReplicationSlotControlLock and the
+ * ProcArrayLock have already been acquired exclusively. It is crucial that
+ * the caller acquires the ReplicationSlotControlLock first, followed by the
+ * ProcArrayLock, to avoid undetectable deadlocks, since this function
+ * acquires them in that order.
  */
 void
 ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -1212,8 +1215,33 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 	TransactionId agg_catalog_xmin = InvalidTransactionId;
 
 	Assert(ReplicationSlotCtl != NULL);
+	Assert(!already_locked ||
+		   (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
+			LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
 
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	/*
+	 * Hold the ReplicationSlotControlLock until after updating the slot xmin
+	 * values, so that no backend can concurrently update the initial xmin of
+	 * a newly created slot. A shared lock is used here to minimize lock
+	 * contention, especially when many slots exist and advancements occur
+	 * frequently. This is safe since an exclusive lock is taken during the
+	 * initial slot xmin update at slot creation.
+	 *
+	 * One might think that we could instead hold the ProcArrayLock
+	 * exclusively while updating the slot xmin values, but that would
+	 * increase lock contention on the ProcArrayLock, which is undesirable
+	 * since this function can be called at non-negligible frequency.
+	 *
+	 * Concurrent invocation of this function may cause the computed slot xmin
+	 * to regress. However, this is harmless because tuples prior to the most
+	 * recent xmin are no longer useful once advancement occurs (see
+	 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
+	 * before updating the effective_xmin). Thus, such a regression merely
+	 * delays the removal of tuples by VACUUM, without ever causing the early
+	 * deletion of required data.
+	 */
+	if (!already_locked)
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -1248,9 +1276,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 			agg_catalog_xmin = effective_catalog_xmin;
 	}
 
-	LWLockRelease(ReplicationSlotControlLock);
-
 	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
+
+	if (!already_locked)
+		LWLockRelease(ReplicationSlotControlLock);
 }
 
 /*
-- 
2.47.3

