From 67d28785b9d0a4cf10a863cc7d1265d5753a41a0 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Tue, 3 Jun 2025 15:31:47 -0700
Subject: [PATCH v1 1/2] Allow to create logical slots with no WAL reservation.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
---
 contrib/test_decoding/expected/slot.out       | 32 ++++++++
 contrib/test_decoding/sql/slot.sql            | 12 +++
 src/backend/catalog/system_functions.sql      |  1 +
 src/backend/replication/logical/logical.c     | 34 +++++++++
 .../replication/logical/logicalfuncs.c        | 16 ++--
 src/backend/replication/slotfuncs.c           | 75 +++++++++++++------
 src/backend/replication/walsender.c           | 12 +--
 src/include/catalog/pg_proc.dat               |  8 +-
 src/include/replication/logical.h             |  7 ++
 9 files changed, 156 insertions(+), 41 deletions(-)

diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 7de03c79f6f..4475260286e 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -466,3 +466,35 @@ SELECT pg_drop_replication_slot('physical_slot');
  
 (1 row)
 
+-- Test logical slots with not WAL reservation.
+SELECT 'init' FROM pg_create_logical_replication_slot('slot_nowal', 'test_decoding', false, false, false, false);
+ ?column? 
+----------
+ init
+(1 row)
+
+-- No WAL reserved.
+SELECT restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'slot_nowal';
+ restart_lsn | confirmed_flush_lsn 
+-------------+---------------------
+             | 
+(1 row)
+
+-- Use this slot and check the WAL reservation again.
+SELECT data FROM pg_logical_slot_get_changes('slot_nowal', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+SELECT restart_lsn is not null as valid_restart_lsn, confirmed_flush_lsn is not null as valid_confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'slot_nowal';
+ valid_restart_lsn | valid_confirmed_flush_lsn 
+-------------------+---------------------------
+ t                 | t
+(1 row)
+
+SELECT pg_drop_replication_slot('slot_nowal');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index 580e3ae3bef..3da5d6eab9b 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -190,3 +190,15 @@ SELECT pg_drop_replication_slot('failover_true_slot');
 SELECT pg_drop_replication_slot('failover_false_slot');
 SELECT pg_drop_replication_slot('failover_default_slot');
 SELECT pg_drop_replication_slot('physical_slot');
+
+-- Test logical slots with not WAL reservation.
+SELECT 'init' FROM pg_create_logical_replication_slot('slot_nowal', 'test_decoding', false, false, false, false);
+
+-- No WAL reserved.
+SELECT restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'slot_nowal';
+
+-- Use this slot and check the WAL reservation again.
+SELECT data FROM pg_logical_slot_get_changes('slot_nowal', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT restart_lsn is not null as valid_restart_lsn, confirmed_flush_lsn is not null as valid_confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'slot_nowal';
+
+SELECT pg_drop_replication_slot('slot_nowal');
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308e443..5a801f5f3bb 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -480,6 +480,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
     IN temporary boolean DEFAULT false,
     IN twophase boolean DEFAULT false,
     IN failover boolean DEFAULT false,
+    IN immediately_reserve boolean DEFAULT true,
     OUT slot_name name, OUT lsn pg_lsn)
 RETURNS RECORD
 LANGUAGE INTERNAL
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1d56d0c4ef3..2a34d2bf846 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -303,6 +303,40 @@ StartupDecodingContext(List *output_plugin_options,
 	return ctx;
 }
 
+/*
+ * Create the logical decoding and initialize it if necessary. This function
+ * can be used for logical slot that might not have been initialized yet.
+ */
+LogicalDecodingContext *
+CreateOrInitDecodingContext(XLogRecPtr restart_lsn,
+							List *output_plugin_options,
+							bool fast_foward,
+							XLogReaderRoutine *xl_routine,
+							LogicalOutputPluginWriterPrepareWrite prepare_write,
+							LogicalOutputPluginWriterWrite do_write,
+							LogicalOutputPluginWriterUpdateProgress update_progress)
+{
+	LogicalDecodingContext *ctx;
+
+	/* Initialize the slot with a new logical decoding if not yet */
+	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
+	{
+		ctx = CreateInitDecodingContext(NameStr(MyReplicationSlot->data.plugin),
+										output_plugin_options, false,
+										restart_lsn, xl_routine,
+										prepare_write, do_write, update_progress);
+
+		DecodingContextFindStartpoint(ctx);
+
+		FreeDecodingContext(ctx);
+	}
+
+	ctx = CreateDecodingContext(restart_lsn, output_plugin_options, fast_foward,
+								xl_routine, prepare_write, do_write, update_progress);
+
+	return ctx;
+}
+
 /*
  * Create a new decoding context, for a new logical slot.
  *
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index ca53caac2f2..8469280a37b 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -202,14 +202,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	PG_TRY();
 	{
 		/* restart at slot's confirmed_flush */
-		ctx = CreateDecodingContext(InvalidXLogRecPtr,
-									options,
-									false,
-									XL_ROUTINE(.page_read = read_local_xlog_page,
-											   .segment_open = wal_segment_open,
-											   .segment_close = wal_segment_close),
-									LogicalOutputPrepareWrite,
-									LogicalOutputWrite, NULL);
+		ctx = CreateOrInitDecodingContext(InvalidXLogRecPtr,
+										  options,
+										  false,
+										  XL_ROUTINE(.page_read = read_local_xlog_page,
+													 .segment_open = wal_segment_open,
+													 .segment_close = wal_segment_close),
+										  LogicalOutputPrepareWrite,
+										  LogicalOutputWrite, NULL);
 
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 36cc2ed4e44..ccbc3732e95 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -116,13 +116,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 static void
 create_logical_replication_slot(char *name, char *plugin,
 								bool temporary, bool two_phase,
-								bool failover,
+								bool failover, bool immediately_reserve,
 								XLogRecPtr restart_lsn,
 								bool find_startpoint)
 {
 	LogicalDecodingContext *ctx = NULL;
 
 	Assert(!MyReplicationSlot);
+	Assert(plugin != NULL);
 
 	/*
 	 * Acquire a logical decoding slot, this will check for conflicting names.
@@ -136,30 +137,55 @@ create_logical_replication_slot(char *name, char *plugin,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
 						  failover, false);
 
-	/*
-	 * Create logical decoding context to find start point or, if we don't
-	 * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
-	 *
-	 * Note: when !find_startpoint this is still important, because it's at
-	 * this point that the output plugin is validated.
-	 */
-	ctx = CreateInitDecodingContext(plugin, NIL,
-									false,	/* just catalogs is OK */
-									restart_lsn,
-									XL_ROUTINE(.page_read = read_local_xlog_page,
-											   .segment_open = wal_segment_open,
-											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
+	if (immediately_reserve)
+	{
+		/*
+		 * Create logical decoding context to find start point or, if we don't
+		 * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin
+		 * sanity.
+		 *
+		 * Note: when !find_startpoint this is still important, because it's
+		 * at this point that the output plugin is validated.
+		 */
+		ctx = CreateInitDecodingContext(plugin, NIL,
+										false,	/* just catalogs is OK */
+										restart_lsn,
+										XL_ROUTINE(.page_read = read_local_xlog_page,
+												   .segment_open = wal_segment_open,
+												   .segment_close = wal_segment_close),
+										NULL, NULL, NULL);
 
-	/*
-	 * If caller needs us to determine the decoding start point, do so now.
-	 * This might take a while.
-	 */
-	if (find_startpoint)
-		DecodingContextFindStartpoint(ctx);
+		/*
+		 * If caller needs us to determine the decoding start point, do so
+		 * now. This might take a while.
+		 */
+		if (find_startpoint)
+			DecodingContextFindStartpoint(ctx);
+
+		/* don't need the decoding context anymore */
+		FreeDecodingContext(ctx);
+	}
+	else
+	{
+		NameData	plugin_name;
+
+		/*
+		 * On a standby, this check is also required while creating the slot.
+		 * Check the comments in the function.
+		 */
+		CheckLogicalDecodingRequirements();
 
-	/* don't need the decoding context anymore */
-	FreeDecodingContext(ctx);
+		/*
+		 * Register output plugin name with slot.  We need the mutex to avoid
+		 * concurrent reading of a partially copied string.  But we don't want
+		 * any complicated code while holding a spinlock, so do namestrcpy()
+		 * outside.
+		 */
+		namestrcpy(&plugin_name, plugin);
+		SpinLockAcquire(&MyReplicationSlot->mutex);
+		MyReplicationSlot->data.plugin = plugin_name;
+		SpinLockRelease(&MyReplicationSlot->mutex);
+	}
 }
 
 /*
@@ -173,6 +199,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	bool		temporary = PG_GETARG_BOOL(2);
 	bool		two_phase = PG_GETARG_BOOL(3);
 	bool		failover = PG_GETARG_BOOL(4);
+	bool		immediately_reserve = PG_GETARG_BOOL(5);
 	Datum		result;
 	TupleDesc	tupdesc;
 	HeapTuple	tuple;
@@ -191,6 +218,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 									temporary,
 									two_phase,
 									failover,
+									immediately_reserve,
 									InvalidXLogRecPtr,
 									true);
 
@@ -726,6 +754,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 										temporary,
 										false,
 										false,
+										true,
 										src_restart_lsn,
 										false);
 	}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9fa8beb6103..85668ef807c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1462,12 +1462,12 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 	 * are reported early.
 	 */
 	logical_decoding_ctx =
-		CreateDecodingContext(cmd->startpoint, cmd->options, false,
-							  XL_ROUTINE(.page_read = logical_read_xlog_page,
-										 .segment_open = WalSndSegmentOpen,
-										 .segment_close = wal_segment_close),
-							  WalSndPrepareWrite, WalSndWriteData,
-							  WalSndUpdateProgress);
+		CreateOrInitDecodingContext(cmd->startpoint, cmd->options, false,
+									XL_ROUTINE(.page_read = logical_read_xlog_page,
+											   .segment_open = WalSndSegmentOpen,
+											   .segment_close = wal_segment_close),
+									WalSndPrepareWrite, WalSndWriteData,
+									WalSndUpdateProgress);
 	xlogreader = logical_decoding_ctx->reader;
 
 	WalSndSetState(WALSNDSTATE_CATCHUP);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 37a484147a8..62b302fbd1f 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11470,10 +11470,10 @@
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
   proparallel => 'u', prorettype => 'record',
-  proargtypes => 'name name bool bool bool',
-  proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}',
-  proargmodes => '{i,i,i,i,i,o,o}',
-  proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}',
+  proargtypes => 'name name bool bool bool bool',
+  proallargtypes => '{name,name,bool,bool,bool,bool,name,pg_lsn}',
+  proargmodes => '{i,i,i,i,i,i,o,o}',
+  proargnames => '{slot_name,plugin,temporary,twophase,failover,immediately_reserve,slot_name,lsn}',
   prosrc => 'pg_create_logical_replication_slot' },
 { oid => '4222',
   descr => 'copy a logical replication slot, changing temporality and plugin',
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 2e562bee5a9..481ea53e7e1 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -132,6 +132,13 @@ extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
 													 LogicalOutputPluginWriterPrepareWrite prepare_write,
 													 LogicalOutputPluginWriterWrite do_write,
 													 LogicalOutputPluginWriterUpdateProgress update_progress);
+extern LogicalDecodingContext *CreateOrInitDecodingContext(XLogRecPtr restart_lsn,
+														   List *output_plugin_options,
+														   bool fast_foward,
+														   XLogReaderRoutine *xl_routine,
+														   LogicalOutputPluginWriterPrepareWrite prepare_write,
+														   LogicalOutputPluginWriterWrite do_write,
+														   LogicalOutputPluginWriterUpdateProgress update_progress);
 extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
 extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
-- 
2.43.5

