From aa47b7d71a12d18ced352e3771055753b993a122 Mon Sep 17 00:00:00 2001
From: Vignesh <vignesh21@gmail.com>
Date: Thu, 30 Jan 2025 18:15:15 +0530
Subject: [PATCH v65 1/4] Raise Error for Invalid Slots in
 ReplicationSlotAcquire()

Once a replication slot is invalidated, it cannot be reused. However, a
process could still acquire an invalid slot and fail later.

For example, if a process acquires a logical slot that was invalidated due
to wal_removed, it will eventually fail in CreateDecodingContext() when
attempting to access the removed WAL. Similarly, for physical replication
slots, even if the slot is invalidated and invalidation_reason is set to
wal_removed, the walsender does not currently check for invalidation when
starting physical replication. Instead, replication starts, and an error
is only reported later, when the standby requests WAL that has already
been removed.

This patch improves error handling by detecting invalidated slots earlier.
ReplicationSlotAcquire() gains an error_if_invalid argument; when it is
true, an error is raised immediately instead of letting the process acquire
the slot and fail later. The slot-drop and slot-synchronization callers pass
false and are therefore unaffected.
---
 src/backend/replication/logical/logical.c     | 20 ------------
 .../replication/logical/logicalfuncs.c        |  2 +-
 src/backend/replication/logical/slotsync.c    |  4 +--
 src/backend/replication/slot.c                | 32 +++++++++++--------
 src/backend/replication/slotfuncs.c           |  2 +-
 src/backend/replication/walsender.c           |  4 +--
 src/backend/utils/adt/pg_upgrade_support.c    |  2 +-
 src/include/replication/slot.h                |  3 +-
 src/test/recovery/t/019_replslot_limit.pl     |  2 +-
 .../t/035_standby_logical_decoding.pl         | 15 ++++-----
 10 files changed, 35 insertions(+), 51 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 0b25efafe2..2c8cf516bd 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -542,26 +542,6 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				errdetail("This replication slot is being synchronized from the primary server."),
 				errhint("Specify another replication slot."));
 
-	/*
-	 * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
-	 * "cannot get changes" wording in this errmsg because that'd be
-	 * confusingly ambiguous about no changes being available when called from
-	 * pg_logical_slot_get_changes_guts().
-	 */
-	if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL_REMOVED)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("can no longer get changes from replication slot \"%s\"",
-						NameStr(MyReplicationSlot->data.name)),
-				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
-
-	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("can no longer get changes from replication slot \"%s\"",
-						NameStr(MyReplicationSlot->data.name)),
-				 errdetail("This slot has been invalidated because it was conflicting with recovery.")));
-
 	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
 	Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 0148ec3678..ca53caac2f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f6945af1d4..be6f87f00b 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index b30e0473e1..74f7d565f0 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -535,9 +535,13 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid. It should always be set to true, except when we are temporarily
+ * acquiring the slot and don't intend to change it.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -612,9 +616,18 @@ retry:
 	/* Let everybody know we've modified this slot */
 	ConditionVariableBroadcast(&s->active_cv);
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	/* The slot is ours now, so it is safe to reject it if invalidated. */
+	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("can no longer access replication slot \"%s\"",
+					   NameStr(s->data.name)),
+				errdetail("This replication slot has been invalidated due to \"%s\".",
+						  SlotInvalidationCauses[s->data.invalidated]));
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -785,7 +798,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +825,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 	Assert(MyReplicationSlot == NULL);
 	Assert(failover || two_phase);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, true);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -820,13 +833,6 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 				errmsg("cannot use %s with a physical replication slot",
 					   "ALTER_REPLICATION_SLOT"));
 
-	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("cannot alter invalid replication slot \"%s\"", name),
-				errdetail("This replication slot has been invalidated due to \"%s\".",
-						  SlotInvalidationCauses[MyReplicationSlot->data.invalidated]));
-
 	if (RecoveryInProgress())
 	{
 		/*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 977146789f..8be4b8c65b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bac504b554..446d10c1a7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 9a10907d05..d44f8c262b 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, true);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index bf62b36ad0..47ebdaecb6 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
 								 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool error_if_invalid);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index ae2ad5c933..6468784b83 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -234,7 +234,7 @@ my $failed = 0;
 for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 {
 	if ($node_standby->log_contains(
-			"requested WAL segment [0-9A-F]+ has already been removed",
+			"This replication slot has been invalidated due to \"wal_removed\".",
 			$logstart))
 	{
 		$failed = 1;
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 7e794c5bea..505e85d1eb 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -533,7 +533,7 @@ check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 	qq[ALTER_REPLICATION_SLOT vacuum_full_inactiveslot (failover);],
 	replication => 'database');
 ok( $stderr =~
-	  /ERROR:  cannot alter invalid replication slot "vacuum_full_inactiveslot"/
+	  /ERROR:  can no longer access replication slot "vacuum_full_inactiveslot"/
 	  && $stderr =~
 	  /DETAIL:  This replication slot has been invalidated due to "rows_removed"./,
 	"invalidated slot cannot be altered");
@@ -551,8 +551,7 @@ $handle =
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"vacuum_full_activeslot\""
-);
+	"can no longer access replication slot \"vacuum_full_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -632,8 +631,7 @@ $handle =
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"row_removal_activeslot\""
-);
+	"can no longer access replication slot \"row_removal_activeslot\"");
 
 ##################################################
 # Recovery conflict: Same as Scenario 2 but on a shared catalog table
@@ -668,7 +666,7 @@ $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"shared_row_removal_activeslot\""
+	"can no longer access replication slot \"shared_row_removal_activeslot\""
 );
 
 ##################################################
@@ -759,7 +757,7 @@ $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"pruning_activeslot\"");
+	"can no longer access replication slot \"pruning_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -818,8 +816,7 @@ $handle =
   make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
 # as the slot has been invalidated we should not be able to read
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"wal_level_activeslot\""
-);
+	"can no longer access replication slot \"wal_level_activeslot\"");
 
 ##################################################
 # DROP DATABASE should drop its slots, including active slots.
-- 
2.34.1

