Hi,

Here is the generated v2 of the patch.
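
For reference, the point 4 scenario below can be exercised with v2 roughly
like this (a sketch only; test_decoding is just a placeholder plugin):

-- on the primary: two failover slots
SELECT pg_create_logical_replication_slot('slot1', 'test_decoding', false, false, true);
SELECT pg_create_logical_replication_slot('slot2', 'test_decoding', false, false, true);

-- on the standby: a user slot with the same name, marked as overwritable
SELECT pg_create_logical_replication_slot('slot1', 'test_decoding', false, false, false, true);

-- one manual sync cycle; both slots should end up with synced = true
SELECT pg_sync_replication_slots();
SELECT slot_name, synced, failover FROM pg_replication_slots;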

Thanks

Fabrice

On Mon, Sep 29, 2025 at 8:28 AM shveta malik <[email protected]> wrote:

> On Fri, Sep 26, 2025 at 9:52 PM Fabrice Chapuis <[email protected]>
> wrote:
> >
> > Hi Shveta,
> >
> > Here is the version 1 of the patch with corrections
> >
>
> Thanks. We can bump the version to v2 now — that is, each time we post
> a new version, we can increment the version number.
>
> The patch fails to apply to the latest HEAD. Also it had some
> 'trailing whitespace' issues. Can you please rebase the patch and post
> it again?
>
> > 1)
> > In both create_physical_replication_slot() and
> > create_physical_replication_slot():
> > +   false, false,false);
> >
> > ,false --> , false (space after comma is recommended)
> >
> > 2)
> > + elog(DEBUG1, "logical replication slot %s created with option
> > allow_overwrite to %s",
> > + NameStr(slot->data.name), slot->data.allow_overwrite ? "true" :
> "false");
> >
> > IMO, we don't need this as we do not log other properties of the slot as
> well.
> > Point 1-2 done
> > 3)
> > We can have pg_replication_slots (pg_get_replication_slots) modified
> > to display the new property. Also we can modify docs to have the new
> > property defined.
> >
> > Point 3 Not yet implemented
> >
> > 4)
> > + {
> > + /* Check if we need to overwrite an existing logical slot */
> > + if (allow_overwrite)
> > + {
> > + /* Get rid of a replication slot that is no longer wanted */
> > + ReplicationSlotDrop(remote_slot->name,true);
> > +
> > + /* Get rid of a replication slot that is no longer wanted */
> > + ereport(WARNING,
> > + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> > + errmsg("slot \"%s\" already exists"
> > + " on the standby but it will be dropped because "
> > + "flag allow_overwrite is set to true",
> > + remote_slot->name));
> > +
> > + /* Going back to the main loop after droping the failover slot */
> > + return false;
> >
> > Currently we are returning after dropping the slot. But I think we
> > shall drop the slot and proceed with new slot creation of the same
> > name otherwise we may be left with old slot dropped and no new slot
> > created  specially when API is run.
> >
> > Scenario:
> > --On primary, create 2 slots: slot1 and slot2.
> > --On standby, create one slot slot1 with allow_overwrite=true.
> > --Run 'SELECT pg_sync_replication_slots();' on standby.
> >
> > At the end of API, expectation is that both slots will be present with
> > 'synced'=true, but only slot2 is present. if we run this API next
> > time, slot1 is created. It should have been dropped and recreated (or
> > say overwritten) in the first run itself.
> >
> > Point 4:
> > I put the slot creation under the allow_overwrite condition.
> > After testing with syn_standby_slots disabled on the standby, it works. I
> > think the code for the synchronisation of the new slot could be factored out.
> >
> > 5)
> > IMO, LOG is sufficient here, as the action aligns with the
> > user-provided 'allow_overwrite' setting.
> >
> > Point 5
> > Where is it in the code?
> >
>
> This one:
>
> + ereport(WARNING,
> + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("slot \"%s\" already exists"
> + " on the standby but try to recreate it because "
> + "flag allow_overwrite is set to true",
> + remote_slot->name));
>
> thanks
> Shveta
>
From 2ce1d7fa50df7ed6f7370b5e725a15ad11ec3d08 Mon Sep 17 00:00:00 2001
From: Fabrice Chapuis <[email protected]>
Date: Mon, 29 Sep 2025 13:10:38 +0200
Subject: [PATCH v2] Add allow_overwrite option to logical replication slot
 creation

Add a new parameter allow_overwrite to
pg_create_logical_replication_slot.  When a slot on the standby is
created with this option set to true, slot synchronization drops that
slot and recreates it as a synced failover slot instead of raising an
error about the name collision.
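
A minimal usage sketch (test_decoding is only a placeholder plugin):

    SELECT pg_create_logical_replication_slot('myslot', 'test_decoding',
                                               false, false, false, true);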

Signed-off-by: Fabrice Chapuis <[email protected]>
---
 src/backend/catalog/system_functions.sql   |   1 +
 src/backend/replication/logical/launcher.c |   2 +-
 src/backend/replication/logical/slotsync.c | 203 ++++++++++++++-------
 src/backend/replication/slot.c             |   3 +-
 src/backend/replication/slotfuncs.c        |   9 +-
 src/backend/replication/walsender.c        |   4 +-
 src/include/catalog/pg_proc.dat            |   8 +-
 src/include/replication/slot.h             |   8 +-
 8 files changed, 159 insertions(+), 79 deletions(-)

diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 2d946d6..75ea84f 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -480,6 +480,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
     IN temporary boolean DEFAULT false,
     IN twophase boolean DEFAULT false,
     IN failover boolean DEFAULT false,
+    IN allow_overwrite boolean DEFAULT false,
     OUT slot_name name, OUT lsn pg_lsn)
 RETURNS RECORD
 LANGUAGE INTERNAL
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 218cefe..a26af52 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1519,7 +1519,7 @@ CreateConflictDetectionSlot(void)
 			errmsg("creating replication conflict detection slot"));
 
 	ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
-						  false, false);
+						  false, false, false);
 
 	init_conflict_slot_xmin();
 }
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 8c061d5..dbf9380 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -653,91 +653,159 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
 	{
 		bool		synced;
+		bool		allow_overwrite;
 
 		SpinLockAcquire(&slot->mutex);
 		synced = slot->data.synced;
+		allow_overwrite = slot->data.allow_overwrite;
 		SpinLockRelease(&slot->mutex);
 
 		/* User-created slot with the same name exists, raise ERROR. */
 		if (!synced)
-			ereport(ERROR,
+		{
+			/* Check if we need to overwrite an existing logical slot */
+			if (allow_overwrite)
+			{
+				/* Recreate the failover slot and synchronize it */
+				NameData	plugin_name;
+				TransactionId xmin_horizon = InvalidTransactionId;
+
+				ereport(LOG,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("slot \"%s\" already exists on the standby"
+						   " but will be dropped and recreated because"
+						   " allow_overwrite is set to true",
+						   remote_slot->name));
+
+				/* Get rid of a replication slot that is no longer wanted */
+				ReplicationSlotDrop(remote_slot->name, true);
+
+				/* Skip creating the local slot if remote_slot is invalidated already */
+				if (remote_slot->invalidated != RS_INVAL_NONE)
+					return false;
+
+				/*
+				 * We create temporary slots instead of ephemeral slots here because
+				 * we want the slots to survive after releasing them. This is done to
+				 * avoid dropping and re-creating the slots in each synchronization
+				 * cycle if the restart_lsn or catalog_xmin of the remote slot has not
+				 * caught up.
+				 */
+				ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
+									remote_slot->two_phase,
+									remote_slot->failover,
+									true,
+									true);
+
+				/* For shorter lines. */
+				slot = MyReplicationSlot;
+
+				/* Avoid expensive operations while holding a spinlock. */
+				namestrcpy(&plugin_name, remote_slot->plugin);
+
+				SpinLockAcquire(&slot->mutex);
+				slot->data.database = remote_dbid;
+				slot->data.plugin = plugin_name;
+				SpinLockRelease(&slot->mutex);
+
+				reserve_wal_for_local_slot(remote_slot->restart_lsn);
+
+				LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+				xmin_horizon = GetOldestSafeDecodingTransactionId(true);
+				SpinLockAcquire(&slot->mutex);
+				slot->effective_catalog_xmin = xmin_horizon;
+				slot->data.catalog_xmin = xmin_horizon;
+				SpinLockRelease(&slot->mutex);
+				ReplicationSlotsComputeRequiredXmin(true);
+				LWLockRelease(ProcArrayLock);
+
+				update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+
+				slot_updated = true;
+			}
+			else
+				ereport(ERROR,
 					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					errmsg("exiting from slot synchronization because same"
 						   " name slot \"%s\" already exists on the standby",
 						   remote_slot->name));
-
-		/*
-		 * The slot has been synchronized before.
-		 *
-		 * It is important to acquire the slot here before checking
-		 * invalidation. If we don't acquire the slot first, there could be a
-		 * race condition that the local slot could be invalidated just after
-		 * checking the 'invalidated' flag here and we could end up
-		 * overwriting 'invalidated' flag to remote_slot's value. See
-		 * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
-		 * if the slot is not acquired by other processes.
-		 *
-		 * XXX: If it ever turns out that slot acquire/release is costly for
-		 * cases when none of the slot properties is changed then we can do a
-		 * pre-check to ensure that at least one of the slot properties is
-		 * changed before acquiring the slot.
-		 */
-		ReplicationSlotAcquire(remote_slot->name, true, false);
-
-		Assert(slot == MyReplicationSlot);
-
-		/*
-		 * Copy the invalidation cause from remote only if local slot is not
-		 * invalidated locally, we don't want to overwrite existing one.
-		 */
-		if (slot->data.invalidated == RS_INVAL_NONE &&
-			remote_slot->invalidated != RS_INVAL_NONE)
+		}
+		else
 		{
-			SpinLockAcquire(&slot->mutex);
-			slot->data.invalidated = remote_slot->invalidated;
-			SpinLockRelease(&slot->mutex);
 
-			/* Make sure the invalidated state persists across server restart */
-			ReplicationSlotMarkDirty();
-			ReplicationSlotSave();
+			/*
+			 * The slot has been synchronized before.
+			 *
+			 * It is important to acquire the slot here before checking
+			 * invalidation. If we don't acquire the slot first, there could be a
+			 * race condition that the local slot could be invalidated just after
+			 * checking the 'invalidated' flag here and we could end up
+			 * overwriting 'invalidated' flag to remote_slot's value. See
+			 * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
+			 * if the slot is not acquired by other processes.
+			 *
+			 * XXX: If it ever turns out that slot acquire/release is costly for
+			 * cases when none of the slot properties is changed then we can do a
+			 * pre-check to ensure that at least one of the slot properties is
+			 * changed before acquiring the slot.
+			 */
+			ReplicationSlotAcquire(remote_slot->name, true, false);
+
+			Assert(slot == MyReplicationSlot);
 
-			slot_updated = true;
-		}
+			/*
+			 * Copy the invalidation cause from remote only if local slot is not
+			 * invalidated locally, we don't want to overwrite existing one.
+			 */
+			if (slot->data.invalidated == RS_INVAL_NONE &&
+				remote_slot->invalidated != RS_INVAL_NONE)
+			{
+				SpinLockAcquire(&slot->mutex);
+				slot->data.invalidated = remote_slot->invalidated;
+				SpinLockRelease(&slot->mutex);
 
-		/* Skip the sync of an invalidated slot */
-		if (slot->data.invalidated != RS_INVAL_NONE)
-		{
-			ReplicationSlotRelease();
-			return slot_updated;
-		}
+				/* Make sure the invalidated state persists across server restart */
+				ReplicationSlotMarkDirty();
+				ReplicationSlotSave();
 
-		/* Slot not ready yet, let's attempt to make it sync-ready now. */
-		if (slot->data.persistency == RS_TEMPORARY)
-		{
-			slot_updated = update_and_persist_local_synced_slot(remote_slot,
-																remote_dbid);
-		}
+				slot_updated = true;
+			}
 
-		/* Slot ready for sync, so sync it. */
-		else
-		{
-			/*
-			 * Sanity check: As long as the invalidations are handled
-			 * appropriately as above, this should never happen.
-			 *
-			 * We don't need to check restart_lsn here. See the comments in
-			 * update_local_synced_slot() for details.
-			 */
-			if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
-				ereport(ERROR,
-						errmsg_internal("cannot synchronize local slot \"%s\"",
-										remote_slot->name),
-						errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
-										   LSN_FORMAT_ARGS(slot->data.confirmed_flush),
-										   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
+			/* Skip the sync of an invalidated slot */
+			if (slot->data.invalidated != RS_INVAL_NONE)
+			{
+				ReplicationSlotRelease();
+				return slot_updated;
+			}
 
-			slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
-													NULL, NULL);
+			/* Slot not ready yet, let's attempt to make it sync-ready now. */
+			if (slot->data.persistency == RS_TEMPORARY)
+			{
+				slot_updated = update_and_persist_local_synced_slot(remote_slot,
+																	remote_dbid);
+			}
+
+			/* Slot ready for sync, so sync it. */
+			else
+			{
+				/*
+				 * Sanity check: As long as the invalidations are handled
+				 * appropriately as above, this should never happen.
+				 *
+				 * We don't need to check restart_lsn here. See the comments in
+				 * update_local_synced_slot() for details.
+				 */
+				if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
+					ereport(ERROR,
+							errmsg_internal("cannot synchronize local slot \"%s\"",
+											remote_slot->name),
+							errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
+											LSN_FORMAT_ARGS(slot->data.confirmed_flush),
+											LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
+
+				slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
+														NULL, NULL);
+			}
 		}
 	}
 	/* Otherwise create the slot first. */
@@ -760,6 +828,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
 							  remote_slot->two_phase,
 							  remote_slot->failover,
+							  true,
 							  true);
 
 		/* For shorter lines. */
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index fd0fdb9..fcba4d4 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -351,7 +351,7 @@ IsSlotForConflictCheck(const char *name)
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
 					  ReplicationSlotPersistency persistency,
-					  bool two_phase, bool failover, bool synced)
+					  bool two_phase, bool failover, bool allow_overwrite, bool synced)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
@@ -445,6 +445,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->data.two_phase = two_phase;
 	slot->data.two_phase_at = InvalidXLogRecPtr;
 	slot->data.failover = failover;
+	slot->data.allow_overwrite = allow_overwrite;
 	slot->data.synced = synced;
 
 	/* and then data only present in shared memory */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index b8f2115..1a50244 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -41,7 +41,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
 	/* acquire replication slot, this will check for conflicting names */
 	ReplicationSlotCreate(name, false,
 						  temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
-						  false, false);
+						  false, false, false);
 
 	if (immediately_reserve)
 	{
@@ -116,7 +116,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
 static void
 create_logical_replication_slot(char *name, char *plugin,
 								bool temporary, bool two_phase,
-								bool failover,
+								bool failover, bool allow_overwrite,
 								XLogRecPtr restart_lsn,
 								bool find_startpoint)
 {
@@ -134,7 +134,7 @@ create_logical_replication_slot(char *name, char *plugin,
 	 */
 	ReplicationSlotCreate(name, true,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
-						  failover, false);
+						  failover, allow_overwrite, false);
 
 	/*
 	 * Create logical decoding context to find start point or, if we don't
@@ -173,6 +173,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	bool		temporary = PG_GETARG_BOOL(2);
 	bool		two_phase = PG_GETARG_BOOL(3);
 	bool		failover = PG_GETARG_BOOL(4);
+	bool		allow_overwrite = PG_GETARG_BOOL(5);
 	Datum		result;
 	TupleDesc	tupdesc;
 	HeapTuple	tuple;
@@ -191,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 									temporary,
 									two_phase,
 									failover,
+									allow_overwrite,
 									InvalidXLogRecPtr,
 									true);
 
@@ -726,6 +728,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 										temporary,
 										false,
 										false,
+										false,
 										src_restart_lsn,
 										false);
 	}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 59822f2..40586b2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1202,7 +1202,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	{
 		ReplicationSlotCreate(cmd->slotname, false,
 							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-							  false, false, false);
+							  false, false, false, false);
 
 		if (reserve_wal)
 		{
@@ -1233,7 +1233,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  two_phase, failover, false);
+							  two_phase, failover, false, false);
 
 		/*
 		 * Do options check early so that we can bail before calling the
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 01eba3b..fbaacfb 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11510,10 +11510,10 @@
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
   proparallel => 'u', prorettype => 'record',
-  proargtypes => 'name name bool bool bool',
-  proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}',
-  proargmodes => '{i,i,i,i,i,o,o}',
-  proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}',
+  proargtypes => 'name name bool bool bool bool',
+  proallargtypes => '{name,name,bool,bool,bool,bool,name,pg_lsn}',
+  proargmodes => '{i,i,i,i,i,i,o,o}',
+  proargnames => '{slot_name,plugin,temporary,twophase,failover,allow_overwrite,slot_name,lsn}',
   prosrc => 'pg_create_logical_replication_slot' },
 { oid => '4222',
   descr => 'copy a logical replication slot, changing temporality and plugin',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index fe62162..ca50b79 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -141,6 +141,12 @@ typedef struct ReplicationSlotPersistentData
 	 * for logical slots on the primary server.
 	 */
 	bool		failover;
+
+	/*
+	 * If true, a user-created slot with this name on the standby may be
+	 * dropped and recreated as a synced failover slot during slot sync.
+	 */
+	bool		allow_overwrite;
 } ReplicationSlotPersistentData;
 
 /*
@@ -301,7 +307,7 @@ extern void ReplicationSlotsShmemInit(void);
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
 								  ReplicationSlotPersistency persistency,
-								  bool two_phase, bool failover,
+								  bool two_phase, bool failover, bool allow_overwrite,
 								  bool synced);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
-- 
2.47.3
