From 2e1a8cba688291cf9150f0abbce89273832a644d Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sun, 24 Mar 2024 08:48:41 +0000
Subject: [PATCH v18 5/5] Add inactive_timeout based replication slot
 invalidation.

Till now, postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
max_slot_wal_keep_size is tricky. Because the amount of WAL a
customer generates, and their allocated storage will vary greatly
in production, making it difficult to pin down a one-size-fits-all
value. It is often easy for developers to set a timeout of say 1
or 2 or 3 days at slot level, after which the inactive slots get
dropped.

To achieve the above, postgres uses replication slot property
last_inactive_time (the time at which the slot became inactive),
and a new slot level parameter inactive_timeout and finds an
opportunity to invalidate the slot based on this new mechanism.
The invalidation check happens at various locations to help
being as latest as possible, these locations include the
following:
- Whenever the slot is acquired if the slot
  gets invalidated due to this new mechanism, an error is
  emitted.
- During checkpoint.
- Whenver pg_get_replication_slots() is called.

Note that this new invalidation mechanism won't kick-in for the
slots that are currently being synced from the primary to the
standby.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
---
 doc/src/sgml/func.sgml                        |  12 +-
 doc/src/sgml/system-views.sgml                |  10 +-
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/logical/slotsync.c    |   4 +-
 src/backend/replication/slot.c                | 184 +++++++++++++++++-
 src/backend/replication/slotfuncs.c           |  19 +-
 src/backend/replication/walsender.c           |   4 +-
 src/backend/utils/adt/pg_upgrade_support.c    |   2 +-
 src/include/replication/slot.h                |   9 +-
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/050_invalidate_slots.pl   | 170 ++++++++++++++++
 11 files changed, 395 insertions(+), 22 deletions(-)
 create mode 100644 src/test/recovery/t/050_invalidate_slots.pl

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 22c8e0d39c..4826e45c7d 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -28393,8 +28393,8 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         released upon any error. The optional fourth
         parameter, <parameter>inactive_timeout</parameter>, when set to a
         non-zero value, specifies the amount of time in seconds the slot is
-        allowed to be inactive. This function corresponds to the replication
-        protocol command
+        allowed to be inactive before getting invalidated.
+        This function corresponds to the replication protocol command
         <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
        </para></entry>
       </row>
@@ -28439,12 +28439,12 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <parameter>failover</parameter>, when set to true,
         specifies that this slot is enabled to be synced to the
         standbys so that logical replication can be resumed after
-        failover.  The optional sixth parameter,
+        failover. The optional sixth parameter,
         <parameter>inactive_timeout</parameter>, when set to a
         non-zero value, specifies the amount of time in seconds the slot is
-        allowed to be inactive. A call to this function has the same effect as
-        the replication protocol command
-        <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
+        allowed to be inactive before getting invalidated.
+        A call to this function has the same effect as the replication protocol
+        command <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
        </para></entry>
       </row>
 
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index dddbaa070f..1722609d39 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2539,7 +2539,8 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        <structfield>inactive_timeout</structfield> <type>integer</type>
       </para>
       <para>
-        The amount of time in seconds the slot is allowed to be inactive.
+        The amount of time in seconds the slot is allowed to be inactive
+        before getting invalidated.
       </para></entry>
      </row>
 
@@ -2583,6 +2584,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           perform logical decoding.  It is set only for logical slots.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>inactive_timeout</literal> means that the slot has been
+          inactive for the duration specified by slot's
+          <literal>inactive_timeout</literal> parameter.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index b4dd5cce75..56fc1a45a9 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index c01876ceeb..7f1ffab23c 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -319,7 +319,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -529,7 +529,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
 		 * if the slot is not acquired by other processes.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index baf0b9aa72..fae61020c4 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -107,10 +107,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_WAL_REMOVED] = "wal_removed",
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
+	[RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -158,6 +159,9 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
+static bool InvalidateSlotForInactiveTimeout(ReplicationSlot *slot,
+											 bool need_control_lock,
+											 bool need_mutex);
 
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
@@ -550,9 +554,14 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * If check_for_invalidation is true, the slot is checked for invalidation
+ * based on its inactive_timeout parameter and an error is raised after making
+ * the slot ours.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait,
+					   bool check_for_invalidation)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -630,6 +639,42 @@ retry:
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	/*
+	 * Check if the given slot can be invalidated based on its
+	 * inactive_timeout parameter. If yes, persist the invalidated state to
+	 * disk and then error out. We do this only after making the slot ours to
+	 * avoid anyone else acquiring it while we check for its invalidation.
+	 */
+	if (check_for_invalidation)
+	{
+		/* The slot is ours by now */
+		Assert(s->active_pid == MyProcPid);
+
+		/*
+		 * Well, the slot is not yet ours really unless we check for the
+		 * invalidation below.
+		 */
+		s->active_pid = 0;
+		if (InvalidateReplicationSlotForInactiveTimeout(s, true, true, true))
+		{
+			/*
+			 * If the slot has been invalidated, recalculate the resource
+			 * limits.
+			 */
+			ReplicationSlotsComputeRequiredXmin(false);
+			ReplicationSlotsComputeRequiredLSN();
+
+			/* Might need it for slot clean up on error, so restore it */
+			s->active_pid = MyProcPid;
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("cannot acquire invalidated replication slot \"%s\"",
+							NameStr(MyReplicationSlot->data.name)),
+					 errdetail("This slot has been invalidated because of its inactive_timeout parameter.")));
+		}
+		s->active_pid = MyProcPid;
+	}
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -793,7 +838,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -818,7 +863,7 @@ ReplicationSlotAlter(const char *name, bool failover, int inactive_timeout)
 
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, true);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -1546,6 +1591,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
 			break;
+		case RS_INVAL_INACTIVE_TIMEOUT:
+			appendStringInfoString(&err_detail, _("The slot has been inactive for more than the time specified by slot's inactive_timeout parameter."));
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1659,6 +1707,10 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						invalidation_cause = cause;
 					break;
+				case RS_INVAL_INACTIVE_TIMEOUT:
+					if (InvalidateReplicationSlotForInactiveTimeout(s, false, false, false))
+						invalidation_cause = cause;
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1812,6 +1864,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_INACTIVE_TIMEOUT: inactive slot timeout occurs
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
@@ -1863,6 +1916,109 @@ restart:
 	return invalidated;
 }
 
+/*
+ * Invalidate given slot based on its inactive_timeout parameter.
+ *
+ * Returns true if the slot has got invalidated.
+ *
+ * NB - this function also runs as part of checkpoint, so avoid raising errors
+ * if possible.
+ */
+bool
+InvalidateReplicationSlotForInactiveTimeout(ReplicationSlot *slot,
+											bool need_control_lock,
+											bool need_mutex,
+											bool persist_state)
+{
+	if (!InvalidateSlotForInactiveTimeout(slot, need_control_lock, need_mutex))
+		return false;
+
+	Assert(slot->active_pid == 0);
+
+	SpinLockAcquire(&slot->mutex);
+	slot->data.invalidated = RS_INVAL_INACTIVE_TIMEOUT;
+
+	/* Make sure the invalidated state persists across server restart */
+	slot->just_dirtied = true;
+	slot->dirty = true;
+	SpinLockRelease(&slot->mutex);
+
+	if (persist_state)
+	{
+		char		path[MAXPGPATH];
+
+		sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+		ReplicationSlotSaveToPath(slot, path, ERROR);
+	}
+
+	ReportSlotInvalidation(RS_INVAL_INACTIVE_TIMEOUT, false, 0,
+						   slot->data.name, InvalidXLogRecPtr,
+						   InvalidXLogRecPtr, InvalidTransactionId);
+
+	return true;
+}
+
+/*
+ * Helper for InvalidateReplicationSlotForInactiveTimeout
+ */
+static bool
+InvalidateSlotForInactiveTimeout(ReplicationSlot *slot,
+								 bool need_control_lock,
+								 bool need_mutex)
+{
+	ReplicationSlotInvalidationCause inavidation_cause = RS_INVAL_NONE;
+
+	if (slot->last_inactive_time == 0 ||
+		slot->data.inactive_timeout == 0)
+		return false;
+
+	/* inactive_timeout is only tracked for permanent slots */
+	if (slot->data.persistency != RS_PERSISTENT)
+		return false;
+
+	/*
+	 * Do not invalidate the slots which are currently being synced from the
+	 * primary to the standby.
+	 */
+	if (RecoveryInProgress() && slot->data.synced)
+		return false;
+
+	if (need_control_lock)
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+	Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+	/*
+	 * Check if the slot needs to be invalidated due to inactive_timeout. We
+	 * do this with the spinlock held to avoid race conditions -- for example
+	 * the restart_lsn could move forward, or the slot could be dropped.
+	 */
+	if (need_mutex)
+		SpinLockAcquire(&slot->mutex);
+
+	if (slot->last_inactive_time > 0 &&
+		slot->data.inactive_timeout > 0)
+	{
+		TimestampTz now;
+
+		/* last_inactive_time is only tracked for inactive slots */
+		Assert(slot->active_pid == 0);
+
+		now = GetCurrentTimestamp();
+		if (TimestampDifferenceExceeds(slot->last_inactive_time, now,
+									   slot->data.inactive_timeout * 1000))
+			inavidation_cause = RS_INVAL_INACTIVE_TIMEOUT;
+	}
+
+	if (need_mutex)
+		SpinLockRelease(&slot->mutex);
+
+	if (need_control_lock)
+		LWLockRelease(ReplicationSlotControlLock);
+
+	return (inavidation_cause == RS_INVAL_INACTIVE_TIMEOUT);
+}
+
 /*
  * Flush all replication slots to disk.
  *
@@ -1875,6 +2031,7 @@ void
 CheckPointReplicationSlots(bool is_shutdown)
 {
 	int			i;
+	bool		invalidated = false;
 
 	elog(DEBUG1, "performing replication slot checkpoint");
 
@@ -1896,10 +2053,11 @@ CheckPointReplicationSlots(bool is_shutdown)
 			continue;
 
 		/*
-		 * Save the slot to disk, locking is handled in
-		 * ReplicationSlotSaveToPath.
+		 * Here's an opportunity to invalidate inactive replication slots
+		 * based on timeout, so let's do it.
 		 */
-		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
+		if (InvalidateReplicationSlotForInactiveTimeout(s, true, true, false))
+			invalidated = true;
 
 		/*
 		 * Slot's data is not flushed each time the confirmed_flush LSN is
@@ -1924,9 +2082,21 @@ CheckPointReplicationSlots(bool is_shutdown)
 			SpinLockRelease(&s->mutex);
 		}
 
+		/*
+		 * Save the slot to disk, locking is handled in
+		 * ReplicationSlotSaveToPath.
+		 */
+		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
 		ReplicationSlotSaveToPath(s, path, LOG);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
+
+	/* If the slot has been invalidated, recalculate the resource limits */
+	if (invalidated)
+	{
+		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsComputeRequiredLSN();
+	}
 }
 
 /*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index dba80ac1bb..aadba68c11 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -257,6 +257,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
+	bool		invalidated = false;
 
 	/*
 	 * We don't require any special permission to see this function's data
@@ -287,6 +288,13 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		slot_contents = *slot;
 		SpinLockRelease(&slot->mutex);
 
+		/*
+		 * Here's an opportunity to invalidate inactive replication slots
+		 * based on timeout, so let's do it.
+		 */
+		if (InvalidateReplicationSlotForInactiveTimeout(slot, false, true, true))
+			invalidated = true;
+
 		memset(values, 0, sizeof(values));
 		memset(nulls, 0, sizeof(nulls));
 
@@ -465,6 +473,15 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 	LWLockRelease(ReplicationSlotControlLock);
 
+	/*
+	 * If the slot has been invalidated, recalculate the resource limits
+	 */
+	if (invalidated)
+	{
+		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsComputeRequiredLSN();
+	}
+
 	return (Datum) 0;
 }
 
@@ -667,7 +684,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0420274247..aa886412a5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -846,7 +846,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1483,7 +1483,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index c54b08fe18..82956d58d3 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -299,7 +299,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, false);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index ee9b385cf9..00ff8e5ef5 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -53,6 +53,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* inactive slot timeout has occurred */
+	RS_INVAL_INACTIVE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -249,7 +251,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, bool failover,
 								 int inactive_timeout);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool check_for_invalidation);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
@@ -270,6 +273,10 @@ extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause
 											   XLogSegNo oldestSegno,
 											   Oid dboid,
 											   TransactionId snapshotConflictHorizon);
+extern bool InvalidateReplicationSlotForInactiveTimeout(ReplicationSlot *slot,
+														bool need_control_lock,
+														bool need_mutex,
+														bool persist_state);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index b1eb77b1ec..708a2a3798 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -51,6 +51,7 @@ tests += {
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
       't/042_low_level_backup.pl',
+      't/050_invalidate_slots.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
new file mode 100644
index 0000000000..77499dde07
--- /dev/null
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -0,0 +1,170 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test for replication slots invalidation
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# Check for invalidation of slot in server log.
+sub check_slots_invalidation_in_server_log
+{
+	my ($node, $slot_name, $offset) = @_;
+	my $invalidated = 0;
+
+	for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+	{
+		$node->safe_psql('postgres', "CHECKPOINT");
+		if ($node->log_contains(
+				"invalidating obsolete replication slot \"$slot_name\"", $offset))
+		{
+			$invalidated = 1;
+			last;
+		}
+		usleep(100_000);
+	}
+	ok($invalidated, "check that slot $slot_name invalidation has been logged");
+}
+
+# =============================================================================
+# Testcase start: Invalidate streaming standby's slot due to inactive_timeout
+#
+
+# Initialize primary node
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+
+# Avoid checkpoint during the test, otherwise, the test can get unpredictable
+$primary->append_conf(
+	'postgresql.conf', q{
+checkpoint_timeout = 1h
+autovacuum = off
+});
+$primary->start;
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, $backup_name, has_streaming => 1);
+$standby1->append_conf(
+	'postgresql.conf', q{
+primary_slot_name = 'sb1_slot'
+});
+
+# Set timeout so that the slot when inactive will get invalidated after the
+# timeout.
+my $inactive_timeout = 1;
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb1_slot', inactive_timeout := $inactive_timeout);
+]);
+
+$standby1->start;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby1);
+
+# Check inactive_timeout is what we've set above
+my $result = $primary->safe_psql(
+	'postgres', qq[
+	SELECT inactive_timeout = $inactive_timeout
+		FROM pg_replication_slots WHERE slot_name = 'sb1_slot';
+]);
+is($result, "t",
+	'check the inactive replication slot info for an active slot');
+
+my $logstart = -s $primary->logfile;
+
+# Stop standby to make the replication slot on primary inactive
+$standby1->stop;
+
+# Wait for the inactive replication slot info to be updated
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE last_inactive_time IS NOT NULL
+            AND slot_name = 'sb1_slot'
+            AND inactive_timeout = $inactive_timeout;
+])
+  or die
+  "Timed out while waiting for inactive replication slot info to be updated";
+
+check_slots_invalidation_in_server_log($primary, 'sb1_slot', $logstart);
+
+# Wait for the inactive replication slots to be invalidated.
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'sb1_slot' AND
+		invalidation_reason = 'inactive_timeout';
+])
+  or die
+  "Timed out while waiting for inactive replication slot sb1_slot to be invalidated";
+
+# Testcase end: Invalidate streaming standby's slot due to inactive_timeout
+# =============================================================================
+
+# =============================================================================
+# Testcase start: Invalidate logical subscriber's slot due to inactive_timeout
+my $publisher = $primary;
+
+# Create subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('sub');
+$subscriber->init;
+$subscriber->start;
+
+# Create tables
+$publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Insert some data
+$subscriber->safe_psql('postgres',
+	"INSERT INTO test_tbl VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
+$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (slot_name = 'lsub1_slot')"
+);
+
+$subscriber->wait_for_subscription_sync($publisher, 'sub');
+
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");
+
+is($result, qq(5), "check initial copy was done");
+
+# Alter slot to set inactive_timeout
+$publisher->safe_psql(
+	'postgres', qq[
+	SELECT pg_alter_replication_slot(slot_name := 'lsub1_slot', inactive_timeout := $inactive_timeout);
+]);
+
+$logstart = -s $publisher->logfile;
+
+# Stop subscriber to make the replication slot on publisher inactive
+$subscriber->stop;
+
+# Wait for the inactive replication slot info to be updated
+$publisher->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE last_inactive_time IS NOT NULL
+            AND slot_name = 'lsub1_slot'
+            AND inactive_timeout = $inactive_timeout;
+])
+  or die
+  "Timed out while waiting for inactive replication slot info to be updated";
+
+check_slots_invalidation_in_server_log($publisher, 'lsub1_slot', $logstart);
+
+# Testcase end: Invalidate logical subscriber's slot due to inactive_timeout
+# =============================================================================
+
+done_testing();
-- 
2.34.1

