From 4987776c0883d4a6a76bd5280ad09ae03e5a8faf Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Mon, 3 Feb 2025 15:20:40 +0530
Subject: [PATCH v74 1/2] Introduce inactive_timeout based replication slot
 invalidation

Until now, postgres has been able to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
this GUC is a bit tricky, because the amount of WAL a database
generates and the allocated storage per instance vary greatly
in production, making it difficult to pin down a
one-size-fits-all value.

It is often easier for users to set a timeout of, say, 1 or 2 or n
days, after which all the inactive slots get invalidated. This
commit introduces a GUC named idle_replication_slot_timeout.
When set, postgres invalidates slots (during non-shutdown
checkpoints) that are idle for longer than this amount of
time.

Note that the idle timeout invalidation mechanism is not applicable
for slots that do not reserve WAL or for slots on the standby server
that are being synced from the primary server (i.e., standby slots
having 'synced' field 'true'). Synced slots are always considered to be
inactive because they don't perform logical decoding to produce changes.
---
 doc/src/sgml/config.sgml                      |  42 +++
 doc/src/sgml/logical-replication.sgml         |   5 +
 doc/src/sgml/system-views.sgml                |   7 +
 src/backend/access/transam/xlog.c             |   4 +-
 src/backend/replication/slot.c                | 251 +++++++++++++-----
 src/backend/replication/slotfuncs.c           |   2 +-
 src/backend/utils/adt/timestamp.c             |  18 ++
 src/backend/utils/misc/guc_tables.c           |  12 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/bin/pg_basebackup/pg_createsubscriber.c   |   4 +
 src/bin/pg_upgrade/server.c                   |   7 +
 src/include/replication/slot.h                |  27 +-
 src/include/utils/guc_hooks.h                 |   2 +
 src/include/utils/timestamp.h                 |   3 +
 src/tools/pgindent/typedefs.list              |   1 +
 15 files changed, 315 insertions(+), 71 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 38244409e3..a915a43625 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4423,6 +4423,48 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
        </listitem>
       </varlistentry>
 
+     <varlistentry id="guc-idle-replication-slot-timeout" xreflabel="idle_replication_slot_timeout">
+      <term><varname>idle_replication_slot_timeout</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>idle_replication_slot_timeout</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots that have remained idle longer than this
+        duration. If this value is specified without units, it is taken as
+        minutes. A value of zero (the default) disables the idle timeout
+        invalidation mechanism. This parameter can only be set in the
+        <filename>postgresql.conf</filename> file or on the server command
+        line.
+       </para>
+
+       <para>
+        Slot invalidation due to idle timeout occurs during checkpoint.
+        Because checkpoints happen at <varname>checkpoint_timeout</varname>
+        intervals, there can be some lag between when the
+        <varname>idle_replication_slot_timeout</varname> was exceeded and when
+        the slot invalidation is triggered at the next checkpoint.
+        To avoid such lags, users can force a checkpoint to promptly invalidate
+        inactive slots. The duration of slot inactivity is calculated using the
+        slot's <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>inactive_since</structfield>
+        value.
+       </para>
+
+       <para>
+        Note that the idle timeout invalidation mechanism is not applicable
+        for slots that do not reserve WAL or for slots on the standby server
+        that are being synced from the primary server (i.e., standby slots
+        having <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>synced</structfield>
+        value <literal>true</literal>). Synced slots are always considered to
+        be inactive because they don't perform logical decoding to produce
+        changes. Slots that appear idle due to a disrupted connection between
+        the publisher and subscriber are also excluded, as they are managed by
+        <link linkend="guc-wal-sender-timeout"><varname>wal_sender_timeout</varname></link>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 613abcd28b..3d18e507bb 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -2390,6 +2390,11 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
     plus some reserve for table synchronization.
    </para>
 
+   <para>
+    Logical replication slots are also affected by
+    <link linkend="guc-idle-replication-slot-timeout"><varname>idle_replication_slot_timeout</varname></link>.
+   </para>
+
    <para>
     <link linkend="guc-max-wal-senders"><varname>max_wal_senders</varname></link>
     should be set to at least the same as
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index be81c2b51d..f58b9406e4 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2621,6 +2621,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           perform logical decoding.  It is set only for logical slots.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>idle_timeout</literal> means that the slot has remained
+          idle longer than the configured
+          <xref linkend="guc-idle-replication-slot-timeout"/> duration.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 9c270e7d46..3eaf0bf311 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7337,7 +7337,7 @@ CreateCheckPoint(int flags)
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
-	if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
+	if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT,
 										   _logSegNo, InvalidOid,
 										   InvalidTransactionId))
 	{
@@ -7792,7 +7792,7 @@ CreateRestartPoint(int flags)
 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
 	endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
 	KeepLogSeg(endptr, &_logSegNo);
-	if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
+	if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT,
 										   _logSegNo, InvalidOid,
 										   InvalidTransactionId))
 	{
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index fe5acd8b1f..eccada9d4c 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -102,18 +102,16 @@ typedef struct
 /*
  * Lookup table for slot invalidation causes.
  */
-const char *const SlotInvalidationCauses[] = {
-	[RS_INVAL_NONE] = "none",
-	[RS_INVAL_WAL_REMOVED] = "wal_removed",
-	[RS_INVAL_HORIZON] = "rows_removed",
-	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
+const SlotInvalidationCauseMap InvalidationCauses[] = {
+	{RS_INVAL_NONE, "none"},
+	{RS_INVAL_WAL_REMOVED, "wal_removed"},
+	{RS_INVAL_HORIZON, "rows_removed"},
+	{RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
+	{RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
-
-StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
-				 "array length mismatch");
+#define	RS_INVAL_MAX_CAUSES (lengthof(InvalidationCauses)-1)
 
 /* size of version independent data */
 #define ReplicationSlotOnDiskConstantSize \
@@ -141,6 +139,12 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
 
+/*
+ * Invalidate replication slots that have remained idle longer than this
+ * duration; '0' disables it.
+ */
+int			idle_replication_slot_timeout_mins = 0;
+
 /*
  * This GUC lists streaming replication standby server slot names that
  * logical WAL sender processes will wait for.
@@ -575,7 +579,7 @@ retry:
 				errmsg("can no longer access replication slot \"%s\"",
 					   NameStr(s->data.name)),
 				errdetail("This replication slot has been invalidated due to \"%s\".",
-						  SlotInvalidationCauses[s->data.invalidated]));
+						  GetSlotInvalidationCauseName(s->data.invalidated)));
 	}
 
 	/*
@@ -1512,12 +1516,18 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 					   NameData slotname,
 					   XLogRecPtr restart_lsn,
 					   XLogRecPtr oldestLSN,
-					   TransactionId snapshotConflictHorizon)
+					   TransactionId snapshotConflictHorizon,
+					   TimestampTz inactive_since,
+					   TimestampTz now)
 {
+	int			minutes;
+	int			secs;
+	long		elapsed_secs;
 	StringInfoData err_detail;
-	bool		hint = false;
+	StringInfoData err_hint;
 
 	initStringInfo(&err_detail);
+	initStringInfo(&err_hint);
 
 	switch (cause)
 	{
@@ -1525,13 +1535,15 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 			{
 				unsigned long long ex = oldestLSN - restart_lsn;
 
-				hint = true;
 				appendStringInfo(&err_detail,
 								 ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
 										  "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
 										  ex),
 								 LSN_FORMAT_ARGS(restart_lsn),
 								 ex);
+				/* translator: %s is a GUC variable name */
+				appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
+								 "max_slot_wal_keep_size");
 				break;
 			}
 		case RS_INVAL_HORIZON:
@@ -1542,6 +1554,24 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
 			break;
+
+		case RS_INVAL_IDLE_TIMEOUT:
+			Assert(inactive_since > 0 && now > 0);
+
+			/* Calculate the idle time duration of the slot */
+			elapsed_secs = (now - inactive_since) / USECS_PER_SEC;
+			minutes = elapsed_secs / SECS_PER_MINUTE;
+			secs = elapsed_secs % SECS_PER_MINUTE;
+
+			/* translator: %s is a GUC variable name */
+			appendStringInfo(&err_detail, _("The slot's idle time of %d minutes and %02d seconds exceeds the configured \"%s\" duration of %d minutes."),
+							 minutes, secs, "idle_replication_slot_timeout",
+							 idle_replication_slot_timeout_mins);
+			/* translator: %s is a GUC variable name */
+			appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
+							 "idle_replication_slot_timeout");
+			break;
+
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1553,9 +1583,31 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 			errmsg("invalidating obsolete replication slot \"%s\"",
 				   NameStr(slotname)),
 			errdetail_internal("%s", err_detail.data),
-			hint ? errhint("You might need to increase \"%s\".", "max_slot_wal_keep_size") : 0);
+			err_hint.len ? errhint("%s", err_hint.data) : 0);
 
 	pfree(err_detail.data);
+	pfree(err_hint.data);
+}
+
+/*
+ * Decide whether idle-timeout invalidation may be applied to slot 's'.
+ *
+ * All of the following must hold: the idle timeout GUC is nonzero, the slot
+ * has reserved WAL (valid restart_lsn), the slot is inactive (inactive_since
+ * is set), and the slot is not one being synced from the primary while the
+ * server is in recovery. Synced slots are always considered inactive because
+ * they don't perform logical decoding to produce changes.
+ */
+static inline bool
+CanInvalidateIdleSlot(ReplicationSlot *s)
+{
+	if (idle_replication_slot_timeout_mins == 0)
+		return false;
+	if (XLogRecPtrIsInvalid(s->data.restart_lsn) || s->inactive_since <= 0)
+		return false;
+	if (RecoveryInProgress() && s->data.synced)
+		return false;
+	return true;
 }
 
 /*
@@ -1572,7 +1624,7 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
  * for syscalls, so caller must restart if we return true.
  */
 static bool
-InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
+InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 							   ReplicationSlot *s,
 							   XLogRecPtr oldestLSN,
 							   Oid dboid, TransactionId snapshotConflictHorizon,
@@ -1585,6 +1637,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 	TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
 	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
 	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
+	TimestampTz inactive_since = 0;
 
 	for (;;)
 	{
@@ -1592,6 +1645,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		NameData	slotname;
 		int			active_pid = 0;
 		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
+		TimestampTz now = 0;
 
 		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
 
@@ -1602,6 +1656,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			break;
 		}
 
+		if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
+		{
+			/*
+			 * Assign the current time here to avoid system call overhead
+			 * while holding the spinlock in subsequent code.
+			 */
+			now = GetCurrentTimestamp();
+		}
+
 		/*
 		 * Check if the slot needs to be invalidated. If it needs to be
 		 * invalidated, and is not currently acquired, acquire it and mark it
@@ -1629,34 +1692,49 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 				initial_catalog_effective_xmin = s->effective_catalog_xmin;
 			}
 
-			switch (cause)
+			if (possible_causes & RS_INVAL_WAL_REMOVED)
+			{
+				if (initial_restart_lsn != InvalidXLogRecPtr &&
+					initial_restart_lsn < oldestLSN)
+					invalidation_cause = RS_INVAL_WAL_REMOVED;
+			}
+			if (invalidation_cause == RS_INVAL_NONE &&
+				(possible_causes & RS_INVAL_HORIZON) &&
+				SlotIsLogical(s) &&
+			/* invalid DB oid signals a shared relation */
+				(dboid == InvalidOid || dboid == s->data.database))
+			{
+				/* reached only for logical slots of the affected database */
+				if ((TransactionIdIsValid(initial_effective_xmin) &&
+					 TransactionIdPrecedesOrEquals(initial_effective_xmin,
+												   snapshotConflictHorizon)) ||
+					(TransactionIdIsValid(initial_catalog_effective_xmin) &&
+					 TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
+												   snapshotConflictHorizon)))
+					invalidation_cause = RS_INVAL_HORIZON;
+			}
+			if (invalidation_cause == RS_INVAL_NONE &&
+				(possible_causes & RS_INVAL_WAL_LEVEL))
 			{
-				case RS_INVAL_WAL_REMOVED:
-					if (initial_restart_lsn != InvalidXLogRecPtr &&
-						initial_restart_lsn < oldestLSN)
-						invalidation_cause = cause;
-					break;
-				case RS_INVAL_HORIZON:
-					if (!SlotIsLogical(s))
-						break;
-					/* invalid DB oid signals a shared relation */
-					if (dboid != InvalidOid && dboid != s->data.database)
-						break;
-					if (TransactionIdIsValid(initial_effective_xmin) &&
-						TransactionIdPrecedesOrEquals(initial_effective_xmin,
-													  snapshotConflictHorizon))
-						invalidation_cause = cause;
-					else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
-							 TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
-														   snapshotConflictHorizon))
-						invalidation_cause = cause;
-					break;
-				case RS_INVAL_WAL_LEVEL:
-					if (SlotIsLogical(s))
-						invalidation_cause = cause;
-					break;
-				case RS_INVAL_NONE:
-					pg_unreachable();
+				if (SlotIsLogical(s))
+					invalidation_cause = RS_INVAL_WAL_LEVEL;
+			}
+			if (invalidation_cause == RS_INVAL_NONE &&
+				(possible_causes & RS_INVAL_IDLE_TIMEOUT))
+			{
+				Assert(now > 0);
+
+				/*
+				 * Check if the slot needs to be invalidated due to
+				 * idle_replication_slot_timeout GUC.
+				 */
+				if (CanInvalidateIdleSlot(s) &&
+					TimestampDifferenceExceedsSeconds(s->inactive_since, now,
+													  idle_replication_slot_timeout_mins * SECS_PER_MINUTE))
+				{
+					invalidation_cause = RS_INVAL_IDLE_TIMEOUT;
+					inactive_since = s->inactive_since;
+				}
 			}
 		}
 
@@ -1705,9 +1783,10 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 
 		/*
 		 * The logical replication slots shouldn't be invalidated as GUC
-		 * max_slot_wal_keep_size is set to -1 during the binary upgrade. See
-		 * check_old_cluster_for_valid_slots() where we ensure that no
-		 * invalidated before the upgrade.
+		 * max_slot_wal_keep_size is set to -1 and
+		 * idle_replication_slot_timeout is set to 0 during the binary
+		 * upgrade. See check_old_cluster_for_valid_slots() where we ensure
+		 * that no slot was invalidated before the upgrade.
 		 */
 		Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
 
@@ -1739,7 +1818,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			{
 				ReportSlotInvalidation(invalidation_cause, true, active_pid,
 									   slotname, restart_lsn,
-									   oldestLSN, snapshotConflictHorizon);
+									   oldestLSN, snapshotConflictHorizon,
+									   inactive_since, now);
 
 				if (MyBackendType == B_STARTUP)
 					(void) SendProcSignal(active_pid,
@@ -1785,7 +1865,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 
 			ReportSlotInvalidation(invalidation_cause, false, active_pid,
 								   slotname, restart_lsn,
-								   oldestLSN, snapshotConflictHorizon);
+								   oldestLSN, snapshotConflictHorizon,
+								   inactive_since, now);
 
 			/* done with this slot for now */
 			break;
@@ -1800,28 +1881,34 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 /*
  * Invalidate slots that require resources about to be removed.
  *
- * Returns true when any slot have got invalidated.
+ * Returns true if there are any invalidated slots.
  *
- * Whether a slot needs to be invalidated depends on the cause. A slot is
- * removed if it:
+ * Whether a slot needs to be invalidated depends on the invalidation cause.
+ * A slot is invalidated if it:
  * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
- * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
+ * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
+ *   "idle_replication_slot_timeout" duration.
+ *
+ * Note: This function attempts to invalidate the slot for multiple possible
+ * causes in a single pass, minimizing redundant iterations. "possible_causes"
+ * is a bitmask representing one or more of the defined causes.
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
 bool
-InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
+InvalidateObsoleteReplicationSlots(uint32 possible_causes,
 								   XLogSegNo oldestSegno, Oid dboid,
 								   TransactionId snapshotConflictHorizon)
 {
 	XLogRecPtr	oldestLSN;
 	bool		invalidated = false;
 
-	Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
-	Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
-	Assert(cause != RS_INVAL_NONE);
+	Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
+	Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
+	Assert(possible_causes != RS_INVAL_NONE);	/* NONE is 0, so a mask test would be vacuous */
 
 	if (max_replication_slots == 0)
 		return invalidated;
@@ -1837,7 +1924,7 @@ restart:
 		if (!s->in_use)
 			continue;
 
-		if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
+		if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
 										   snapshotConflictHorizon,
 										   &invalidated))
 		{
@@ -2428,18 +2515,43 @@ RestoreSlotFromDisk(const char *name)
 ReplicationSlotInvalidationCause
 GetSlotInvalidationCause(const char *invalidation_reason)
 {
-	ReplicationSlotInvalidationCause cause;
 	ReplicationSlotInvalidationCause result = RS_INVAL_NONE;
 	bool		found PG_USED_FOR_ASSERTS_ONLY = false;
+	int			cause_idx;
 
 	Assert(invalidation_reason);
 
-	for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
+	for (cause_idx = 0; cause_idx <= RS_INVAL_MAX_CAUSES; cause_idx++)
 	{
-		if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0)
+		if (strcmp(InvalidationCauses[cause_idx].cause_name, invalidation_reason) == 0)
 		{
 			found = true;
-			result = cause;
+			result = InvalidationCauses[cause_idx].cause;
+			break;
+		}
+	}
+
+	Assert(found);
+	return result;
+}
+
+/*
+ * Maps a ReplicationSlotInvalidationCause to the invalidation
+ * reason for a replication slot.
+ */
+const char *
+GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
+{
+	const char *result = "none";
+	bool		found PG_USED_FOR_ASSERTS_ONLY = false;
+	int			cause_idx;
+
+	for (cause_idx = 0; cause_idx <= RS_INVAL_MAX_CAUSES; cause_idx++)
+	{
+		if (InvalidationCauses[cause_idx].cause == cause)
+		{
+			found = true;
+			result = InvalidationCauses[cause_idx].cause_name;
 			break;
 		}
 	}
@@ -2802,3 +2914,22 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
 
 	ConditionVariableCancelSleep();
 }
+
+/*
+ * GUC check_hook for idle_replication_slot_timeout
+ *
+ * While the server runs in binary upgrade mode only the value 0 is
+ * accepted; any other value is rejected with an error detail. See
+ * start_postmaster() in pg_upgrade for more details.
+ */
+bool
+check_idle_replication_slot_timeout(int *newval, void **extra, GucSource source)
+{
+	/* Nothing to enforce outside binary upgrade or when set to 0 */
+	if (!IsBinaryUpgrade || *newval == 0)
+		return true;
+
+	GUC_check_errdetail("The value of \"%s\" must be set to 0 during binary upgrade mode.",
+						"idle_replication_slot_timeout");
+	return false;
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 8be4b8c65b..f652ec8a73 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -431,7 +431,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		if (cause == RS_INVAL_NONE)
 			nulls[i++] = true;
 		else
-			values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]);
+			values[i++] = CStringGetTextDatum(GetSlotInvalidationCauseName(cause));
 
 		values[i++] = BoolGetDatum(slot_contents.data.failover);
 
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index ba9bae0506..9682f9dbdc 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1786,6 +1786,24 @@ TimestampDifferenceExceeds(TimestampTz start_time,
 	return (diff >= msec * INT64CONST(1000));
 }
 
+/*
+ * Report whether the seconds part of the interval between two timestamps,
+ * as computed by TimestampDifference(), is >= threshold_sec.
+ */
+bool
+TimestampDifferenceExceedsSeconds(TimestampTz start_time,
+								  TimestampTz stop_time,
+								  int threshold_sec)
+{
+	long		elapsed_secs;
+	int			leftover_usecs; /* output slot required by the callee; unused */
+
+	TimestampDifference(start_time, stop_time,
+						&elapsed_secs, &leftover_usecs);
+	/* The microseconds remainder is ignored; only seconds are compared */
+	return (elapsed_secs >= threshold_sec);
+}
+
 /*
  * Convert a time_t to TimestampTz.
  *
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index ce7534d4d2..758329b1c1 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3048,6 +3048,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"idle_replication_slot_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the duration a replication slot can remain idle before "
+						 "it is invalidated."),
+			NULL,
+			GUC_UNIT_MIN
+		},
+		&idle_replication_slot_timeout_mins,
+		0, 0, INT_MAX / SECS_PER_MINUTE,
+		check_idle_replication_slot_timeout, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c40b7a3121..f9a5561166 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -326,6 +326,7 @@
 				# (change requires restart)
 #wal_keep_size = 0		# in megabytes; 0 disables
 #max_slot_wal_keep_size = -1	# in megabytes; -1 disables
+#idle_replication_slot_timeout = 0	# in minutes; 0 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index faf18ccf13..f2ef8d5ccc 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -1438,6 +1438,10 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
 	appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
 	appendShellString(pg_ctl_cmd, subscriber_dir);
 	appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
+
+	/* Prevent unintended slot invalidation */
+	appendPQExpBuffer(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
+
 	if (restricted_access)
 	{
 		appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
diff --git a/src/bin/pg_upgrade/server.c b/src/bin/pg_upgrade/server.c
index de6971cde6..873e5b5117 100644
--- a/src/bin/pg_upgrade/server.c
+++ b/src/bin/pg_upgrade/server.c
@@ -252,6 +252,13 @@ start_postmaster(ClusterInfo *cluster, bool report_and_exit_on_error)
 	if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
 		appendPQExpBufferStr(&pgoptions, " -c max_slot_wal_keep_size=-1");
 
+	/*
+	 * Use idle_replication_slot_timeout=0 to prevent slot invalidation due to
+	 * idle_timeout by checkpointer process during upgrade.
+	 */
+	if (GET_MAJOR_VERSION(cluster->major_version) >= 1800)
+		appendPQExpBufferStr(&pgoptions, " -c idle_replication_slot_timeout=0");
+
 	/*
 	 * Use -b to disable autovacuum and logical replication launcher
 	 * (effective in PG17 or later for the latter).
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 000c36d30d..161784b15b 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -44,21 +44,30 @@ typedef enum ReplicationSlotPersistency
  * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
  * 'invalidated' field is set to a value other than _NONE.
  *
- * When adding a new invalidation cause here, remember to update
- * SlotInvalidationCauses and RS_INVAL_MAX_CAUSES.
+ * When adding a new invalidation cause here, the value must be a power of 2
+ * (e.g., 1, 2, 4...) for proper bitwise operations. Also, remember to update
+ * the InvalidationCauses array in slot.c.
  */
 typedef enum ReplicationSlotInvalidationCause
 {
-	RS_INVAL_NONE,
+	RS_INVAL_NONE = 0,
 	/* required WAL has been removed */
-	RS_INVAL_WAL_REMOVED,
+	RS_INVAL_WAL_REMOVED = (1 << 0),
 	/* required rows have been removed */
-	RS_INVAL_HORIZON,
+	RS_INVAL_HORIZON = (1 << 1),
 	/* wal_level insufficient for slot */
-	RS_INVAL_WAL_LEVEL,
+	RS_INVAL_WAL_LEVEL = (1 << 2),
+	/* idle slot timeout has occurred */
+	RS_INVAL_IDLE_TIMEOUT = (1 << 3),
 } ReplicationSlotInvalidationCause;
 
-extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
+typedef struct SlotInvalidationCauseMap
+{
+	ReplicationSlotInvalidationCause cause; /* invalidation cause value */
+	const char *cause_name;		/* name reported for that cause */
+} SlotInvalidationCauseMap;
+
+extern PGDLLIMPORT const SlotInvalidationCauseMap InvalidationCauses[];
 
 /*
  * On-Disk data of a replication slot, preserved across restarts.
@@ -254,6 +263,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *synchronized_standby_slots;
+extern PGDLLIMPORT int idle_replication_slot_timeout_mins;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
@@ -286,7 +296,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
+extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes,
 											   XLogSegNo oldestSegno,
 											   Oid dboid,
 											   TransactionId snapshotConflictHorizon);
@@ -303,6 +313,7 @@ extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
 extern ReplicationSlotInvalidationCause
 			GetSlotInvalidationCause(const char *invalidation_reason);
+extern const char *GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause);
 
 extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
 extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index 87999218d6..951451a976 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -174,5 +174,7 @@ extern void assign_wal_sync_method(int new_wal_sync_method, void *extra);
 extern bool check_synchronized_standby_slots(char **newval, void **extra,
 											 GucSource source);
 extern void assign_synchronized_standby_slots(const char *newval, void *extra);
+extern bool check_idle_replication_slot_timeout(int *newval, void **extra,
+												GucSource source);
 
 #endif							/* GUC_HOOKS_H */
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index d26f023fb8..9963bddc0e 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -107,6 +107,9 @@ extern long TimestampDifferenceMilliseconds(TimestampTz start_time,
 extern bool TimestampDifferenceExceeds(TimestampTz start_time,
 									   TimestampTz stop_time,
 									   int msec);
+extern bool TimestampDifferenceExceedsSeconds(TimestampTz start_time,
+											  TimestampTz stop_time,
+											  int threshold_sec);
 
 extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
 extern pg_time_t timestamptz_to_time_t(TimestampTz t);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9a3bee93de..af8eda5c85 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2687,6 +2687,7 @@ SkipPages
 SlabBlock
 SlabContext
 SlabSlot
+SlotInvalidationCauseMap
 SlotNumber
 SlotSyncCtxStruct
 SlruCtl
-- 
2.34.1

