From 0d80cfad9658c7303d036833636b22293fd25109 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Sun, 24 Mar 2024 07:14:51 +0000
Subject: [PATCH v18 2/5] Allow setting inactive_timeout for replication slots
 via SQL API.

This commit adds a new replication slot property called
inactive_timeout specifying the amount of time in seconds the slot
is allowed to be inactive. It is added to slot's persistent data
structure to survive during server restarts. It will be synced to
failover slots on the standby, and also will be carried over to
the new cluster as part of pg_upgrade.

This commit particularly lets one specify the inactive_timeout for
a slot via SQL functions pg_create_physical_replication_slot and
pg_create_logical_replication_slot.

The new property will be useful to implement inactive timeout based
replication slot invalidation in a future commit.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila
Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
---
 contrib/test_decoding/expected/slot.out       | 102 ++++++++++++++++++
 contrib/test_decoding/sql/slot.sql            |  34 ++++++
 doc/src/sgml/func.sgml                        |  18 ++--
 doc/src/sgml/system-views.sgml                |   9 ++
 src/backend/catalog/system_functions.sql      |   2 +
 src/backend/catalog/system_views.sql          |   1 +
 src/backend/replication/logical/slotsync.c    |  17 ++-
 src/backend/replication/slot.c                |  20 +++-
 src/backend/replication/slotfuncs.c           |  31 +++++-
 src/backend/replication/walsender.c           |   4 +-
 src/bin/pg_upgrade/info.c                     |   6 +-
 src/bin/pg_upgrade/pg_upgrade.c               |   5 +-
 src/bin/pg_upgrade/pg_upgrade.h               |   2 +
 src/bin/pg_upgrade/t/003_logical_slots.pl     |  11 +-
 src/include/catalog/pg_proc.dat               |  22 ++--
 src/include/replication/slot.h                |   5 +-
 .../t/040_standby_failover_slots_sync.pl      |  13 ++-
 src/test/regress/expected/rules.out           |   3 +-
 18 files changed, 264 insertions(+), 41 deletions(-)

diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 349ab2d380..6771520afb 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -466,3 +466,105 @@ SELECT pg_drop_replication_slot('physical_slot');
  
 (1 row)
 
+-- Test negative value for inactive_timeout option for slots.
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_fail_slot', inactive_timeout := -300);  -- error
+ERROR:  "inactive_timeout" must not be negative
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_fail_slot', plugin := 'test_decoding', inactive_timeout := -600);  -- error
+ERROR:  "inactive_timeout" must not be negative
+-- Test inactive_timeout option for temporary slots.
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_fail_slot', temporary := true, inactive_timeout := 300);  -- error
+ERROR:  cannot set inactive_timeout for a temporary replication slot
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_fail_slot', plugin := 'test_decoding', temporary := true, inactive_timeout := 600);  -- error
+ERROR:  cannot set inactive_timeout for a temporary replication slot
+-- Test inactive_timeout option of physical slots.
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_phy_slot1', immediately_reserve := true, inactive_timeout := 300);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_phy_slot2');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Copy physical slot with inactive_timeout option set.
+SELECT 'copy' FROM pg_copy_physical_replication_slot(src_slot_name := 'it_phy_slot1', dst_slot_name := 'it_phy_slot3');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1;
+  slot_name   | slot_type | inactive_timeout 
+--------------+-----------+------------------
+ it_phy_slot1 | physical  |              300
+ it_phy_slot2 | physical  |                0
+ it_phy_slot3 | physical  |              300
+(3 rows)
+
+SELECT pg_drop_replication_slot('it_phy_slot1');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('it_phy_slot2');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('it_phy_slot3');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+-- Test inactive_timeout option of logical slots.
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_log_slot1', plugin := 'test_decoding', inactive_timeout := 600);
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_log_slot2', plugin := 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Copy logical slot with inactive_timeout option set.
+SELECT 'copy' FROM pg_copy_logical_replication_slot(src_slot_name := 'it_log_slot1', dst_slot_name := 'it_log_slot3');
+ ?column? 
+----------
+ copy
+(1 row)
+
+SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1;
+  slot_name   | slot_type | inactive_timeout 
+--------------+-----------+------------------
+ it_log_slot1 | logical   |              600
+ it_log_slot2 | logical   |                0
+ it_log_slot3 | logical   |              600
+(3 rows)
+
+SELECT pg_drop_replication_slot('it_log_slot1');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('it_log_slot2');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+SELECT pg_drop_replication_slot('it_log_slot3');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index 580e3ae3be..443e91da07 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -190,3 +190,37 @@ SELECT pg_drop_replication_slot('failover_true_slot');
 SELECT pg_drop_replication_slot('failover_false_slot');
 SELECT pg_drop_replication_slot('failover_default_slot');
 SELECT pg_drop_replication_slot('physical_slot');
+
+-- Test negative value for inactive_timeout option for slots.
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_fail_slot', inactive_timeout := -300);  -- error
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_fail_slot', plugin := 'test_decoding', inactive_timeout := -600);  -- error
+
+-- Test inactive_timeout option for temporary slots.
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_fail_slot', temporary := true, inactive_timeout := 300);  -- error
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_fail_slot', plugin := 'test_decoding', temporary := true, inactive_timeout := 600);  -- error
+
+-- Test inactive_timeout option of physical slots.
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_phy_slot1', immediately_reserve := true, inactive_timeout := 300);
+SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_phy_slot2');
+
+-- Copy physical slot with inactive_timeout option set.
+SELECT 'copy' FROM pg_copy_physical_replication_slot(src_slot_name := 'it_phy_slot1', dst_slot_name := 'it_phy_slot3');
+
+SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1;
+
+SELECT pg_drop_replication_slot('it_phy_slot1');
+SELECT pg_drop_replication_slot('it_phy_slot2');
+SELECT pg_drop_replication_slot('it_phy_slot3');
+
+-- Test inactive_timeout option of logical slots.
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_log_slot1', plugin := 'test_decoding', inactive_timeout := 600);
+SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_log_slot2', plugin := 'test_decoding');
+
+-- Copy logical slot with inactive_timeout option set.
+SELECT 'copy' FROM pg_copy_logical_replication_slot(src_slot_name := 'it_log_slot1', dst_slot_name := 'it_log_slot3');
+
+SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1;
+
+SELECT pg_drop_replication_slot('it_log_slot1');
+SELECT pg_drop_replication_slot('it_log_slot2');
+SELECT pg_drop_replication_slot('it_log_slot3');
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 8ecc02f2b9..afaafa35ad 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -28373,7 +28373,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_create_physical_replication_slot</primary>
         </indexterm>
-        <function>pg_create_physical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type> <optional>, <parameter>immediately_reserve</parameter> <type>boolean</type>, <parameter>temporary</parameter> <type>boolean</type> </optional> )
+        <function>pg_create_physical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type> <optional>, <parameter>immediately_reserve</parameter> <type>boolean</type>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>inactive_timeout</parameter> <type>integer</type> </optional>)
         <returnvalue>record</returnvalue>
         ( <parameter>slot_name</parameter> <type>name</type>,
         <parameter>lsn</parameter> <type>pg_lsn</type> )
@@ -28390,9 +28390,12 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         parameter, <parameter>temporary</parameter>, when set to true, specifies that
         the slot should not be permanently stored to disk and is only meant
         for use by the current session. Temporary slots are also
-        released upon any error. This function corresponds
-        to the replication protocol command <literal>CREATE_REPLICATION_SLOT
-        ... PHYSICAL</literal>.
+        released upon any error. The optional fourth
+        parameter, <parameter>inactive_timeout</parameter>, when set to a
+        non-zero value, specifies the amount of time in seconds the slot is
+        allowed to be inactive. This function corresponds to the replication
+        protocol command
+        <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>.
        </para></entry>
       </row>
 
@@ -28417,7 +28420,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_create_logical_replication_slot</primary>
         </indexterm>
-        <function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>twophase</parameter> <type>boolean</type>, <parameter>failover</parameter> <type>boolean</type> </optional> )
+        <function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>twophase</parameter> <type>boolean</type>, <parameter>failover</parameter> <type>boolean</type>, <parameter>inactive_timeout</parameter> <type>integer</type> </optional> )
         <returnvalue>record</returnvalue>
         ( <parameter>slot_name</parameter> <type>name</type>,
         <parameter>lsn</parameter> <type>pg_lsn</type> )
@@ -28436,7 +28439,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <parameter>failover</parameter>, when set to true,
         specifies that this slot is enabled to be synced to the
         standbys so that logical replication can be resumed after
-        failover. A call to this function has the same effect as
+        failover.  The optional sixth parameter,
+        <parameter>inactive_timeout</parameter>, when set to a
+        non-zero value, specifies the amount of time in seconds the slot is
+        allowed to be inactive. A call to this function has the same effect as
         the replication protocol command
         <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
        </para></entry>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 2b36b5fef1..dddbaa070f 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -2534,6 +2534,15 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>inactive_timeout</structfield> <type>integer</type>
+      </para>
+      <para>
+        The amount of time in seconds the slot is allowed to be inactive.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>conflicting</structfield> <type>bool</type>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index fe2bb50f46..af27616657 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -469,6 +469,7 @@ AS 'pg_logical_emit_message_bytea';
 CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
     IN slot_name name, IN immediately_reserve boolean DEFAULT false,
     IN temporary boolean DEFAULT false,
+    IN inactive_timeout int DEFAULT 0,
     OUT slot_name name, OUT lsn pg_lsn)
 RETURNS RECORD
 LANGUAGE INTERNAL
@@ -480,6 +481,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
     IN temporary boolean DEFAULT false,
     IN twophase boolean DEFAULT false,
     IN failover boolean DEFAULT false,
+    IN inactive_timeout int DEFAULT 0,
     OUT slot_name name, OUT lsn pg_lsn)
 RETURNS RECORD
 LANGUAGE INTERNAL
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index bc70ff193e..40d7ad469d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1024,6 +1024,7 @@ CREATE VIEW pg_replication_slots AS
             L.safe_wal_size,
             L.two_phase,
             L.last_inactive_time,
+            L.inactive_timeout,
             L.conflicting,
             L.invalidation_reason,
             L.failover,
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c5..c01876ceeb 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -131,6 +131,7 @@ typedef struct RemoteSlot
 	char	   *database;
 	bool		two_phase;
 	bool		failover;
+	int			inactive_timeout;
 	XLogRecPtr	restart_lsn;
 	XLogRecPtr	confirmed_lsn;
 	TransactionId catalog_xmin;
@@ -167,7 +168,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		remote_slot->two_phase == slot->data.two_phase &&
 		remote_slot->failover == slot->data.failover &&
 		remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
-		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
+		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0 &&
+		remote_slot->inactive_timeout == slot->data.inactive_timeout)
 		return false;
 
 	/* Avoid expensive operations while holding a spinlock. */
@@ -182,6 +184,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	slot->data.confirmed_flush = remote_slot->confirmed_lsn;
 	slot->data.catalog_xmin = remote_slot->catalog_xmin;
 	slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+	slot->data.inactive_timeout = remote_slot->inactive_timeout;
 	SpinLockRelease(&slot->mutex);
 
 	if (xmin_changed)
@@ -607,7 +610,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
 							  remote_slot->two_phase,
 							  remote_slot->failover,
-							  true);
+							  true, 0);
 
 		/* For shorter lines. */
 		slot = MyReplicationSlot;
@@ -627,6 +630,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		SpinLockAcquire(&slot->mutex);
 		slot->effective_catalog_xmin = xmin_horizon;
 		slot->data.catalog_xmin = xmin_horizon;
+		slot->data.inactive_timeout = remote_slot->inactive_timeout;
 		SpinLockRelease(&slot->mutex);
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
@@ -652,9 +656,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 static bool
 synchronize_slots(WalReceiverConn *wrconn)
 {
-#define SLOTSYNC_COLUMN_COUNT 9
+#define SLOTSYNC_COLUMN_COUNT 10
 	Oid			slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
-	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
+	LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID, INT4OID};
 
 	WalRcvExecResult *res;
 	TupleTableSlot *tupslot;
@@ -663,7 +667,7 @@ synchronize_slots(WalReceiverConn *wrconn)
 	bool		started_tx = false;
 	const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
 		" restart_lsn, catalog_xmin, two_phase, failover,"
-		" database, invalidation_reason"
+		" database, invalidation_reason, inactive_timeout"
 		" FROM pg_catalog.pg_replication_slots"
 		" WHERE failover and NOT temporary";
 
@@ -743,6 +747,9 @@ synchronize_slots(WalReceiverConn *wrconn)
 		remote_slot->invalidated = isnull ? RS_INVAL_NONE :
 			GetSlotInvalidationCause(TextDatumGetCString(d));
 
+		remote_slot->inactive_timeout = DatumGetInt32(slot_getattr(tupslot, ++col,
+																   &isnull));
+
 		/* Sanity check */
 		Assert(col == SLOTSYNC_COLUMN_COUNT);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 0f48d6dc7c..852a657e97 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -129,7 +129,7 @@ StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
 	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
 
 #define SLOT_MAGIC		0x1051CA1	/* format identifier */
-#define SLOT_VERSION	5		/* version for new files */
+#define SLOT_VERSION	6		/* version for new files */
 
 /* Control array for replication slot management */
 ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -304,11 +304,14 @@ ReplicationSlotValidateName(const char *name, int elevel)
  * failover: If enabled, allows the slot to be synced to standbys so
  *     that logical replication can be resumed after failover.
  * synced: True if the slot is synchronized from the primary server.
+ * inactive_timeout: The amount of time in seconds the slot is allowed to be
+ *     inactive.
  */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
 					  ReplicationSlotPersistency persistency,
-					  bool two_phase, bool failover, bool synced)
+					  bool two_phase, bool failover, bool synced,
+					  int inactive_timeout)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
@@ -345,6 +348,18 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 					errmsg("cannot enable failover for a temporary replication slot"));
 	}
 
+	if (inactive_timeout > 0)
+	{
+		/*
+		 * Do not allow users to set inactive_timeout for temporary slots,
+		 * because temporary slots will not be saved to the disk.
+		 */
+		if (persistency == RS_TEMPORARY)
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot set inactive_timeout for a temporary replication slot"));
+	}
+
 	/*
 	 * If some other backend ran this code concurrently with us, we'd likely
 	 * both allocate the same slot, and that would be bad.  We'd also be at
@@ -398,6 +413,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->data.two_phase_at = InvalidXLogRecPtr;
 	slot->data.failover = failover;
 	slot->data.synced = synced;
+	slot->data.inactive_timeout = inactive_timeout;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 24f5e6d90a..fb79401c50 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -38,14 +38,15 @@
  */
 static void
 create_physical_replication_slot(char *name, bool immediately_reserve,
-								 bool temporary, XLogRecPtr restart_lsn)
+								 bool temporary, int inactive_timeout,
+								 XLogRecPtr restart_lsn)
 {
 	Assert(!MyReplicationSlot);
 
 	/* acquire replication slot, this will check for conflicting names */
 	ReplicationSlotCreate(name, false,
 						  temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
-						  false, false);
+						  false, false, inactive_timeout);
 
 	if (immediately_reserve)
 	{
@@ -71,6 +72,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 	Name		name = PG_GETARG_NAME(0);
 	bool		immediately_reserve = PG_GETARG_BOOL(1);
 	bool		temporary = PG_GETARG_BOOL(2);
+	int			inactive_timeout = PG_GETARG_INT32(3);
 	Datum		values[2];
 	bool		nulls[2];
 	TupleDesc	tupdesc;
@@ -84,9 +86,15 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 
 	CheckSlotRequirements();
 
+	if (inactive_timeout < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				 errmsg("\"inactive_timeout\" must not be negative")));
+
 	create_physical_replication_slot(NameStr(*name),
 									 immediately_reserve,
 									 temporary,
+									 inactive_timeout,
 									 InvalidXLogRecPtr);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
@@ -120,7 +128,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 static void
 create_logical_replication_slot(char *name, char *plugin,
 								bool temporary, bool two_phase,
-								bool failover,
+								bool failover, int inactive_timeout,
 								XLogRecPtr restart_lsn,
 								bool find_startpoint)
 {
@@ -138,7 +146,7 @@ create_logical_replication_slot(char *name, char *plugin,
 	 */
 	ReplicationSlotCreate(name, true,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
-						  failover, false);
+						  failover, false, inactive_timeout);
 
 	/*
 	 * Create logical decoding context to find start point or, if we don't
@@ -177,6 +185,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	bool		temporary = PG_GETARG_BOOL(2);
 	bool		two_phase = PG_GETARG_BOOL(3);
 	bool		failover = PG_GETARG_BOOL(4);
+	int			inactive_timeout = PG_GETARG_INT32(5);
 	Datum		result;
 	TupleDesc	tupdesc;
 	HeapTuple	tuple;
@@ -190,11 +199,17 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 
 	CheckLogicalDecodingRequirements();
 
+	if (inactive_timeout < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				 errmsg("\"inactive_timeout\" must not be negative")));
+
 	create_logical_replication_slot(NameStr(*name),
 									NameStr(*plugin),
 									temporary,
 									two_phase,
 									failover,
+									inactive_timeout,
 									InvalidXLogRecPtr,
 									true);
 
@@ -239,7 +254,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 19
+#define PG_GET_REPLICATION_SLOTS_COLS 20
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	XLogRecPtr	currlsn;
 	int			slotno;
@@ -415,6 +430,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		values[i++] = Int32GetDatum(slot_contents.data.inactive_timeout);
+
 		cause = slot_contents.data.invalidated;
 
 		if (SlotIsPhysical(&slot_contents))
@@ -720,6 +737,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 	XLogRecPtr	src_restart_lsn;
 	bool		src_islogical;
 	bool		temporary;
+	int			inactive_timeout;
 	char	   *plugin;
 	Datum		values[2];
 	bool		nulls[2];
@@ -776,6 +794,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 	src_restart_lsn = first_slot_contents.data.restart_lsn;
 	temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
 	plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
+	inactive_timeout = first_slot_contents.data.inactive_timeout;
 
 	/* Check type of replication slot */
 	if (src_islogical != logical_slot)
@@ -823,6 +842,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 										temporary,
 										false,
 										false,
+										inactive_timeout,
 										src_restart_lsn,
 										false);
 	}
@@ -830,6 +850,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		create_physical_replication_slot(NameStr(*dst_name),
 										 true,
 										 temporary,
+										 inactive_timeout,
 										 src_restart_lsn);
 
 	/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc40c454de..5315c08650 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1221,7 +1221,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	{
 		ReplicationSlotCreate(cmd->slotname, false,
 							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-							  false, false, false);
+							  false, false, false, 0);
 
 		if (reserve_wal)
 		{
@@ -1252,7 +1252,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  two_phase, failover, false);
+							  two_phase, failover, false, 0);
 
 		/*
 		 * Do options check early so that we can bail before calling the
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 95c22a7200..12626987f0 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -676,7 +676,8 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 	 * removed.
 	 */
 	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
-							"%s as caught_up, invalidation_reason IS NOT NULL as invalid "
+							"%s as caught_up, invalidation_reason IS NOT NULL as invalid, "
+							"inactive_timeout "
 							"FROM pg_catalog.pg_replication_slots "
 							"WHERE slot_type = 'logical' AND "
 							"database = current_database() AND "
@@ -696,6 +697,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 		int			i_failover;
 		int			i_caught_up;
 		int			i_invalid;
+		int			i_inactive_timeout;
 
 		slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots);
 
@@ -705,6 +707,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 		i_failover = PQfnumber(res, "failover");
 		i_caught_up = PQfnumber(res, "caught_up");
 		i_invalid = PQfnumber(res, "invalid");
+		i_inactive_timeout = PQfnumber(res, "inactive_timeout");
 
 		for (int slotnum = 0; slotnum < num_slots; slotnum++)
 		{
@@ -716,6 +719,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 			curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0);
 			curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0);
 			curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0);
+			curr->inactive_timeout = atooid(PQgetvalue(res, slotnum, i_inactive_timeout));
 		}
 	}
 
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index f6143b6bc4..2656056103 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -931,9 +931,10 @@ create_logical_replication_slots(void)
 			appendPQExpBuffer(query, ", ");
 			appendStringLiteralConn(query, slot_info->plugin, conn);
 
-			appendPQExpBuffer(query, ", false, %s, %s);",
+			appendPQExpBuffer(query, ", false, %s, %s, %d);",
 							  slot_info->two_phase ? "true" : "false",
-							  slot_info->failover ? "true" : "false");
+							  slot_info->failover ? "true" : "false",
+							  slot_info->inactive_timeout);
 
 			PQclear(executeQueryOrDie(conn, "%s", query->data));
 
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 92bcb693fb..eb86d000b1 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -162,6 +162,8 @@ typedef struct
 	bool		invalid;		/* if true, the slot is unusable */
 	bool		failover;		/* is the slot designated to be synced to the
 								 * physical standby? */
+	int			inactive_timeout;	/* The amount of time in seconds the slot
+									 * is allowed to be inactive. */
 } LogicalSlotInfo;
 
 typedef struct
diff --git a/src/bin/pg_upgrade/t/003_logical_slots.pl b/src/bin/pg_upgrade/t/003_logical_slots.pl
index 83d71c3084..6e82d2cb7b 100644
--- a/src/bin/pg_upgrade/t/003_logical_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_slots.pl
@@ -153,14 +153,17 @@ like(
 # TEST: Successful upgrade
 
 # Preparations for the subsequent test:
-# 1. Setup logical replication (first, cleanup slots from the previous tests)
+# 1. Setup logical replication (first, cleanup slots from the previous tests,
+# and then create slot for this test with inactive_timeout set).
 my $old_connstr = $oldpub->connstr . ' dbname=postgres';
 
+my $inactive_timeout = 3600;
 $oldpub->start;
 $oldpub->safe_psql(
 	'postgres', qq[
 	SELECT * FROM pg_drop_replication_slot('test_slot1');
 	SELECT * FROM pg_drop_replication_slot('test_slot2');
+	SELECT pg_create_logical_replication_slot(slot_name := 'regress_sub', plugin := 'pgoutput', inactive_timeout := $inactive_timeout);
 	CREATE PUBLICATION regress_pub FOR ALL TABLES;
 ]);
 
@@ -172,7 +175,7 @@ $sub->start;
 $sub->safe_psql(
 	'postgres', qq[
 	CREATE TABLE tbl (a int);
-	CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true', failover = 'true')
+	CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (slot_name = 'regress_sub', create_slot = false, two_phase = 'true', failover = 'true')
 ]);
 $sub->wait_for_subscription_sync($oldpub, 'regress_sub');
 
@@ -192,8 +195,8 @@ command_ok([@pg_upgrade_cmd], 'run of pg_upgrade of old cluster');
 # Check that the slot 'regress_sub' has migrated to the new cluster
 $newpub->start;
 my $result = $newpub->safe_psql('postgres',
-	"SELECT slot_name, two_phase, failover FROM pg_replication_slots");
-is($result, qq(regress_sub|t|t), 'check the slot exists on new cluster');
+	"SELECT slot_name, two_phase, failover, inactive_timeout = $inactive_timeout FROM pg_replication_slots");
+is($result, qq(regress_sub|t|t|t), 'check the slot exists on new cluster');
 
 # Update the connection
 my $new_connstr = $newpub->connstr . ' dbname=postgres';
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 0d26e5b422..a09da44b6a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11105,10 +11105,10 @@
 # replication slots
 { oid => '3779', descr => 'create a physical replication slot',
   proname => 'pg_create_physical_replication_slot', provolatile => 'v',
-  proparallel => 'u', prorettype => 'record', proargtypes => 'name bool bool',
-  proallargtypes => '{name,bool,bool,name,pg_lsn}',
-  proargmodes => '{i,i,i,o,o}',
-  proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name bool bool int4',
+  proallargtypes => '{name,bool,bool,int4,name,pg_lsn}',
+  proargmodes => '{i,i,i,i,o,o}',
+  proargnames => '{slot_name,immediately_reserve,temporary,inactive_timeout,slot_name,lsn}',
   prosrc => 'pg_create_physical_replication_slot' },
 { oid => '4220',
   descr => 'copy a physical replication slot, changing temporality',
@@ -11133,17 +11133,17 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,bool,text,bool,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,last_inactive_time,conflicting,invalidation_reason,failover,synced}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,timestamptz,int4,bool,text,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,last_inactive_time,inactive_timeout,conflicting,invalidation_reason,failover,synced}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
   proparallel => 'u', prorettype => 'record',
-  proargtypes => 'name name bool bool bool',
-  proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}',
-  proargmodes => '{i,i,i,i,i,o,o}',
-  proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}',
+  proargtypes => 'name name bool bool bool int4',
+  proallargtypes => '{name,name,bool,bool,bool,int4,name,pg_lsn}',
+  proargmodes => '{i,i,i,i,i,i,o,o}',
+  proargnames => '{slot_name,plugin,temporary,twophase,failover,inactive_timeout,slot_name,lsn}',
   prosrc => 'pg_create_logical_replication_slot' },
 { oid => '4222',
   descr => 'copy a logical replication slot, changing temporality and plugin',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 2f18433ecc..24623cfdc1 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -127,6 +127,9 @@ typedef struct ReplicationSlotPersistentData
 	 * for logical slots on the primary server.
 	 */
 	bool		failover;
+
+	/* The amount of time in seconds the slot is allowed to be inactive */
+	int			inactive_timeout;
 } ReplicationSlotPersistentData;
 
 /*
@@ -239,7 +242,7 @@ extern void ReplicationSlotsShmemInit(void);
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
 								  ReplicationSlotPersistency persistency,
 								  bool two_phase, bool failover,
-								  bool synced);
+								  bool synced, int inactive_timeout);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotDropAcquired(void);
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index f47bfd78eb..3dd780beab 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -152,8 +152,9 @@ log_min_messages = 'debug2'
 $primary->append_conf('postgresql.conf', "log_min_messages = 'debug2'");
 $primary->reload;
 
+my $inactive_timeout = 3600;
 $primary->psql('postgres',
-	q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true);}
+	"SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true, $inactive_timeout);"
 );
 
 $primary->psql('postgres',
@@ -190,6 +191,16 @@ is( $standby1->safe_psql(
 	"t",
 	'logical slots have synced as true on standby');
 
+# Confirm that the synced slot on the standby has got inactive_timeout from the
+# primary.
+is( $standby1->safe_psql(
+		'postgres',
+		"SELECT inactive_timeout = $inactive_timeout FROM pg_replication_slots
+			WHERE slot_name = 'lsub2_slot' AND synced AND NOT temporary;"
+	),
+	"t",
+	'synced logical slot has got inactive_timeout on standby');
+
 ##################################################
 # Test that the synchronized slot will be dropped if the corresponding remote
 # slot on the primary server has been dropped.
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index dfcbaec387..d532e23176 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1474,11 +1474,12 @@ pg_replication_slots| SELECT l.slot_name,
     l.safe_wal_size,
     l.two_phase,
     l.last_inactive_time,
+    l.inactive_timeout,
     l.conflicting,
     l.invalidation_reason,
     l.failover,
     l.synced
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, last_inactive_time, conflicting, invalidation_reason, failover, synced)
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, last_inactive_time, inactive_timeout, conflicting, invalidation_reason, failover, synced)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.34.1

