On Thu, 30 Jan 2025 at 16:02, Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> I have made minor changes in the attached. The main change is to raise
> an ERROR before we broadcast to let everybody know we've modified this
> slot. There is no point in broadcasting if the slot is unusable.
>
> >   - Updated the test files to match the new error message.
> >
>
> -   /ERROR:  cannot alter invalid replication slot "vacuum_full_inactiveslot"/
> +   /ERROR:  can no longer access replication slot "vacuum_full_inactiveslot"/
>
> - "can no longer get changes from replication slot
> \"shared_row_removal_activeslot\""
> + "can no longer access replication slot \"shared_row_removal_activeslot\""
>
> With the above changes, we made the ERROR message generic which was
> required to raise it from the central place. This looks reasonable to
> me. The other option that occurred to me is to write it as: "can no
> longer use replication slot "vacuum_full_inactiveslot" to access WAL
> or modify it" but I find the one used currently in the patch better as
> this is longer and may need modification in future if we start using
> slot for something else.

One of the tests was failing because MyReplicationSlot was not set to s
before erroring out, which resulted in:
TRAP: failed Assert("s->data.persistency == RS_TEMPORARY"), File: "slot.c", Line: 777, PID: 280121
postgres: primary: walsender vignesh [local] START_REPLICATION(ExceptionalCondition+0xbb)[0x578612cd9289]
postgres: primary: walsender vignesh [local] START_REPLICATION(ReplicationSlotCleanup+0x12b)[0x578612a32348]
postgres: primary: walsender vignesh [local] START_REPLICATION(WalSndErrorCleanup+0x5e)[0x578612a41995]

Fixed it by setting MyReplicationSlot just before erroring out, so that
the WalSndErrorCleanup function has access to the slot. I also moved the
error reporting above the check for the slot being active in another
process, as there is no need to do that check for invalidated slots.
The attached patch has these changes.
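
For illustration only, here is a small standalone C model of the ordering
problem. It is not PostgreSQL code: ToySlot, my_slot, toy_acquire,
toy_release and toy_error_cleanup are hypothetical stand-ins, and the
sketch assumes (as the trace above suggests) that the error cleanup path
releases the acquired slot only through MyReplicationSlot and then expects
every slot still marked active for this process to be temporary.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for a replication slot; all names here are hypothetical. */
typedef struct ToySlot
{
	int		active_pid;		/* 0 when no process owns the slot */
	bool	temporary;		/* models persistency == RS_TEMPORARY */
	bool	invalidated;	/* models invalidated != RS_INVAL_NONE */
} ToySlot;

/* Models MyReplicationSlot: the slot this process believes it owns. */
static ToySlot *my_slot = NULL;

/* Models releasing the slot that is currently acquired. */
static void
toy_release(void)
{
	assert(my_slot != NULL);
	my_slot->active_pid = 0;
	my_slot = NULL;
}

/*
 * Models acquiring a slot. Returns false where the real code would raise
 * an ERROR. publish_before_error selects whether my_slot is set before
 * (patched ordering) or after (earlier ordering) the invalidation check.
 */
static bool
toy_acquire(ToySlot *s, int my_pid, bool error_if_invalid,
			bool publish_before_error)
{
	s->active_pid = my_pid;		/* the slot is now marked as ours */

	if (publish_before_error)
		my_slot = s;

	if (error_if_invalid && s->invalidated)
		return false;			/* "ERROR": caller runs error cleanup */

	my_slot = s;
	return true;
}

/*
 * Models the error cleanup path. Returns false where the real code would
 * trip the assertion: a non-temporary slot is still marked active for
 * this process after the release step.
 */
static bool
toy_error_cleanup(ToySlot *slots, size_t nslots, int my_pid)
{
	bool	ok = true;

	if (my_slot != NULL)
		toy_release();

	for (size_t i = 0; i < nslots; i++)
	{
		if (slots[i].active_pid == my_pid)
		{
			if (!slots[i].temporary)
				ok = false;
			slots[i].active_pid = 0;
		}
	}
	return ok;
}
```

In this model, acquiring an invalidated persistent slot with
publish_before_error set to false leaves the slot marked active while
my_slot is still NULL, so toy_error_cleanup reports the violated
expectation. With publish_before_error set to true, the cleanup releases
the slot first and finds nothing left over.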

Regards,
Vignesh
From 507561f1442f678c5a1e8900b06604c1c84c6b47 Mon Sep 17 00:00:00 2001
From: Vignesh <vignes...@gmail.com>
Date: Thu, 30 Jan 2025 18:15:15 +0530
Subject: [PATCH v4] Raise Error for Invalid Slots in ReplicationSlotAcquire()

Once a replication slot is invalidated, it cannot be reused. However, a
process could still acquire an invalid slot and fail later.

For example, if a process acquires a logical slot that was invalidated due
to wal_removed, it will eventually fail in CreateDecodingContext() when
attempting to access the removed WAL. Similarly, for physical replication
slots, even if the slot is invalidated and invalidation_reason is set to
wal_removed, the walsender does not currently check for invalidation when
starting physical replication. Instead, replication starts, and an error
is only reported later by the standby when a missing WAL is detected.

This patch improves error handling by detecting invalid slots earlier.
If error_if_invalid=true is specified when calling ReplicationSlotAcquire(),
an error will be raised immediately instead of letting the process acquire the
slot and fail later due to the invalidated slot.
---
 src/backend/replication/logical/logical.c     | 20 ------------
 .../replication/logical/logicalfuncs.c        |  2 +-
 src/backend/replication/logical/slotsync.c    |  4 +--
 src/backend/replication/slot.c                | 32 +++++++++++--------
 src/backend/replication/slotfuncs.c           |  2 +-
 src/backend/replication/walsender.c           |  4 +--
 src/backend/utils/adt/pg_upgrade_support.c    |  2 +-
 src/include/replication/slot.h                |  3 +-
 src/test/recovery/t/019_replslot_limit.pl     |  2 +-
 .../t/035_standby_logical_decoding.pl         | 15 ++++-----
 10 files changed, 35 insertions(+), 51 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 0b25efafe2..2c8cf516bd 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -542,26 +542,6 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				errdetail("This replication slot is being synchronized from the primary server."),
 				errhint("Specify another replication slot."));
 
-	/*
-	 * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
-	 * "cannot get changes" wording in this errmsg because that'd be
-	 * confusingly ambiguous about no changes being available when called from
-	 * pg_logical_slot_get_changes_guts().
-	 */
-	if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL_REMOVED)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("can no longer get changes from replication slot \"%s\"",
-						NameStr(MyReplicationSlot->data.name)),
-				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
-
-	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("can no longer get changes from replication slot \"%s\"",
-						NameStr(MyReplicationSlot->data.name)),
-				 errdetail("This slot has been invalidated because it was conflicting with recovery.")));
-
 	Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
 	Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 0148ec3678..ca53caac2f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f6945af1d4..be6f87f00b 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index b30e0473e1..74f7d565f0 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -535,9 +535,13 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid. It should always be set to true, except when we are temporarily
+ * acquiring the slot and don't intend to change it.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -585,6 +589,18 @@ retry:
 		active_pid = MyProcPid;
 	LWLockRelease(ReplicationSlotControlLock);
 
+	/* We made this slot active, so it's ours now. */
+	MyReplicationSlot = s;
+
+	/* Invalid slots can't be modified or used before accessing the WAL. */
+	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("can no longer access replication slot \"%s\"",
+					   NameStr(s->data.name)),
+				errdetail("This replication slot has been invalidated due to \"%s\".",
+						  SlotInvalidationCauses[s->data.invalidated]));
+
 	/*
 	 * If we found the slot but it's already active in another process, we
 	 * wait until the owning process signals us that it's been released, or
@@ -612,9 +628,6 @@ retry:
 	/* Let everybody know we've modified this slot */
 	ConditionVariableBroadcast(&s->active_cv);
 
-	/* We made this slot active, so it's ours now. */
-	MyReplicationSlot = s;
-
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -785,7 +798,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +825,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 	Assert(MyReplicationSlot == NULL);
 	Assert(failover || two_phase);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, true);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -820,13 +833,6 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 				errmsg("cannot use %s with a physical replication slot",
 					   "ALTER_REPLICATION_SLOT"));
 
-	if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
-		ereport(ERROR,
-				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				errmsg("cannot alter invalid replication slot \"%s\"", name),
-				errdetail("This replication slot has been invalidated due to \"%s\".",
-						  SlotInvalidationCauses[MyReplicationSlot->data.invalidated]));
-
 	if (RecoveryInProgress())
 	{
 		/*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 977146789f..8be4b8c65b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bac504b554..446d10c1a7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 9a10907d05..d44f8c262b 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, true);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index bf62b36ad0..47ebdaecb6 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
 								 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool error_if_invalid);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index ae2ad5c933..6468784b83 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -234,7 +234,7 @@ my $failed = 0;
 for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 {
 	if ($node_standby->log_contains(
-			"requested WAL segment [0-9A-F]+ has already been removed",
+			"This replication slot has been invalidated due to \"wal_removed\".",
 			$logstart))
 	{
 		$failed = 1;
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 7e794c5bea..505e85d1eb 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -533,7 +533,7 @@ check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 	qq[ALTER_REPLICATION_SLOT vacuum_full_inactiveslot (failover);],
 	replication => 'database');
 ok( $stderr =~
-	  /ERROR:  cannot alter invalid replication slot "vacuum_full_inactiveslot"/
+	  /ERROR:  can no longer access replication slot "vacuum_full_inactiveslot"/
 	  && $stderr =~
 	  /DETAIL:  This replication slot has been invalidated due to "rows_removed"./,
 	"invalidated slot cannot be altered");
@@ -551,8 +551,7 @@ $handle =
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"vacuum_full_activeslot\""
-);
+	"can no longer access replication slot \"vacuum_full_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -632,8 +631,7 @@ $handle =
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"row_removal_activeslot\""
-);
+	"can no longer access replication slot \"row_removal_activeslot\"");
 
 ##################################################
 # Recovery conflict: Same as Scenario 2 but on a shared catalog table
@@ -668,7 +666,7 @@ $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"shared_row_removal_activeslot\""
+	"can no longer access replication slot \"shared_row_removal_activeslot\""
 );
 
 ##################################################
@@ -759,7 +757,7 @@ $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
 
 # We are not able to read from the slot as it has been invalidated
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"pruning_activeslot\"");
+	"can no longer access replication slot \"pruning_activeslot\"");
 
 # Turn hot_standby_feedback back on
 change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -818,8 +816,7 @@ $handle =
   make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
 # as the slot has been invalidated we should not be able to read
 check_pg_recvlogical_stderr($handle,
-	"can no longer get changes from replication slot \"wal_level_activeslot\""
-);
+	"can no longer access replication slot \"wal_level_activeslot\"");
 
 ##################################################
 # DROP DATABASE should drop its slots, including active slots.
-- 
2.43.0
