From 1aa86d7a05fac28c87baa612e3dfac38e75bc91c Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Tue, 28 Jan 2025 16:23:53 +0530
Subject: [PATCH v64 1/2] Introduce inactive_timeout based replication slot
 invalidation

Till now, postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
this GUC is a bit tricky. Because the amount of WAL a database
generates, and the allocated storage per instance will vary
greatly in production, making it difficult to pin down a
one-size-fits-all value.

It is often easy for users to set a timeout of say 1 or 2 or n
days, after which all the inactive slots get invalidated. This
commit introduces a GUC named idle_replication_slot_timeout.
When set, postgres invalidates slots (during non-shutdown
checkpoints) that are idle for longer than this amount of
time.

Note that the idle timeout invalidation mechanism is not
applicable for slots on the standby server that are being synced
from the primary server (i.e., standby slots having 'synced' field
'true'). Synced slots are always considered to be inactive because
they don't perform logical decoding to produce changes.
---
 doc/src/sgml/config.sgml                      |  39 ++++
 doc/src/sgml/logical-replication.sgml         |   5 +
 doc/src/sgml/system-views.sgml                |  10 +-
 .../replication/logical/logicalfuncs.c        |   2 +-
 src/backend/replication/logical/slotsync.c    |   8 +-
 src/backend/replication/slot.c                | 187 ++++++++++++++++--
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/replication/walsender.c           |   4 +-
 src/backend/utils/adt/pg_upgrade_support.c    |   2 +-
 src/backend/utils/adt/timestamp.c             |  18 ++
 src/backend/utils/misc/guc_tables.c           |  14 ++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/bin/pg_basebackup/pg_createsubscriber.c   |   4 +
 src/bin/pg_upgrade/server.c                   |   7 +
 src/include/replication/slot.h                |  25 ++-
 src/include/utils/guc_hooks.h                 |   2 +
 src/include/utils/timestamp.h                 |   3 +
 17 files changed, 302 insertions(+), 31 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index a782f10998..342be29112 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4450,6 +4450,45 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-idle-replication-slot-timeout" xreflabel="idle_replication_slot_timeout">
+      <term><varname>idle_replication_slot_timeout</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>idle_replication_slot_timeout</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots that have remained idle longer than this
+        duration. If this value is specified without units, it is taken as
+        minutes. A value of zero disables the idle timeout invalidation mechanism.
+        The default is one day. This parameter can only be set in the
+        <filename>postgresql.conf</filename> file or on the server command line.
+       </para>
+
+       <para>
+        Slot invalidation due to idle timeout occurs during checkpoint.
+        Because checkpoints happen at <varname>checkpoint_timeout</varname>
+        intervals, there can be some lag between when the
+        <varname>idle_replication_slot_timeout</varname> was exceeded and when
+        the slot invalidation is triggered at the next checkpoint.
+        To avoid such lags, users can force a checkpoint to promptly invalidate
+        inactive slots. The duration of slot inactivity is calculated using the slot's
+        <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>inactive_since</structfield>
+        value.
+       </para>
+
+       <para>
+        Note that the idle timeout invalidation mechanism is not
+        applicable for slots on the standby server that are being synced
+        from the primary server (i.e., standby slots having
+        <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>synced</structfield>
+        value <literal>true</literal>).
+        Synced slots are always considered to be inactive because they don't
+        perform logical decoding to produce changes.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 07a07dfe0b..2fb8bcd736 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -2198,6 +2198,11 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
     plus some reserve for table synchronization.
    </para>
 
+   <para>
+    Logical replication slots are also affected by
+    <link linkend="guc-idle-replication-slot-timeout"><varname>idle_replication_slot_timeout</varname></link>.
+   </para>
+
    <para>
     <link linkend="guc-max-wal-senders"><varname>max_wal_senders</varname></link>
     should be set to at least the same as
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 8e2b0a7927..7d3a0aa709 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2566,7 +2566,8 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </para>
       <para>
         The time when the slot became inactive. <literal>NULL</literal> if the
-        slot is currently being streamed.
+        slot is currently being streamed. If the slot becomes invalidated,
+        this value will remain unchanged until server shutdown.
         Note that for slots on the standby that are being synced from a
         primary server (whose <structfield>synced</structfield> field is
         <literal>true</literal>), the <structfield>inactive_since</structfield>
@@ -2620,6 +2621,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           perform logical decoding.  It is set only for logical slots.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>idle_timeout</literal> means that the slot has remained
+          idle longer than the configured
+          <xref linkend="guc-idle-replication-slot-timeout"/> duration.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 0148ec3678..ca53caac2f 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f6945af1d4..987857b949 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
@@ -1541,9 +1541,7 @@ update_synced_slots_inactive_since(void)
 			if (now == 0)
 				now = GetCurrentTimestamp();
 
-			SpinLockAcquire(&s->mutex);
-			s->inactive_since = now;
-			SpinLockRelease(&s->mutex);
+			ReplicationSlotSetInactiveSince(s, now, true);
 		}
 	}
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index b30e0473e1..ee5aec817a 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -107,10 +107,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_WAL_REMOVED] = "wal_removed",
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
+	[RS_INVAL_IDLE_TIMEOUT] = "idle_timeout",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_IDLE_TIMEOUT
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -141,6 +142,12 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
 
+/*
+ * Invalidate replication slots that have remained idle longer than this
+ * duration; '0' disables it.
+ */
+int			idle_replication_slot_timeout_mins = HOURS_PER_DAY * MINS_PER_HOUR;
+
 /*
  * This GUC lists streaming replication standby server slot names that
  * logical WAL sender processes will wait for.
@@ -535,9 +542,12 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -615,6 +625,21 @@ retry:
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	/*
+	 * An error is raised if error_if_invalid is true and the slot has been
+	 * previously invalidated due to inactive timeout.
+	 */
+	if (error_if_invalid && s->data.invalidated == RS_INVAL_IDLE_TIMEOUT)
+	{
+		Assert(s->inactive_since > 0);
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("can no longer get changes from replication slot \"%s\"",
+						NameStr(s->data.name)),
+				 errdetail("This slot has been invalidated because it has remained idle longer than the configured \"%s\" duration.",
+						   "idle_replication_slot_timeout")));
+	}
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -703,16 +728,12 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
-		slot->inactive_since = now;
+		ReplicationSlotSetInactiveSince(slot, now, false);
 		SpinLockRelease(&slot->mutex);
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
 	else
-	{
-		SpinLockAcquire(&slot->mutex);
-		slot->inactive_since = now;
-		SpinLockRelease(&slot->mutex);
-	}
+		ReplicationSlotSetInactiveSince(slot, now, true);
 
 	MyReplicationSlot = NULL;
 
@@ -785,7 +806,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +833,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 	Assert(MyReplicationSlot == NULL);
 	Assert(failover || two_phase);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, false);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -1508,7 +1529,8 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 					   NameData slotname,
 					   XLogRecPtr restart_lsn,
 					   XLogRecPtr oldestLSN,
-					   TransactionId snapshotConflictHorizon)
+					   TransactionId snapshotConflictHorizon,
+					   TimestampTz inactive_since)
 {
 	StringInfoData err_detail;
 	bool		hint = false;
@@ -1538,6 +1560,16 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
 			break;
+
+		case RS_INVAL_IDLE_TIMEOUT:
+			Assert(inactive_since > 0);
+			/* translator: second %s is a GUC variable name */
+			appendStringInfo(&err_detail,
+							 _("The slot has remained idle since %s, which is longer than the configured \"%s\" duration."),
+							 timestamptz_to_str(inactive_since),
+							 "idle_replication_slot_timeout");
+			break;
+
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1554,6 +1586,32 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 	pfree(err_detail.data);
 }
 
+/*
+ * Can we invalidate an idle replication slot?
+ *
+ * Idle timeout invalidation is allowed only when:
+ *
+ * 1. Idle timeout is set
+ * 2. Slot has WAL reserved
+ * 3. Slot is inactive
+ * 4. The slot is not being synced from the primary while the server
+ *    is in recovery
+ *
+ * Note that the idle timeout invalidation mechanism is not
+ * applicable for slots on the standby server that are being synced
+ * from the primary server (i.e., standby slots having 'synced' field 'true').
+ * Synced slots are always considered to be inactive because they don't
+ * perform logical decoding to produce changes.
+ */
+static inline bool
+CanInvalidateIdleSlot(ReplicationSlot *s)
+{
+	return (idle_replication_slot_timeout_mins > 0 &&
+			!XLogRecPtrIsInvalid(s->data.restart_lsn) &&
+			s->inactive_since > 0 &&
+			!(RecoveryInProgress() && s->data.synced));
+}
+
 /*
  * Helper for InvalidateObsoleteReplicationSlots
  *
@@ -1581,6 +1639,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 	TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
 	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
 	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
+	TimestampTz inactive_since = 0;
 
 	for (;;)
 	{
@@ -1588,6 +1647,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		NameData	slotname;
 		int			active_pid = 0;
 		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
+		TimestampTz now = 0;
 
 		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
@@ -1598,6 +1658,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			break;
 		}
 
+		if (cause == RS_INVAL_IDLE_TIMEOUT)
+		{
+			/*
+			 * We get the current time beforehand to avoid system call while
+			 * holding the spinlock.
+			 */
+			now = GetCurrentTimestamp();
+		}
+
 		/*
 		 * Check if the slot needs to be invalidated. If it needs to be
 		 * invalidated, and is not currently acquired, acquire it and mark it
@@ -1651,6 +1720,21 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						invalidation_cause = cause;
 					break;
+				case RS_INVAL_IDLE_TIMEOUT:
+					Assert(now > 0);
+
+					/*
+					 * Check if the slot needs to be invalidated due to
+					 * idle_replication_slot_timeout GUC.
+					 */
+					if (CanInvalidateIdleSlot(s) &&
+						TimestampDifferenceExceedsSeconds(s->inactive_since, now,
+														  idle_replication_slot_timeout_mins * SECS_PER_MINUTE))
+					{
+						invalidation_cause = cause;
+						inactive_since = s->inactive_since;
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1735,7 +1819,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			{
 				ReportSlotInvalidation(invalidation_cause, true, active_pid,
 									   slotname, restart_lsn,
-									   oldestLSN, snapshotConflictHorizon);
+									   oldestLSN, snapshotConflictHorizon,
+									   inactive_since);
 
 				if (MyBackendType == B_STARTUP)
 					(void) SendProcSignal(active_pid,
@@ -1781,7 +1866,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 
 			ReportSlotInvalidation(invalidation_cause, false, active_pid,
 								   slotname, restart_lsn,
-								   oldestLSN, snapshotConflictHorizon);
+								   oldestLSN, snapshotConflictHorizon,
+								   inactive_since);
 
 			/* done with this slot for now */
 			break;
@@ -1796,14 +1882,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 /*
  * Invalidate slots that require resources about to be removed.
  *
- * Returns true when any slot have got invalidated.
+ * Returns true if there are any invalidated slots.
  *
- * Whether a slot needs to be invalidated depends on the cause. A slot is
- * removed if it:
+ * Whether a slot needs to be invalidated depends on the invalidation cause.
+ * A slot is invalidated if it:
  * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
- * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
+ * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
+ *   "idle_replication_slot_timeout" duration.
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
@@ -1856,7 +1944,8 @@ restart:
 }
 
 /*
- * Flush all replication slots to disk.
+ * Flush all replication slots to disk. Also, invalidate obsolete slots during
+ * non-shutdown checkpoint.
  *
  * It is convenient to flush dirty replication slots at the time of checkpoint.
  * Additionally, in case of a shutdown checkpoint, we also identify the slots
@@ -1914,6 +2003,45 @@ CheckPointReplicationSlots(bool is_shutdown)
 		SaveSlotToPath(s, path, LOG);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
+
+	if (!is_shutdown)
+	{
+		elog(DEBUG1, "performing replication slot invalidation checks");
+
+		/*
+		 * NB: We will make another pass over replication slots for
+		 * invalidation checks to keep the code simple. Testing shows that
+		 * there is no noticeable overhead (when compared with wal_removed
+		 * invalidation) even if we were to do idle_timeout invalidation of
+		 * thousands of replication slots here. If it is ever proven that this
+		 * assumption is wrong, we will have to perform the invalidation
+		 * checks in the above for loop with the following changes:
+		 *
+		 * - Acquire ControlLock lock once before the loop.
+		 *
+		 * - Call InvalidatePossiblyObsoleteSlot for each slot.
+		 *
+		 * - Handle the cases in which ControlLock gets released just like
+		 * InvalidateObsoleteReplicationSlots does.
+		 *
+		 * - Avoid saving slot info to disk two times for each invalidated
+		 * slot.
+		 *
+		 * XXX: Should we move idle_timeout invalidation check closer to
+		 * wal_removed in CreateCheckPoint and CreateRestartPoint?
+		 *
+		 * XXX: Slot invalidation due to 'idle_timeout' applies only to
+		 * released slots, and is based on the 'idle_replication_slot_timeout'
+		 * GUC. Active slots currently in use for replication are excluded to
+		 * prevent accidental invalidation. Slots where communication between
+		 * the publisher and subscriber is down are also excluded, as they are
+		 * managed by the 'wal_sender_timeout'.
+		 */
+		InvalidateObsoleteReplicationSlots(RS_INVAL_IDLE_TIMEOUT,
+										   0,
+										   InvalidOid,
+										   InvalidTransactionId);
+	}
 }
 
 /*
@@ -2398,7 +2526,9 @@ RestoreSlotFromDisk(const char *name)
 		/*
 		 * Set the time since the slot has become inactive after loading the
 		 * slot from the disk into memory. Whoever acquires the slot i.e.
-		 * makes the slot active will reset it.
+		 * makes the slot active will reset it. Avoid calling
+		 * ReplicationSlotSetInactiveSince() here, as it will not set the time
+		 * for invalid slots.
 		 */
 		slot->inactive_since = GetCurrentTimestamp();
 
@@ -2793,3 +2923,22 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
 
 	ConditionVariableCancelSleep();
 }
+
+/*
+ * GUC check_hook for idle_replication_slot_timeout
+ *
+ * The value of idle_replication_slot_timeout must be set to 0 during
+ * a binary upgrade. See start_postmaster() in pg_upgrade for more details.
+ */
+bool
+check_idle_replication_slot_timeout(int *newval, void **extra, GucSource source)
+{
+	if (IsBinaryUpgrade && *newval != 0)
+	{
+		GUC_check_errdetail("The value of \"%s\" must be set to 0 during binary upgrade mode.",
+							"idle_replication_slot_timeout");
+		return false;
+	}
+
+	return true;
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 977146789f..8be4b8c65b 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bac504b554..446d10c1a7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 9a10907d05..d44f8c262b 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, true);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index ba9bae0506..9682f9dbdc 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1786,6 +1786,24 @@ TimestampDifferenceExceeds(TimestampTz start_time,
 	return (diff >= msec * INT64CONST(1000));
 }
 
+/*
+ * Check if the difference between two timestamps is >= a given
+ * threshold (expressed in seconds).
+ */
+bool
+TimestampDifferenceExceedsSeconds(TimestampTz start_time,
+								  TimestampTz stop_time,
+								  int threshold_sec)
+{
+	long		secs;
+	int			usecs;
+
+	/* Calculate the difference in seconds */
+	TimestampDifference(start_time, stop_time, &secs, &usecs);
+
+	return (secs >= threshold_sec);
+}
+
 /*
  * Convert a time_t to TimestampTz.
  *
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 38cb9e970d..7cbba03bc1 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3048,6 +3048,20 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"idle_replication_slot_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the duration a replication slot can remain idle before "
+						 "it is invalidated."),
+			NULL,
+			GUC_UNIT_MIN
+		},
+		&idle_replication_slot_timeout_mins,
+		HOURS_PER_DAY * MINS_PER_HOUR,	/* 1 day */
+		0,
+		INT_MAX / SECS_PER_MINUTE,
+		check_idle_replication_slot_timeout, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 079efa1baa..0ed9eb057e 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -329,6 +329,7 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
+#idle_replication_slot_timeout = 1d	# in minutes; 0 disables
 
 # - Primary Server -
 
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index faf18ccf13..f2ef8d5ccc 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -1438,6 +1438,10 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
 	appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
 	appendShellString(pg_ctl_cmd, subscriber_dir);
 	appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
+
+	/* Prevent unintended slot invalidation */
+	appendPQExpBuffer(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
+
 	if (restricted_access)
 	{
 		appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
diff --git a/src/bin/pg_upgrade/server.c b/src/bin/pg_upgrade/server.c
index de6971cde6..873e5b5117 100644
--- a/src/bin/pg_upgrade/server.c
+++ b/src/bin/pg_upgrade/server.c
@@ -252,6 +252,13 @@ start_postmaster(ClusterInfo *cluster, bool report_and_exit_on_error)
 	if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
 		appendPQExpBufferStr(&pgoptions, " -c max_slot_wal_keep_size=-1");
 
+	/*
+	 * Use idle_replication_slot_timeout=0 to prevent slot invalidation due to
+	 * idle_timeout by checkpointer process during upgrade.
+	 */
+	if (GET_MAJOR_VERSION(cluster->major_version) >= 1800)
+		appendPQExpBufferStr(&pgoptions, " -c idle_replication_slot_timeout=0");
+
 	/*
 	 * Use -b to disable autovacuum and logical replication launcher
 	 * (effective in PG17 or later for the latter).
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index bf62b36ad0..f3994ab000 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -56,6 +56,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* idle slot timeout has occurred */
+	RS_INVAL_IDLE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -228,6 +230,25 @@ typedef struct ReplicationSlotCtlData
 	ReplicationSlot replication_slots[1];
 } ReplicationSlotCtlData;
 
+/*
+ * Set slot's inactive_since property unless it was previously invalidated.
+ */
+static inline void
+ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz now,
+								bool acquire_lock)
+{
+	if (s->data.invalidated != RS_INVAL_NONE)
+		return;
+
+	if (acquire_lock)
+		SpinLockAcquire(&s->mutex);
+
+	s->inactive_since = now;
+
+	if (acquire_lock)
+		SpinLockRelease(&s->mutex);
+}
+
 /*
  * Pointers to shared memory
  */
@@ -237,6 +258,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *synchronized_standby_slots;
+extern PGDLLIMPORT int idle_replication_slot_timeout_mins;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
@@ -253,7 +275,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
 								 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool error_if_invalid);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index 87999218d6..951451a976 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -174,5 +174,7 @@ extern void assign_wal_sync_method(int new_wal_sync_method, void *extra);
 extern bool check_synchronized_standby_slots(char **newval, void **extra,
 											 GucSource source);
 extern void assign_synchronized_standby_slots(const char *newval, void *extra);
+extern bool check_idle_replication_slot_timeout(int *newval, void **extra,
+												GucSource source);
 
 #endif							/* GUC_HOOKS_H */
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index d26f023fb8..e1d05d6779 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -143,5 +143,8 @@ extern int	date2isoyear(int year, int mon, int mday);
 extern int	date2isoyearday(int year, int mon, int mday);
 
 extern bool TimestampTimestampTzRequiresRewrite(void);
+extern bool TimestampDifferenceExceedsSeconds(TimestampTz start_time,
+											  TimestampTz stop_time,
+											  int threshold_sec);
 
 #endif							/* TIMESTAMP_H */
-- 
2.34.1

