From 1d3d286b97607bce67cdfbf7cab6f0a9b734a204 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 6 Mar 2024 08:46:47 +0000
Subject: [PATCH v8 4/4] Add inactive_timeout based replication slot
 invalidation

Currently postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
max_slot_wal_keep_size is tricky. Because the amount of WAL a
customer generates, and their allocated storage will vary greatly
in production, making it difficult to pin down a one-size-fits-all
value. It is often easy for developers to set a timeout of say 1
or 2 or 3 days, after which the inactive slots get dropped.

To achieve the above, postgres uses replication slot metric
inactive_at (the time at which the slot became inactive), and a
new GUC inactive_replication_slot_timeout. The checkpointer then
looks at all replication slots invalidating the inactive slots
based on the timeout set.
---
 doc/src/sgml/config.sgml                      | 18 +++++
 src/backend/access/transam/xlog.c             | 10 +++
 src/backend/replication/slot.c                | 22 +++++-
 src/backend/utils/misc/guc_tables.c           | 12 +++
 src/backend/utils/misc/postgresql.conf.sample |  1 +
 src/include/replication/slot.h                |  3 +
 src/test/recovery/t/050_invalidate_slots.pl   | 79 +++++++++++++++++++
 7 files changed, 144 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 7a8360cd32..f5c299ef73 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4565,6 +4565,24 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-inactive-replication-slot-timeout" xreflabel="inactive_replication_slot_timeout">
+      <term><varname>inactive_replication_slot_timeout</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>inactive_replication_slot_timeout</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots that are inactive for longer than this
+        amount of time at the next checkpoint. If this value is specified
+        without units, it is taken as seconds. A value of zero (which is
+        default) disables the timeout mechanism. This parameter can only be
+        set in the <filename>postgresql.conf</filename> file or on the server
+        command line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 36ae2ac6a4..166c3ed794 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7152,6 +7152,11 @@ CreateCheckPoint(int flags)
 		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
 										   InvalidOid, InvalidTransactionId);
 
+	/* Invalidate inactive replication slots based on timeout */
+	if (inactive_replication_slot_timeout > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
@@ -7607,6 +7612,11 @@ CreateRestartPoint(int flags)
 		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
 										   InvalidOid, InvalidTransactionId);
 
+	/* Invalidate inactive replication slots based on timeout */
+	if (inactive_replication_slot_timeout > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Retreat _logSegNo using the current end of xlog replayed or received,
 	 * whichever is later.
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 9e323b58b3..2360682e05 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -87,10 +87,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
 	[RS_INVAL_XID_AGE] = "xid_aged",
+	[RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_XID_AGE
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -121,6 +122,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
 int			max_slot_xid_age = 0;
+int			inactive_replication_slot_timeout = 0;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
@@ -1477,6 +1479,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_XID_AGE:
 			appendStringInfoString(&err_detail, _("The replication slot's xmin or catalog_xmin reached the age specified by max_slot_xid_age."));
 			break;
+		case RS_INVAL_INACTIVE_TIMEOUT:
+			appendStringInfoString(&err_detail, _("The slot has been inactive for more than the time specified by inactive_replication_slot_timeout."));
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1629,6 +1634,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 						}
 					}
 					break;
+				case RS_INVAL_INACTIVE_TIMEOUT:
+					if (s->data.last_inactive_at > 0)
+					{
+						TimestampTz now;
+
+						Assert(s->data.persistency == RS_PERSISTENT);
+						Assert(s->active_pid == 0);
+
+						now = GetCurrentTimestamp();
+						if (TimestampDifferenceExceeds(s->data.last_inactive_at, now,
+													   inactive_replication_slot_timeout * 1000))
+							conflict = cause;
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1792,6 +1811,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
  * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age
+ * - RS_INVAL_INACTIVE_TIMEOUT: inactive slot timeout occurs
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 3ed642dcaf..06e3e87f4a 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2964,6 +2964,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"inactive_replication_slot_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the amount of time to wait before invalidating an "
+						 "inactive replication slot."),
+			NULL,
+			GUC_UNIT_S
+		},
+		&inactive_replication_slot_timeout,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 50019d7c25..092aaf1bec 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -261,6 +261,7 @@
 #recovery_prefetch = try	# prefetch pages referenced in the WAL?
 #wal_decode_buffer_size = 512kB	# lookahead window used for prefetching
 				# (change requires restart)
+#inactive_replication_slot_timeout = 0	# in seconds; 0 disables
 
 # - Archiving -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7d668918b0..7ae98046a4 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -55,6 +55,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_WAL_LEVEL,
 	/* slot's xmin or catalog_xmin has reached the age */
 	RS_INVAL_XID_AGE,
+	/* inactive slot timeout has occurred */
+	RS_INVAL_INACTIVE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -235,6 +237,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT int max_slot_xid_age;
+extern PGDLLIMPORT int inactive_replication_slot_timeout;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
index 2f482b56e8..4c66dd4a4e 100644
--- a/src/test/recovery/t/050_invalidate_slots.pl
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -105,4 +105,83 @@ $primary->poll_query_until(
   or die
   "Timed out while waiting for replication slot sb1_slot to be invalidated";
 
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot('sb2_slot');
+]);
+
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET max_slot_xid_age = 0;
+]);
+$primary->reload;
+
+# Create a standby linking to the primary using the replication slot
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup($primary, $backup_name, has_streaming => 1);
+$standby2->append_conf(
+	'postgresql.conf', q{
+primary_slot_name = 'sb2_slot'
+});
+$standby2->start;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby2);
+
+# The inactive replication slot info should be null when the slot is active
+my $result = $primary->safe_psql(
+	'postgres', qq[
+	SELECT last_inactive_at IS NULL, inactive_count = 0 AS OK
+		FROM pg_replication_slots WHERE slot_name = 'sb2_slot';
+]);
+is($result, "t|t",
+	'check the inactive replication slot info for an active slot');
+
+# Set timeout so that the next checkpoint will invalidate the inactive
+# replication slot.
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET inactive_replication_slot_timeout TO '1s';
+]);
+$primary->reload;
+
+$logstart = -s $primary->logfile;
+
+# Stop standby to make the replication slot on primary inactive
+$standby2->stop;
+
+# Wait for the inactive replication slot info to be updated
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE last_inactive_at IS NOT NULL AND
+		inactive_count = 1 AND slot_name = 'sb2_slot';
+])
+  or die
+  "Timed out while waiting for inactive replication slot info to be updated";
+
+$invalidated = 0;
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$primary->safe_psql('postgres', "CHECKPOINT");
+	if ($primary->log_contains(
+			'invalidating obsolete replication slot "sb2_slot"', $logstart))
+	{
+		$invalidated = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($invalidated, 'check that slot sb2_slot invalidation has been logged');
+
+# Wait for the inactive replication slots to be invalidated.
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'sb2_slot' AND
+		invalidation_reason = 'inactive_timeout';
+])
+  or die
+  "Timed out while waiting for inactive replication slot sb2_slot to be invalidated";
+
 done_testing();
-- 
2.34.1

