On Thu, Mar 14, 2024 at 12:24 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Wed, Mar 13, 2024 at 9:24 PM Bharath Rupireddy
> >
> > Yes, there will be some duplication if we emit conflict_reason
> > as a text field. However, I still think the better way is to turn
> > the conflict_reason text column into a conflicting boolean and set
> > it to true only on rows_removed and wal_level_insufficient
> > invalidations. When the conflicting boolean is true, one (including
> > all the tests that we've added recently) can look at the
> > invalidation_reason text field for the reason. This sounds more
> > reasonable to me than just mentioning in the docs that "if
> > invalidation_reason is rows_removed or wal_level_insufficient, it's
> > the reason for conflict with recovery".
> >
> Fair point. I think we can go either way. Bertrand, Nathan, and
> others, do you have an opinion on this matter?

While we wait to hear from others on this, I'm attaching the v9 patch
set implementing the above idea (see the 0001 patch). Please have a
look. I'll come back to the other review comments soon.
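
To illustrate what 0001 changes, here's a sketch (slot names and
output depend on the cluster):

    SELECT slot_name, conflicting, invalidation_reason
    FROM pg_replication_slots
    WHERE conflicting;

With the patch, conflicting is true only for logical slots invalidated
due to rows_removed or wal_level_insufficient (it stays NULL for
physical slots), while invalidation_reason reports the invalidation
cause for both physical and logical slots.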

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From 18855c08cd8bcbaf41aba10048f0ea23a246e546 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Thu, 14 Mar 2024 12:48:52 +0000
Subject: [PATCH v9 1/4] Track invalidation_reason in pg_replication_slots

Until now, the reason for replication slot invalidation has not been
tracked in pg_replication_slots. A recent commit 007693f2a added
conflict_reason to show the reasons for slot invalidation, but
only for logical slots.

This commit adds a new column to show invalidation reasons for
both physical and logical slots. It also turns the conflict_reason
text column into a conflicting boolean column (effectively reverting
commit 007693f2a). One can now consult the new invalidation_reason
column for the reason a logical slot conflicts with recovery.
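
A sketch of the before/after view semantics:

    -- before: conflict_reason text, set only for invalidated logical slots
    -- after:  conflicting bool (NULL for physical slots) plus
    --         invalidation_reason text for all slot types
    SELECT slot_name, slot_type, conflicting, invalidation_reason
    FROM pg_replication_slots;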
---
 doc/src/sgml/ref/pgupgrade.sgml               |  4 +-
 doc/src/sgml/system-views.sgml                | 63 +++++++++++--------
 src/backend/catalog/system_views.sql          |  5 +-
 src/backend/replication/logical/slotsync.c    |  2 +-
 src/backend/replication/slot.c                |  8 +--
 src/backend/replication/slotfuncs.c           | 25 +++++---
 src/bin/pg_upgrade/info.c                     |  4 +-
 src/include/catalog/pg_proc.dat               |  6 +-
 src/include/replication/slot.h                |  2 +-
 .../t/035_standby_logical_decoding.pl         | 35 ++++++-----
 .../t/040_standby_failover_slots_sync.pl      |  4 +-
 src/test/regress/expected/rules.out           |  7 ++-
 12 files changed, 95 insertions(+), 70 deletions(-)

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 58c6c2df8b..8de52bf752 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -453,8 +453,8 @@ make prefix=/usr/local/pgsql.new install
       <para>
        All slots on the old cluster must be usable, i.e., there are no slots
        whose
-       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>conflict_reason</structfield>
-       is not <literal>NULL</literal>.
+       <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>invalidation_reason</structfield>
+       is not <literal>NULL</literal>.
       </para>
      </listitem>
      <listitem>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index be90edd0e2..f3fb5ba1b0 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2525,34 +2525,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>conflict_reason</structfield> <type>text</type>
+       <structfield>conflicting</structfield> <type>bool</type>
       </para>
       <para>
-       The reason for the logical slot's conflict with recovery. It is always
-       NULL for physical slots, as well as for logical slots which are not
-       invalidated. The non-NULL values indicate that the slot is marked
-       as invalidated. Possible values are:
-       <itemizedlist spacing="compact">
-        <listitem>
-         <para>
-          <literal>wal_removed</literal> means that the required WAL has been
-          removed.
-         </para>
-        </listitem>
-        <listitem>
-         <para>
-          <literal>rows_removed</literal> means that the required rows have
-          been removed.
-         </para>
-        </listitem>
-        <listitem>
-         <para>
-          <literal>wal_level_insufficient</literal> means that the
-          primary doesn't have a <xref linkend="guc-wal-level"/> sufficient to
-          perform logical decoding.
-         </para>
-        </listitem>
-       </itemizedlist>
+       True if this logical slot conflicted with recovery (and so is now
+       invalidated); NULL for physical slots. When this column is true, check
+       the <structfield>invalidation_reason</structfield> column for the
+       conflict reason.
       </para></entry>
      </row>
 
@@ -2581,6 +2560,38 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>invalidation_reason</structfield> <type>text</type>
+      </para>
+      <para>
+       The reason for the slot's invalidation. It is set to
+       <literal>NULL</literal> if the slot has not been invalidated.
+       Possible values are:
+       <itemizedlist spacing="compact">
+        <listitem>
+         <para>
+          <literal>wal_removed</literal> means that the required WAL has been
+          removed.
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          <literal>rows_removed</literal> means that the required rows have
+          been removed.
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          <literal>wal_level_insufficient</literal> means that the
+          primary doesn't have a <xref linkend="guc-wal-level"/> sufficient to
+          perform logical decoding.
+         </para>
+        </listitem>
+       </itemizedlist>
+      </para></entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 04227a72d1..cd22dad959 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1023,9 +1023,10 @@ CREATE VIEW pg_replication_slots AS
             L.wal_status,
             L.safe_wal_size,
             L.two_phase,
-            L.conflict_reason,
+            L.conflicting,
             L.failover,
-            L.synced
+            L.synced,
+            L.invalidation_reason
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 5074c8409f..260632cfdd 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -668,7 +668,7 @@ synchronize_slots(WalReceiverConn *wrconn)
 	bool		started_tx = false;
 	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
 		" restart_lsn, catalog_xmin, two_phase, failover,"
-		" database, conflict_reason"
+		" database, invalidation_reason"
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 91ca397857..4f1a17f6ce 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -2356,21 +2356,21 @@ RestoreSlotFromDisk(const char *name)
 }
 
 /*
- * Maps a conflict reason for a replication slot to
+ * Maps an invalidation reason for a replication slot to
  * ReplicationSlotInvalidationCause.
  */
 ReplicationSlotInvalidationCause
-GetSlotInvalidationCause(const char *conflict_reason)
+GetSlotInvalidationCause(const char *invalidation_reason)
 {
 	ReplicationSlotInvalidationCause cause;
 	ReplicationSlotInvalidationCause result = RS_INVAL_NONE;
 	bool		found PG_USED_FOR_ASSERTS_ONLY = false;
 
-	Assert(conflict_reason);
+	Assert(invalidation_reason);
 
 	for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
 	{
-		if (strcmp(SlotInvalidationCauses[cause], conflict_reason) == 0)
+		if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0)
 		{
 			found = true;
 			result = cause;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ad79e1fccd..b5a638edea 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 17
+#define PG_GET_REPLICATION_SLOTS_COLS 18
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -263,6 +263,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		bool		nulls[PG_GET_REPLICATION_SLOTS_COLS];
 		WALAvailability walstate;
 		int			i;
+		ReplicationSlotInvalidationCause cause;
 
 		if (!slot->in_use)
 			continue;
@@ -409,22 +410,32 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
-		if (slot_contents.data.database == InvalidOid)
+		cause = slot_contents.data.invalidated;
+
+		if (SlotIsPhysical(&slot_contents))
 			nulls[i++] = true;
 		else
 		{
-			ReplicationSlotInvalidationCause cause = slot_contents.data.invalidated;
-
-			if (cause == RS_INVAL_NONE)
-				nulls[i++] = true;
+			/*
+			 * rows_removed and wal_level_insufficient are the only two
+			 * reasons for a logical slot's conflict with recovery.
+			 */
+			if (cause == RS_INVAL_HORIZON ||
+				cause == RS_INVAL_WAL_LEVEL)
+				values[i++] = BoolGetDatum(true);
 			else
-				values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]);
+				values[i++] = BoolGetDatum(false);
 		}
 
 		values[i++] = BoolGetDatum(slot_contents.data.failover);
 
 		values[i++] = BoolGetDatum(slot_contents.data.synced);
 
+		if (cause == RS_INVAL_NONE)
+			nulls[i++] = true;
+		else
+			values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]);
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index b5b8d11602..34a157f792 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -676,13 +676,13 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 	 * removed.
 	 */
 	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
-							"%s as caught_up, conflict_reason IS NOT NULL as invalid "
+							"%s as caught_up, invalidation_reason IS NOT NULL as invalid "
 							"FROM pg_catalog.pg_replication_slots "
 							"WHERE slot_type = 'logical' AND "
 							"database = current_database() AND "
 							"temporary IS FALSE;",
 							live_check ? "FALSE" :
-							"(CASE WHEN conflict_reason IS NOT NULL THEN FALSE "
+							"(CASE WHEN conflicting THEN FALSE "
 							"ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
 							"END)");
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4af5c2e847..e5dc1cbdb3 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11120,9 +11120,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,bool,text}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,synced,invalidation_reason}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 425effad21..7f25a083ee 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -273,7 +273,7 @@ extern void CheckPointReplicationSlots(bool is_shutdown);
 extern void CheckSlotRequirements(void);
 extern void CheckSlotPermissions(void);
 extern ReplicationSlotInvalidationCause
-			GetSlotInvalidationCause(const char *conflict_reason);
+			GetSlotInvalidationCause(const char *invalidation_reason);
 
 extern bool SlotExistsInStandbySlotNames(const char *slot_name);
 extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 88b03048c4..addff6a1a5 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -168,7 +168,7 @@ sub change_hot_standby_feedback_and_wait_for_xmins
 	}
 }
 
-# Check conflict_reason in pg_replication_slots.
+# Check reason for conflict in pg_replication_slots.
 sub check_slots_conflict_reason
 {
 	my ($slot_prefix, $reason) = @_;
@@ -178,15 +178,15 @@ sub check_slots_conflict_reason
 
 	$res = $node_standby->safe_psql(
 		'postgres', qq(
-			 select conflict_reason from pg_replication_slots where slot_name = '$active_slot';));
+			 select invalidation_reason from pg_replication_slots where slot_name = '$active_slot' and conflicting;));
 
-	is($res, "$reason", "$active_slot conflict_reason is $reason");
+	is($res, "$reason", "$active_slot reason for conflict is $reason");
 
 	$res = $node_standby->safe_psql(
 		'postgres', qq(
-			 select conflict_reason from pg_replication_slots where slot_name = '$inactive_slot';));
+			 select invalidation_reason from pg_replication_slots where slot_name = '$inactive_slot' and conflicting;));
 
-	is($res, "$reason", "$inactive_slot conflict_reason is $reason");
+	is($res, "$reason", "$inactive_slot reason for conflict is $reason");
 }
 
 # Drop the slots, re-create them, change hot_standby_feedback,
@@ -293,13 +293,13 @@ $node_primary->safe_psql('testdb',
 	qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]
 );
 
-# Check conflict_reason is NULL for physical slot
+# Check conflicting is NULL for physical slot
 $res = $node_primary->safe_psql(
 	'postgres', qq[
-		 SELECT conflict_reason is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
+		 SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]
 );
 
-is($res, 't', "Physical slot reports conflict_reason as NULL");
+is($res, 't', "Physical slot reports conflicting as NULL");
 
 my $backup_name = 'b1';
 $node_primary->backup($backup_name);
@@ -524,7 +524,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');
 
-# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+# Verify reason for conflict is 'rows_removed' in pg_replication_slots
 check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 
 # Ensure that replication slot stats are not removed after invalidation.
@@ -551,7 +551,7 @@ change_hot_standby_feedback_and_wait_for_xmins(1, 1);
 ##################################################
 $node_standby->restart;
 
-# Verify conflict_reason is retained across a restart.
+# Verify reason for conflict is retained across a restart.
 check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 
 ##################################################
@@ -560,7 +560,8 @@ check_slots_conflict_reason('vacuum_full_', 'rows_removed');
 
 # Get the restart_lsn from an invalidated slot
 my $restart_lsn = $node_standby->safe_psql('postgres',
-	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'vacuum_full_activeslot' and conflict_reason is not null;"
+	"SELECT restart_lsn FROM pg_replication_slots
+		WHERE slot_name = 'vacuum_full_activeslot' AND conflicting;"
 );
 
 chomp($restart_lsn);
@@ -611,7 +612,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');
 
-# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+# Verify reason for conflict is 'rows_removed' in pg_replication_slots
 check_slots_conflict_reason('row_removal_', 'rows_removed');
 
 $handle =
@@ -647,7 +648,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 check_for_invalidation('shared_row_removal_', $logstart,
 	'with vacuum on pg_authid');
 
-# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+# Verify reason for conflict is 'rows_removed' in pg_replication_slots
 check_slots_conflict_reason('shared_row_removal_', 'rows_removed');
 
 $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
@@ -700,8 +701,8 @@ ok( $node_standby->poll_query_until(
 is( $node_standby->safe_psql(
 		'postgres',
 		q[select bool_or(conflicting) from
-		  (select conflict_reason is not NULL as conflicting
-		   from pg_replication_slots WHERE slot_type = 'logical')]),
+		  (select conflicting from pg_replication_slots
+		  	where slot_type = 'logical')]),
 	'f',
 	'Logical slots are reported as non conflicting');
 
@@ -739,7 +740,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('pruning_', $logstart, 'with on-access pruning');
 
-# Verify conflict_reason is 'rows_removed' in pg_replication_slots
+# Verify reason for conflict is 'rows_removed' in pg_replication_slots
 check_slots_conflict_reason('pruning_', 'rows_removed');
 
 $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
@@ -783,7 +784,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 # Check invalidation in the logfile and in pg_stat_database_conflicts
 check_for_invalidation('wal_level_', $logstart, 'due to wal_level');
 
-# Verify conflict_reason is 'wal_level_insufficient' in pg_replication_slots
+# Verify reason for conflict is 'wal_level_insufficient' in pg_replication_slots
 check_slots_conflict_reason('wal_level_', 'wal_level_insufficient');
 
 $handle =
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 0ea1f3d323..f47bfd78eb 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -228,7 +228,7 @@ $standby1->safe_psql('postgres', "CHECKPOINT");
 # Check if the synced slot is invalidated
 is( $standby1->safe_psql(
 		'postgres',
-		q{SELECT conflict_reason = 'wal_removed' FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
+		q{SELECT invalidation_reason = 'wal_removed' FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
 	),
 	"t",
 	'synchronized slot has been invalidated');
@@ -274,7 +274,7 @@ $standby1->wait_for_log(qr/dropped replication slot "lsub1_slot" of dbid [0-9]+/
 # flagged as 'synced'
 is( $standby1->safe_psql(
 		'postgres',
-		q{SELECT conflict_reason IS NULL AND synced AND NOT temporary FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
+		q{SELECT invalidation_reason IS NULL AND synced AND NOT temporary FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
 	),
 	"t",
 	'logical slot is re-synced');
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 0cd2c64fca..055bec068d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1473,10 +1473,11 @@ pg_replication_slots| SELECT l.slot_name,
     l.wal_status,
     l.safe_wal_size,
     l.two_phase,
-    l.conflict_reason,
+    l.conflicting,
     l.failover,
-    l.synced
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, synced)
+    l.synced,
+    l.invalidation_reason
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, synced, invalidation_reason)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1

From 4d4248000adfd9096c652bdf0a654ac2203d57a0 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Thu, 14 Mar 2024 12:51:48 +0000
Subject: [PATCH v9 2/4] Add XID age based replication slot invalidation

Currently postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via the
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
max_slot_wal_keep_size is tricky: the amount of WAL a customer
generates and their allocated storage vary greatly in production,
making it difficult to pin down a one-size-fits-all value. It is
often easier for developers to set an XID age (the age of a slot's
xmin or catalog_xmin) of, say, 1 or 1.5 billion, after which the
slots get invalidated.

To achieve the above, postgres uses the replication slot's xmin (the
oldest transaction that this slot needs the database to retain) or
catalog_xmin (the oldest transaction affecting the system catalogs
that this slot needs the database to retain), and a new GUC
max_slot_xid_age. The checkpointer then looks at all replication
slots, invalidating those whose xmin or catalog_xmin age exceeds
the configured limit.
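
For example, a sketch (the threshold value is illustrative):

    ALTER SYSTEM SET max_slot_xid_age = 1500000000;
    SELECT pg_reload_conf();

    -- a comparable check from SQL; the checkpointer marks such slots
    -- with invalidation_reason = 'xid_aged'
    SELECT slot_name, age(xmin), age(catalog_xmin)
    FROM pg_replication_slots;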
---
 doc/src/sgml/config.sgml                      |  21 ++++
 src/backend/access/transam/xlog.c             |  10 ++
 src/backend/replication/slot.c                |  44 ++++++-
 src/backend/utils/misc/guc_tables.c           |  10 ++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/replication/slot.h                |   3 +
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/050_invalidate_slots.pl   | 108 ++++++++++++++++++
 8 files changed, 197 insertions(+), 1 deletion(-)
 create mode 100644 src/test/recovery/t/050_invalidate_slots.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 65a6e6c408..6dd54ffcb7 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4544,6 +4544,27 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-max-slot-xid-age" xreflabel="max_slot_xid_age">
+      <term><varname>max_slot_xid_age</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_slot_xid_age</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots whose <literal>xmin</literal> (the oldest
+        transaction that this slot needs the database to retain) or
+        <literal>catalog_xmin</literal> (the oldest transaction affecting the
+        system catalogs that this slot needs the database to retain) has reached
+        the age specified by this setting. A value of zero (which is the
+        default) disables this feature. Users can set this value anywhere
+        from zero to two billion. This parameter can only be set in the
+        <filename>postgresql.conf</filename> file or on the server command
+        line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 20a5f86209..36ae2ac6a4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7147,6 +7147,11 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
+	/* Invalidate replication slots based on xmin or catalog_xmin age */
+	if (max_slot_xid_age > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
@@ -7597,6 +7602,11 @@ CreateRestartPoint(int flags)
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 
+	/* Invalidate replication slots based on xmin or catalog_xmin age */
+	if (max_slot_xid_age > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Retreat _logSegNo using the current end of xlog replayed or received,
 	 * whichever is later.
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 4f1a17f6ce..2a1885da24 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -107,10 +107,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_WAL_REMOVED] = "wal_removed",
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
+	[RS_INVAL_XID_AGE] = "xid_aged",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_XID_AGE
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -140,6 +141,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 /* GUC variables */
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
+int			max_slot_xid_age = 0;
 
 /*
  * This GUC lists streaming replication standby server slot names that
@@ -1483,6 +1485,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_WAL_LEVEL:
 			appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
 			break;
+		case RS_INVAL_XID_AGE:
+			appendStringInfoString(&err_detail, _("The replication slot's xmin or catalog_xmin reached the age specified by max_slot_xid_age."));
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1599,6 +1604,42 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					if (SlotIsLogical(s))
 						conflict = cause;
 					break;
+				case RS_INVAL_XID_AGE:
+					{
+						TransactionId xid_cur = ReadNextTransactionId();
+						TransactionId xid_limit;
+						TransactionId xid_slot;
+
+						if (TransactionIdIsNormal(s->data.xmin))
+						{
+							xid_slot = s->data.xmin;
+
+							xid_limit = xid_slot + max_slot_xid_age;
+							if (xid_limit < FirstNormalTransactionId)
+								xid_limit += FirstNormalTransactionId;
+
+							if (TransactionIdFollowsOrEquals(xid_cur, xid_limit))
+							{
+								conflict = cause;
+								break;
+							}
+						}
+						if (TransactionIdIsNormal(s->data.catalog_xmin))
+						{
+							xid_slot = s->data.catalog_xmin;
+
+							xid_limit = xid_slot + max_slot_xid_age;
+							if (xid_limit < FirstNormalTransactionId)
+								xid_limit += FirstNormalTransactionId;
+
+							if (TransactionIdFollowsOrEquals(xid_cur, xid_limit))
+							{
+								conflict = cause;
+								break;
+							}
+						}
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1752,6 +1793,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
+ * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 57d9de4dd9..6b5375909d 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2954,6 +2954,16 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_xid_age", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Age of the transaction ID at which a replication slot gets invalidated."),
+			gettext_noop("The transaction is the oldest transaction (including the one affecting the system catalogs) that a replication slot needs the database to retain.")
+		},
+		&max_slot_xid_age,
+		0, 0, 2000000000,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 2244ee52f7..b4c928b826 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -334,6 +334,7 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 #track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
+#max_slot_xid_age = 0
 
 # - Primary Server -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7f25a083ee..614ba0e30b 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -53,6 +53,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_HORIZON,
 	/* wal_level insufficient for slot */
 	RS_INVAL_WAL_LEVEL,
+	/* slot's xmin or catalog_xmin has reached the age */
+	RS_INVAL_XID_AGE,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -227,6 +229,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *standby_slot_names;
+extern PGDLLIMPORT int max_slot_xid_age;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index c67249500e..d698c3ec73 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -50,6 +50,7 @@ tests += {
       't/039_end_of_wal.pl',
       't/040_standby_failover_slots_sync.pl',
       't/041_checkpoint_at_promote.pl',
+      't/050_invalidate_slots.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
new file mode 100644
index 0000000000..2f482b56e8
--- /dev/null
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -0,0 +1,108 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test for replication slots invalidation
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::Cluster;
+use Test::More;
+use Time::HiRes qw(usleep);
+
+# Initialize primary node, setting wal-segsize to 1MB
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$primary->append_conf(
+	'postgresql.conf', q{
+checkpoint_timeout = 1h
+});
+$primary->start;
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot('sb1_slot');
+]);
+
+# Take backup
+my $backup_name = 'my_backup';
+$primary->backup($backup_name);
+
+# Create a standby linking to the primary using the replication slot
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, $backup_name, has_streaming => 1);
+
+# Enable hs_feedback. The slot should gain an xmin. We set the status interval
+# so we'll see the results promptly.
+$standby1->append_conf(
+	'postgresql.conf', q{
+primary_slot_name = 'sb1_slot'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+});
+$standby1->start;
+
+# Create some content on primary to move xmin
+$primary->safe_psql('postgres',
+	"CREATE TABLE tab_int AS SELECT generate_series(1,10) AS a");
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby1);
+
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT xmin IS NOT NULL
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = 'sb1_slot';
+]) or die "Timed out waiting for slot xmin to advance";
+
+$primary->safe_psql(
+	'postgres', qq[
+	ALTER SYSTEM SET max_slot_xid_age = 500;
+]);
+$primary->reload;
+
+# Stop the standby so that the replication slot's xmin on the primary ages
+$standby1->stop;
+
+my $logstart = -s $primary->logfile;
+
+# Do some work to advance xmin
+$primary->safe_psql(
+	'postgres', q{
+do $$
+begin
+  for i in 10000..11000 loop
+    -- use an exception block so that each iteration eats an XID
+    begin
+      insert into tab_int values (i);
+    exception
+      when division_by_zero then null;
+    end;
+  end loop;
+end$$;
+});
+
+my $invalidated = 0;
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$primary->safe_psql('postgres', "CHECKPOINT");
+	if ($primary->log_contains(
+			'invalidating obsolete replication slot "sb1_slot"', $logstart))
+	{
+		$invalidated = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($invalidated, 'check that slot sb1_slot invalidation has been logged');
+
+# Wait for the inactive replication slot to be invalidated.
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'sb1_slot' AND
+		invalidation_reason = 'xid_aged';
+])
+  or die
+  "Timed out while waiting for replication slot sb1_slot to be invalidated";
+
+done_testing();
-- 
2.34.1

From 8349200b0bad1c8cda3e3f96034e7b63e3054d97 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Thu, 14 Mar 2024 14:04:00 +0000
Subject: [PATCH v9 3/4] Track inactive replication slot information

Currently postgres doesn't track metrics like the time at which
a slot became inactive, or the total number of times a slot
became inactive in its lifetime. This commit adds two new metrics,
last_inactive_at of type timestamptz and inactive_count of type
numeric, to ReplicationSlotPersistentData. Whenever a slot becomes
inactive, the current timestamp and the inactive count are
persisted to disk.

These metrics are useful in the following ways:
- To improve replication slot monitoring tools. For instance, one
can build a monitoring tool that signals a) when a replication slot
has been lying inactive for a day or so, using the last_inactive_at
metric, b) when a replication slot is becoming inactive too
frequently, using the inactive_count metric.

- To implement timeout-based inactive replication slot management
capability in postgres.

Increases SLOT_VERSION due to the two new metrics added.
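
For example, a monitoring query could look like this (a sketch):

    SELECT slot_name, active, last_inactive_at, inactive_count
    FROM pg_replication_slots;

last_inactive_at is NULL while a walsender holds the slot; on release
it is set to the current timestamp and inactive_count is incremented.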
---
 doc/src/sgml/system-views.sgml       | 20 +++++++++++++
 src/backend/catalog/system_views.sql |  4 ++-
 src/backend/replication/slot.c       | 43 ++++++++++++++++++++++------
 src/backend/replication/slotfuncs.c  | 15 +++++++++-
 src/include/catalog/pg_proc.dat      |  6 ++--
 src/include/replication/slot.h       |  6 ++++
 src/test/regress/expected/rules.out  |  6 ++--
 7 files changed, 84 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index f3fb5ba1b0..59cd1b5211 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2750,6 +2750,26 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
        ID of role
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_inactive_at</structfield> <type>timestamptz</type>
+      </para>
+      <para>
+        The time at which the slot last became inactive.
+        <literal>NULL</literal> if the slot is currently in
+        use.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>inactive_count</structfield> <type>numeric</type>
+      </para>
+      <para>
+        The total number of times the slot became inactive in its lifetime.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index cd22dad959..de9f1d5506 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1026,7 +1026,9 @@ CREATE VIEW pg_replication_slots AS
             L.conflicting,
             L.failover,
             L.synced,
-            L.invalidation_reason
+            L.invalidation_reason,
+            L.last_inactive_at,
+            L.inactive_count
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2a1885da24..e606218673 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -130,7 +130,7 @@ StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
 #define SLOT_MAGIC		0x1051CA1	/* format identifier */
-#define SLOT_VERSION	5		/* version for new files */
+#define SLOT_VERSION	6		/* version for new files */
 
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -400,6 +400,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->data.two_phase_at = InvalidXLogRecPtr;
 	slot->data.failover = failover;
 	slot->data.synced = synced;
+	slot->data.last_inactive_at = 0;
+	slot->data.inactive_count = 0;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
@@ -626,6 +628,17 @@ retry:
 
 	if (am_walsender)
 	{
+		if (s->data.persistency == RS_PERSISTENT)
+		{
+			SpinLockAcquire(&s->mutex);
+			s->data.last_inactive_at = 0;
+			SpinLockRelease(&s->mutex);
+
+			/* Write this slot to disk */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+		}
+
 		ereport(log_replication_commands ? LOG : DEBUG1,
 				SlotIsLogical(s)
 				? errmsg("acquired logical replication slot \"%s\"",
@@ -693,16 +706,20 @@ ReplicationSlotRelease(void)
 		ConditionVariableBroadcast(&slot->active_cv);
 	}
 
-	MyReplicationSlot = NULL;
-
-	/* might not have been set when we've been a plain slot */
-	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
-	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
-	LWLockRelease(ProcArrayLock);
-
 	if (am_walsender)
 	{
+		if (slot->data.persistency == RS_PERSISTENT)
+		{
+			SpinLockAcquire(&slot->mutex);
+			slot->data.last_inactive_at = GetCurrentTimestamp();
+			slot->data.inactive_count++;
+			SpinLockRelease(&slot->mutex);
+
+			/* Write this slot to disk */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+		}
+
 		ereport(log_replication_commands ? LOG : DEBUG1,
 				is_logical
 				? errmsg("released logical replication slot \"%s\"",
@@ -712,6 +729,14 @@ ReplicationSlotRelease(void)
 
 		pfree(slotname);
 	}
+
+	MyReplicationSlot = NULL;
+
+	/* might not have been set when we've been a plain slot */
+	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
+	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
+	LWLockRelease(ProcArrayLock);
 }
 
 /*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index b5a638edea..4c7a120df1 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 18
+#define PG_GET_REPLICATION_SLOTS_COLS 20
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -264,6 +264,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		WALAvailability walstate;
 		int			i;
 		ReplicationSlotInvalidationCause cause;
+		char		buf[256];
 
 		if (!slot->in_use)
 			continue;
@@ -436,6 +437,18 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]);
 
+		if (slot_contents.data.last_inactive_at > 0)
+			values[i++] = TimestampTzGetDatum(slot_contents.data.last_inactive_at);
+		else
+			nulls[i++] = true;
+
+		/* Convert to numeric. */
+		snprintf(buf, sizeof buf, UINT64_FORMAT, slot_contents.data.inactive_count);
+		values[i++] = DirectFunctionCall3(numeric_in,
+										  CStringGetDatum(buf),
+										  ObjectIdGetDatum(0),
+										  Int32GetDatum(-1));
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index e5dc1cbdb3..b26b53b714 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11120,9 +11120,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,bool,text}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,synced,invalidation_reason}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,bool,text,timestamptz,numeric}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,synced,invalidation_reason,last_inactive_at,inactive_count}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 614ba0e30b..780767a819 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -129,6 +129,12 @@ typedef struct ReplicationSlotPersistentData
 	 * for logical slots on the primary server.
 	 */
 	bool		failover;
+
+	/* When did this slot last become inactive? */
+	TimestampTz last_inactive_at;
+
+	/* How many times has this slot become inactive? */
+	uint64		inactive_count;
 } ReplicationSlotPersistentData;
 
 /*
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 055bec068d..c0bdfe76d8 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1476,8 +1476,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.conflicting,
     l.failover,
     l.synced,
-    l.invalidation_reason
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, synced, invalidation_reason)
+    l.invalidation_reason,
+    l.last_inactive_at,
+    l.inactive_count
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, synced, invalidation_reason, last_inactive_at, inactive_count)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1

From ff5007e261039bf951a93babdf407faeeed2bdeb Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Thu, 14 Mar 2024 14:06:41 +0000
Subject: [PATCH v9 4/4] Add inactive_timeout based replication slot
 invalidation

Currently postgres has the ability to invalidate inactive
replication slots based on the amount of WAL (set via the
max_slot_wal_keep_size GUC) that will be needed for the slots in
case they become active. However, choosing a default value for
max_slot_wal_keep_size is tricky: the amount of WAL a customer
generates and their allocated storage vary greatly in production,
making it difficult to pin down a one-size-fits-all value. It is
often easier for developers to set a timeout of, say, 1, 2, or 3
days, after which the inactive slots get invalidated.

To achieve the above, postgres uses the replication slot metric
last_inactive_at (the time at which the slot became inactive), and
a new GUC inactive_replication_slot_timeout. The checkpointer then
looks at all replication slots, invalidating the inactive slots
once the timeout has elapsed.
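
For example, a sketch (the timeout value is illustrative):

    ALTER SYSTEM SET inactive_replication_slot_timeout = '1d';
    SELECT pg_reload_conf();

    -- slots inactive longer than the timeout are invalidated at the
    -- next checkpoint
    SELECT slot_name, last_inactive_at, invalidation_reason
    FROM pg_replication_slots
    WHERE invalidation_reason = 'inactive_timeout';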
---
 doc/src/sgml/config.sgml                      | 18 +++++
 src/backend/access/transam/xlog.c             | 10 +++
 src/backend/replication/slot.c                | 22 +++++-
 src/backend/utils/misc/guc_tables.c           | 12 +++
 src/backend/utils/misc/postgresql.conf.sample |  1 +
 src/include/replication/slot.h                |  3 +
 src/test/recovery/t/050_invalidate_slots.pl   | 79 +++++++++++++++++++
 7 files changed, 144 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 6dd54ffcb7..4b0b60a1ac 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4565,6 +4565,24 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-inactive-replication-slot-timeout" xreflabel="inactive_replication_slot_timeout">
+      <term><varname>inactive_replication_slot_timeout</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>inactive_replication_slot_timeout</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots that are inactive for longer than this
+        amount of time at the next checkpoint. If this value is specified
+        without units, it is taken as seconds. A value of zero (which is the
+        default) disables the timeout mechanism. This parameter can only be
+        set in the <filename>postgresql.conf</filename> file or on the server
+        command line.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
       <term><varname>track_commit_timestamp</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 36ae2ac6a4..166c3ed794 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7152,6 +7152,11 @@ CreateCheckPoint(int flags)
 		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
 										   InvalidOid, InvalidTransactionId);
 
+	/* Invalidate inactive replication slots based on timeout */
+	if (inactive_replication_slot_timeout > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Delete old log files, those no longer needed for last checkpoint to
 	 * prevent the disk holding the xlog from growing full.
@@ -7607,6 +7612,11 @@ CreateRestartPoint(int flags)
 		InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0,
 										   InvalidOid, InvalidTransactionId);
 
+	/* Invalidate inactive replication slots based on timeout */
+	if (inactive_replication_slot_timeout > 0)
+		InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0,
+										   InvalidOid, InvalidTransactionId);
+
 	/*
 	 * Retreat _logSegNo using the current end of xlog replayed or received,
 	 * whichever is later.
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index e606218673..37498d3d98 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -108,10 +108,11 @@ const char *const SlotInvalidationCauses[] = {
 	[RS_INVAL_HORIZON] = "rows_removed",
 	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
 	[RS_INVAL_XID_AGE] = "xid_aged",
+	[RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout",
 };
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES RS_INVAL_XID_AGE
+#define	RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT
 
 StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 				 "array length mismatch");
@@ -142,6 +143,7 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
 int			max_slot_xid_age = 0;
+int			inactive_replication_slot_timeout = 0;
 
 /*
  * This GUC lists streaming replication standby server slot names that
@@ -1513,6 +1515,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 		case RS_INVAL_XID_AGE:
 			appendStringInfoString(&err_detail, _("The replication slot's xmin or catalog_xmin reached the age specified by max_slot_xid_age."));
 			break;
+		case RS_INVAL_INACTIVE_TIMEOUT:
+			appendStringInfoString(&err_detail, _("The slot has been inactive for more than the time specified by inactive_replication_slot_timeout."));
+			break;
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1665,6 +1670,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 						}
 					}
 					break;
+				case RS_INVAL_INACTIVE_TIMEOUT:
+					if (s->data.last_inactive_at > 0)
+					{
+						TimestampTz now;
+
+						Assert(s->data.persistency == RS_PERSISTENT);
+						Assert(s->active_pid == 0);
+
+						now = GetCurrentTimestamp();
+						if (TimestampDifferenceExceeds(s->data.last_inactive_at, now,
+													   inactive_replication_slot_timeout * 1000))
+							conflict = cause;
+					}
+					break;
 				case RS_INVAL_NONE:
 					pg_unreachable();
 			}
@@ -1819,6 +1838,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
  *   db; dboid may be InvalidOid for shared relations
  * - RS_INVAL_WAL_LEVEL: is logical
  * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age
+ * - RS_INVAL_INACTIVE_TIMEOUT: slot has been inactive longer than the timeout
  *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 6b5375909d..6caf40d51e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2964,6 +2964,18 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"inactive_replication_slot_timeout", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the amount of time to wait before invalidating an "
+						 "inactive replication slot."),
+			NULL,
+			GUC_UNIT_S
+		},
+		&inactive_replication_slot_timeout,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"commit_delay", PGC_SUSET, WAL_SETTINGS,
 			gettext_noop("Sets the delay in microseconds between transaction commit and "
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index b4c928b826..7f2a3e41f1 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -261,6 +261,7 @@
 #recovery_prefetch = try	# prefetch pages referenced in the WAL?
 #wal_decode_buffer_size = 512kB	# lookahead window used for prefetching
 				# (change requires restart)
+#inactive_replication_slot_timeout = 0	# in seconds; 0 disables
 
 # - Archiving -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 780767a819..8378d7c913 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -55,6 +55,8 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_WAL_LEVEL,
 	/* slot's xmin or catalog_xmin has reached the age */
 	RS_INVAL_XID_AGE,
+	/* inactive slot timeout has occurred */
+	RS_INVAL_INACTIVE_TIMEOUT,
 } ReplicationSlotInvalidationCause;
 
 extern PGDLLIMPORT const char *const SlotInvalidationCauses[];
@@ -236,6 +238,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *standby_slot_names;
 extern PGDLLIMPORT int max_slot_xid_age;
+extern PGDLLIMPORT int inactive_replication_slot_timeout;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl
index 2f482b56e8..4c66dd4a4e 100644
--- a/src/test/recovery/t/050_invalidate_slots.pl
+++ b/src/test/recovery/t/050_invalidate_slots.pl
@@ -105,4 +105,83 @@ $primary->poll_query_until(
   or die
   "Timed out while waiting for replication slot sb1_slot to be invalidated";
 
+$primary->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot('sb2_slot');
+]);
+
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET max_slot_xid_age = 0;
+]);
+$primary->reload;
+
+# Create a standby linking to the primary using the replication slot
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup($primary, $backup_name, has_streaming => 1);
+$standby2->append_conf(
+	'postgresql.conf', q{
+primary_slot_name = 'sb2_slot'
+});
+$standby2->start;
+
+# Wait until standby has replayed enough data
+$primary->wait_for_catchup($standby2);
+
+# The inactive slot info should be NULL and zero while the slot is active
+my $result = $primary->safe_psql(
+	'postgres', qq[
+	SELECT last_inactive_at IS NULL, inactive_count = 0 AS OK
+		FROM pg_replication_slots WHERE slot_name = 'sb2_slot';
+]);
+is($result, "t|t",
+	'check the inactive replication slot info for an active slot');
+
+# Set timeout so that the next checkpoint will invalidate the inactive
+# replication slot.
+$primary->safe_psql(
+	'postgres', qq[
+    ALTER SYSTEM SET inactive_replication_slot_timeout TO '1s';
+]);
+$primary->reload;
+
+$logstart = -s $primary->logfile;
+
+# Stop the standby to make the replication slot on the primary inactive
+$standby2->stop;
+
+# Wait for the inactive replication slot info to be updated
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE last_inactive_at IS NOT NULL AND
+		inactive_count = 1 AND slot_name = 'sb2_slot';
+])
+  or die
+  "Timed out while waiting for inactive replication slot info to be updated";
+
+$invalidated = 0;
+for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
+{
+	$primary->safe_psql('postgres', "CHECKPOINT");
+	if ($primary->log_contains(
+			'invalidating obsolete replication slot "sb2_slot"', $logstart))
+	{
+		$invalidated = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($invalidated, 'check that slot sb2_slot invalidation has been logged');
+
+# Wait for the inactive replication slot to be invalidated.
+$primary->poll_query_until(
+	'postgres', qq[
+	SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+		WHERE slot_name = 'sb2_slot' AND
+		invalidation_reason = 'inactive_timeout';
+])
+  or die
+  "Timed out while waiting for inactive replication slot sb2_slot to be invalidated";
+
 done_testing();
-- 
2.34.1
