From 4be9a47c3cce539b8b5879f8c0359f552d1d419a Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 6 Mar 2024 08:45:03 +0000
Subject: [PATCH v8 2/4] Add XID age based replication slot invalidation

Currently postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
max_slot_wal_keep_size is tricky. Because the amount of WAL a
customer generates, and their allocated storage will vary greatly
in production, making it difficult to pin down a one-size-fits-all
value. It is often easy for developers to set an XID age (age of
slot's xmin or catalog_xmin) of say 1 or 1.5 billion, after which
the slots get invalidated.

To achieve the above, postgres uses replication slot xmin (the
oldest transaction that this slot needs the database to retain) or
catalog_xmin (the oldest transaction affecting the system catalogs
that this slot needs the database to retain), and a new GUC
max_slot_xid_age. The checkpointer then looks at all replication
slots invalidating the slots based on the age set.
---
 doc/src/sgml/config.sgml                      |  21 ++++
 src/backend/access/transam/xlog.c             |  10 ++
 src/backend/replication/slot.c                |  44 ++++++-
 src/backend/utils/misc/guc_tables.c           |  10 ++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/replication/slot.h                |   3 +
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/050_invalidate_slots.pl   | 108 ++++++++++++++++++
 8 files changed, 197 insertions(+), 1 deletion(-)
 create mode 100644 src/test/recovery/t/050_invalidate_slots.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index b38cbd714a..7a8360cd32 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4544,6 +4544,27 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-max-slot-xid-age" xreflabel="max_slot_xid_age">
+      <term><varname>max_slot_xid_age</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_slot_xid_age</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots whose <literal>xmin</literal> (the oldest
+        transaction that this slot needs the database to retain) or
+        <literal>catalog_xmin</literal> (the oldest transaction affecting the
+        system catalogs that this slot needs the database to retain) has reached
+        the age specified by this setting. A value of zero (which is default)
+        disables this feature. Users can set this value anywhere from zero to
+        two billion. This parameter can only be set in the
+        <filename>postgresql.conf</filename> file or on the server command
+        line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 20a5f86209..36ae2ac6a4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7147,6 +7147,11 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+	/* Invalidate replication slots based on xmin or catalog_xmin age */
+	if (max_slot_xid_age > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
@@ -7597,6 +7602,11 @@ CreateRestartPoint(int flags)
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 
+	/* Invalidate replication slots based on xmin or catalog_xmin age */
+	if (max_slot_xid_age > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Retreat _logSegNo using the current end of xlog replayed or received,
 	 * whichever is later.
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index b0f48229cb..f05990aeb8 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -86,10 +86,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_WAL_REMOVED] = "wal_removed",
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
+	[RS_INVAL_XID_AGE] = "xid_aged",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_XID_AGE
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -119,6 +120,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 /* GUC variable */
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
+int			max_slot_xid_age = 0;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
@@ -1447,6 +1449,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
 			break;
+		case RS_INVAL_XID_AGE:
+			appendStringInfoString(&err_detail, _("The replication slot's xmin or catalog_xmin reached the age specified by max_slot_xid_age."));
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1563,6 +1568,42 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						conflict = cause;
 					break;
+				case RS_INVAL_XID_AGE:
+					{
+						TransactionId xid_cur = ReadNextTransactionId();
+						TransactionId xid_limit;
+						TransactionId xid_slot;
+
+						if (TransactionIdIsNormal(s->data.xmin))
+						{
+							xid_slot = s->data.xmin;
+
+							xid_limit = xid_slot + max_slot_xid_age;
+							if (xid_limit < FirstNormalTransactionId)
+								xid_limit += FirstNormalTransactionId;
+
+							if (TransactionIdFollowsOrEquals(xid_cur, xid_limit))
+							{
+								conflict = cause;
+								break;
+							}
+						}
+						if (TransactionIdIsNormal(s->data.catalog_xmin))
+						{
+							xid_slot = s->data.catalog_xmin;
+
+							xid_limit = xid_slot + max_slot_xid_age;
+							if (xid_limit < FirstNormalTransactionId)
+								xid_limit += FirstNormalTransactionId;
+
+							if (TransactionIdFollowsOrEquals(xid_cur, xid_limit))
+							{
+								conflict = cause;
+								break;
+							}
+						}
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1725,6 +1766,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 45013582a7..3ed642dcaf 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2954,6 +2954,16 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_xid_age", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Age of the transaction ID at which a replication slot gets invalidated."),
+			gettext_noop("The transaction is the oldest transaction (including the one affecting the system catalogs) that a replication slot needs the database to retain.")
+		},
+		&max_slot_xid_age,
+		0, 0, 2000000000,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index edcc0282b2..50019d7c25 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -334,6 +334,7 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
+#max_slot_xid_age = 0
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 02a96b0e19..4b7ae36f11 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -53,6 +53,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* slot's xmin or catalog_xmin has reached the age */
+	RS_INVAL_XID_AGE,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -226,6 +228,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
+extern PGDLLIMPORT int max_slot_xid_age;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index c67249500e..d698c3ec73 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -50,6 +50,7 @@ tests += {
       't/039_end_of_wal.pl',
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
+      't/050_invalidate_slots.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
new file mode 100644
index 0000000000..2f482b56e8
--- /dev/null
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -0,0 +1,108 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test for replication slots invalidation
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# Initialize primary node, setting wal-segsize to 1MB
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$primary->append_conf(
+	'postgresql.conf', q{
+checkpoint_timeout = 1h
+});
+$primary->start;
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot('sb1_slot');
+]);
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+# Enable hs_feedback. The slot should gain an xmin. We set the status interval
+# so we'll see the results promptly.
+$standby1->append_conf(
+	'postgresql.conf', q{
+primary_slot_name = 'sb1_slot'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+});
+$standby1->start;
+
+# Create some content on primary to move xmin
+$primary->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1,10) AS a");
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby1);
+
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT xmin IS NOT NULL
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = 'sb1_slot';
+]) or die "Timed out waiting for slot xmin to advance";
+
+$primary->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET max_slot_xid_age = 500;
+]);
+$primary->reload;
+
+# Stop standby to make the replication slot's xmin on primary to age
+$standby1->stop;
+
+my $logstart = -s $primary->logfile;
+
+# Do some work to advance xmin
+$primary->safe_psql(
+	'postgres', q{
+do $$
+begin
+  for i in 10000..11000 loop
+    -- use an exception block so that each iteration eats an XID
+    begin
+      insert into tab_int values (i);
+    exception
+      when division_by_zero then null;
+    end;
+  end loop;
+end$$;
+});
+
+my $invalidated = 0;
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$primary->safe_psql('postgres', "CHECKPOINT");
+	if ($primary->log_contains(
+			'invalidating obsolete replication slot "sb1_slot"', $logstart))
+	{
+		$invalidated = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($invalidated, 'check that slot sb1_slot invalidation has been logged');
+
+# Wait for the inactive replication slots to be invalidated.
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'sb1_slot' AND
+		invalidation_reason = 'xid_aged';
+])
+  or die
+  "Timed out while waiting for replication slot sb1_slot to be invalidated";
+
+done_testing();
-- 
2.34.1

