From a1062c4c527693c6980dc2b63c5091eed19438e7 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sun, 24 Mar 2024 08:08:24 +0000
Subject: [PATCH v18 4/5] Allow setting inactive_timeout in the replication
 commands.

This commit allows replication connections to be able to set
inactive_timeout property of the slot using replication commands
CREATE_REPLICATION_SLOT and ALTER_REPLICATION_SLOT.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Ajin Cherian
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
---
 doc/src/sgml/protocol.sgml            | 20 +++++++++++
 src/backend/replication/slot.c        | 31 +++++++++++++++--
 src/backend/replication/walsender.c   | 38 ++++++++++++++++----
 src/include/replication/slot.h        |  3 +-
 src/test/recovery/t/001_stream_rep.pl | 50 +++++++++++++++++++++++++++
 5 files changed, 132 insertions(+), 10 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index a5cb19357f..2ffa1b470a 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2068,6 +2068,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>INACTIVE_TIMEOUT [ <replaceable class="parameter">integer</replaceable> ]</literal></term>
+        <listitem>
+         <para>
+          If set to a non-zero value, specifies the amount of time in seconds
+          the slot is allowed to be inactive. The default is zero.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
 
       <para>
@@ -2168,6 +2178,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>INACTIVE_TIMEOUT [ <replaceable class="parameter">integer</replaceable> ]</literal></term>
+        <listitem>
+         <para>
+          If set to a non-zero value, specifies the amount of time in seconds
+          the slot is allowed to be inactive. The default is zero.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
 
      </listitem>
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 3287aa2860..baf0b9aa72 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -812,8 +812,10 @@ ReplicationSlotDrop(const char *name, bool nowait)
  * Change the definition of the slot identified by the specified name.
  */
 void
-ReplicationSlotAlter(const char *name, bool failover)
+ReplicationSlotAlter(const char *name, bool failover, int inactive_timeout)
 {
+	bool		lock_acquired;
+
 	Assert(MyReplicationSlot == NULL);
 
 	ReplicationSlotAcquire(name, false);
@@ -856,10 +858,35 @@ ReplicationSlotAlter(const char *name, bool failover)
 				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				errmsg("cannot enable failover for a temporary replication slot"));
 
-	if (MyReplicationSlot->data.failover != failover)
+	/*
+	 * Do not allow users to set inactive_timeout for temporary slots because
+	 * temporary, slots will not be saved to the disk.
+	 */
+	if (inactive_timeout > 0 && MyReplicationSlot->data.persistency == RS_TEMPORARY)
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("cannot set inactive_timeout for a temporary replication slot"));
+
+	/*
+	 * If we are to change any of the slot property, acquire the lock once and
+	 * for all.
+	 */
+	lock_acquired = false;
+	if (MyReplicationSlot->data.failover != failover ||
+		MyReplicationSlot->data.inactive_timeout != inactive_timeout)
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
+		lock_acquired = true;
+	}
+
+	if (MyReplicationSlot->data.failover != failover)
 		MyReplicationSlot->data.failover = failover;
+
+	if (MyReplicationSlot->data.inactive_timeout != inactive_timeout)
+		MyReplicationSlot->data.inactive_timeout = inactive_timeout;
+
+	if (lock_acquired)
+	{
 		SpinLockRelease(&MyReplicationSlot->mutex);
 
 		ReplicationSlotMarkDirty();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5315c08650..0420274247 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1123,13 +1123,15 @@ static void
 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 						   bool *reserve_wal,
 						   CRSSnapshotAction *snapshot_action,
-						   bool *two_phase, bool *failover)
+						   bool *two_phase, bool *failover,
+						   int *inactive_timeout)
 {
 	ListCell   *lc;
 	bool		snapshot_action_given = false;
 	bool		reserve_wal_given = false;
 	bool		two_phase_given = false;
 	bool		failover_given = false;
+	bool		inactive_timeout_given = false;
 
 	/* Parse options */
 	foreach(lc, cmd->options)
@@ -1188,6 +1190,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 			failover_given = true;
 			*failover = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "inactive_timeout") == 0)
+		{
+			if (inactive_timeout_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			inactive_timeout_given = true;
+			*inactive_timeout = defGetInt32(defel);
+		}
 		else
 			elog(ERROR, "unrecognized option: %s", defel->defname);
 	}
@@ -1205,6 +1216,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	bool		reserve_wal = false;
 	bool		two_phase = false;
 	bool		failover = false;
+	int			inactive_timeout = 0;
 	CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
 	DestReceiver *dest;
 	TupOutputState *tstate;
@@ -1215,13 +1227,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	Assert(!MyReplicationSlot);
 
 	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
-							   &failover);
+							   &failover, &inactive_timeout);
 
 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 	{
 		ReplicationSlotCreate(cmd->slotname, false,
 							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-							  false, false, false, 0);
+							  false, false, false, inactive_timeout);
 
 		if (reserve_wal)
 		{
@@ -1252,7 +1264,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  two_phase, failover, false, 0);
+							  two_phase, failover, false, inactive_timeout);
 
 		/*
 		 * Do options check early so that we can bail before calling the
@@ -1411,9 +1423,11 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
  * Process extra options given to ALTER_REPLICATION_SLOT.
  */
 static void
-ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
+ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover,
+						  int *inactive_timeout)
 {
 	bool		failover_given = false;
+	bool		inactive_timeout_given = false;
 
 	/* Parse options */
 	foreach_ptr(DefElem, defel, cmd->options)
@@ -1427,6 +1441,15 @@ ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
 			failover_given = true;
 			*failover = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "inactive_timeout") == 0)
+		{
+			if (inactive_timeout_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			inactive_timeout_given = true;
+			*inactive_timeout = defGetInt32(defel);
+		}
 		else
 			elog(ERROR, "unrecognized option: %s", defel->defname);
 	}
@@ -1439,9 +1462,10 @@ static void
 AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
 {
 	bool		failover = false;
+	int			inactive_timeout = 0;
 
-	ParseAlterReplSlotOptions(cmd, &failover);
-	ReplicationSlotAlter(cmd->slotname, failover);
+	ParseAlterReplSlotOptions(cmd, &failover, &inactive_timeout);
+	ReplicationSlotAlter(cmd->slotname, failover, inactive_timeout);
 }
 
 /*
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 915edf7617..ee9b385cf9 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -246,7 +246,8 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotDropAcquired(void);
-extern void ReplicationSlotAlter(const char *name, bool failover);
+extern void ReplicationSlotAlter(const char *name, bool failover,
+								 int inactive_timeout);
 
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 5311ade509..db00b6aa24 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -604,4 +604,54 @@ ok( pump_until(
 	'base backup cleanly canceled');
 $sigchld_bb->finish();
 
+# Drop any existing slots on the primary, for the follow-up tests.
+$node_primary->safe_psql('postgres',
+	"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;");
+
+# Test setting inactive_timeout option via replication commands.
+$node_primary->append_conf(
+	'postgresql.conf', qq(
+wal_level = logical
+));
+$node_primary->restart;
+
+$node_primary->psql(
+	'postgres',
+	"CREATE_REPLICATION_SLOT it_phy_slot1 PHYSICAL (RESERVE_WAL, INACTIVE_TIMEOUT 100);",
+	extra_params => [ '-d', $connstr_db ]);
+
+$node_primary->psql(
+	'postgres',
+	"CREATE_REPLICATION_SLOT it_phy_slot2 PHYSICAL (RESERVE_WAL);",
+	extra_params => [ '-d', $connstr_db ]);
+
+$node_primary->psql(
+	'postgres',
+	"ALTER_REPLICATION_SLOT it_phy_slot2 (INACTIVE_TIMEOUT 200);",
+	extra_params => [ '-d', $connstr_db ]);
+
+$node_primary->psql(
+	'postgres',
+	"CREATE_REPLICATION_SLOT it_log_slot1 LOGICAL pgoutput (TWO_PHASE, INACTIVE_TIMEOUT 300);",
+	extra_params => [ '-d', $connstr_db ]);
+
+$node_primary->psql(
+	'postgres',
+	"CREATE_REPLICATION_SLOT it_log_slot2 LOGICAL pgoutput;",
+	extra_params => [ '-d', $connstr_db ]);
+
+$node_primary->psql(
+	'postgres',
+	"ALTER_REPLICATION_SLOT it_log_slot2 (INACTIVE_TIMEOUT 400);",
+	extra_params => [ '-d', $connstr_db ]);
+
+my $slot_info_expected = 'it_log_slot1|logical|300
+it_log_slot2|logical|400
+it_phy_slot1|physical|100
+it_phy_slot2|physical|0';
+
+my $slot_info = $node_primary->safe_psql('postgres',
+	qq[SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1;]);
+is($slot_info, $slot_info_expected, "replication slots with inactive_timeout on primary exist");
+
 done_testing();
-- 
2.34.1

