Hi, here is the generated v2 of the patch.
Thanks Fabrice On Mon, Sep 29, 2025 at 8:28 AM shveta malik <[email protected]> wrote: > On Fri, Sep 26, 2025 at 9:52 PM Fabrice Chapuis <[email protected]> > wrote: > > > > Hi Shveta, > > > > Here is the version 1 of the patch with corrections > > > > Thanks. We can bump the version to v2 now — that is, each time we post > a new version, we can increment the version number. > > The patch fails to apply to the latest HEAD. Also it had some > 'trailing whitespace' issues. Can you please rebase the patch and post > it again? > > > 1) > > In both create_physical_replication_slot() and > > create_physical_replication_slot(): > > + false, false,false); > > > > ,false --> , false (space after comma is recommended) > > > > 2) > > + elog(DEBUG1, "logical replication slot %s created with option > > allow_overwrite to %s", > > + NameStr(slot->data.name), slot->data.allow_overwrite ? "true" : > "false"); > > > > IMO, we don't need this as we do not log other properties of the slot as > well. > > Point 1-2 done > > 3) > > We can have pg_replication_slots (pg_get_replication_slots) modified > > to display the new property. Also we can modify docs to have the new > > property defined. > > > > Point 3 Not yet implemented > > > > 4) > > + { > > + /* Check if we need to overwrite an existing logical slot */ > > + if (allow_overwrite) > > + { > > + /* Get rid of a replication slot that is no longer wanted */ > > + ReplicationSlotDrop(remote_slot->name,true); > > + > > + /* Get rid of a replication slot that is no longer wanted */ > > + ereport(WARNING, > > + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), > > + errmsg("slot \"%s\" already exists" > > + " on the standby but it will be dropped because " > > + "flag allow_overwrite is set to true", > > + remote_slot->name)); > > + > > + /* Going back to the main loop after droping the failover slot */ > > + return false; > > > > Currently we are returning after dropping the slot. 
But I think we > > shall drop the slot and proceed with new slot creation of the same > > name; otherwise we may be left with old slot dropped and no new slot > > created, especially when the API is run. > > > > Scenario: > > --On primary, create 2 slots: slot1 and slot2. > > --On standby, create one slot slot1 with allow_overwrite=true. > > --Run 'SELECT pg_sync_replication_slots();' on standby. > > > > At the end of the API call, the expectation is that both slots will be present with > > 'synced'=true, but only slot2 is present. If we run this API the next > > time, slot1 is created. It should have been dropped and recreated (or > > say overwritten) in the first run itself. > > > > Point 4: > > I moved the slot creation under the allow_overwrite condition. > > After testing with syn_standby_slots disabled on the standby, it works. I > think the code for the synchronisation of the new slot could be factorised. > > > > 5) > > IMO, LOG is sufficient here, as the action aligns with the > > user-provided 'allow_overwrite' setting. > > > > Point 5 > > Where is it in the code? > > > > This one: > > + ereport(WARNING, > + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), > + errmsg("slot \"%s\" already exists" > + " on the standby but try to recreate it because " > + "flag allow_overwrite is set to true", > + remote_slot->name)); > > thanks > Shveta >
From 2ce1d7fa50df7ed6f7370b5e725a15ad11ec3d08 Mon Sep 17 00:00:00 2001 From: Fabrice Chapuis <[email protected]> Date: Mon, 29 Sep 2025 13:10:38 +0200 Subject: [PATCH v2] Add allow_overwrite option to logical replication slot creation This patch adds a new parameter allow_overwrite to pg_create_logical_replication_slot. When true, an existing failover slot on standby will be dropped and replaced automatically. Signed-off-by: Fabrice Chapuis <[email protected]> --- src/backend/catalog/system_functions.sql | 1 + src/backend/replication/logical/launcher.c | 2 +- src/backend/replication/logical/slotsync.c | 203 ++++++++++++++------- src/backend/replication/slot.c | 3 +- src/backend/replication/slotfuncs.c | 9 +- src/backend/replication/walsender.c | 4 +- src/include/catalog/pg_proc.dat | 8 +- src/include/replication/slot.h | 8 +- 8 files changed, 159 insertions(+), 79 deletions(-) diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 2d946d6..75ea84f 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -480,6 +480,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, IN failover boolean DEFAULT false, + IN allow_overwrite boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 218cefe..a26af52 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1519,7 +1519,7 @@ CreateConflictDetectionSlot(void) errmsg("creating replication conflict detection slot")); ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, - false, false); + false, false, false); init_conflict_slot_xmin(); } diff --git a/src/backend/replication/logical/slotsync.c 
b/src/backend/replication/logical/slotsync.c index 8c061d5..dbf9380 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -653,91 +653,159 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) { bool synced; + bool allow_overwrite; SpinLockAcquire(&slot->mutex); synced = slot->data.synced; + allow_overwrite = slot->data.allow_overwrite; SpinLockRelease(&slot->mutex); /* User-created slot with the same name exists, raise ERROR. */ if (!synced) - ereport(ERROR, + { + /* Check if we need to overwrite an existing logical slot */ + if (allow_overwrite) + { + /* Recreate the failover slot and synchronize it */ + NameData plugin_name; + TransactionId xmin_horizon = InvalidTransactionId; + + ereport(LOG, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("slot \"%s\" already exists" + " on the standby but try to recreate it because " + "flag allow_overwrite is set to true", + remote_slot->name)); + + /* Get rid of a replication slot that is no longer wanted */ + ReplicationSlotDrop(remote_slot->name,true); + + /* Skip creating the local slot if remote_slot is invalidated already */ + if (remote_slot->invalidated != RS_INVAL_NONE) + return false; + + /* + * We create temporary slots instead of ephemeral slots here because + * we want the slots to survive after releasing them. This is done to + * avoid dropping and re-creating the slots in each synchronization + * cycle if the restart_lsn or catalog_xmin of the remote slot has not + * caught up. + */ + ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, + remote_slot->two_phase, + remote_slot->failover, + true, + true); + + /* For shorter lines. */ + slot = MyReplicationSlot; + + /* Avoid expensive operations while holding a spinlock. 
*/ + namestrcpy(&plugin_name, remote_slot->plugin); + + SpinLockAcquire(&slot->mutex); + slot->data.database = remote_dbid; + slot->data.plugin = plugin_name; + SpinLockRelease(&slot->mutex); + + reserve_wal_for_local_slot(remote_slot->restart_lsn); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + SpinLockAcquire(&slot->mutex); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + update_and_persist_local_synced_slot(remote_slot, remote_dbid); + + slot_updated = true; + } + else + ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("exiting from slot synchronization because same" " name slot \"%s\" already exists on the standby", remote_slot->name)); - - /* - * The slot has been synchronized before. - * - * It is important to acquire the slot here before checking - * invalidation. If we don't acquire the slot first, there could be a - * race condition that the local slot could be invalidated just after - * checking the 'invalidated' flag here and we could end up - * overwriting 'invalidated' flag to remote_slot's value. See - * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly - * if the slot is not acquired by other processes. - * - * XXX: If it ever turns out that slot acquire/release is costly for - * cases when none of the slot properties is changed then we can do a - * pre-check to ensure that at least one of the slot properties is - * changed before acquiring the slot. - */ - ReplicationSlotAcquire(remote_slot->name, true, false); - - Assert(slot == MyReplicationSlot); - - /* - * Copy the invalidation cause from remote only if local slot is not - * invalidated locally, we don't want to overwrite existing one. 
- */ - if (slot->data.invalidated == RS_INVAL_NONE && - remote_slot->invalidated != RS_INVAL_NONE) + } + else { - SpinLockAcquire(&slot->mutex); - slot->data.invalidated = remote_slot->invalidated; - SpinLockRelease(&slot->mutex); - /* Make sure the invalidated state persists across server restart */ - ReplicationSlotMarkDirty(); - ReplicationSlotSave(); + /* + * The slot has been synchronized before. + * + * It is important to acquire the slot here before checking + * invalidation. If we don't acquire the slot first, there could be a + * race condition that the local slot could be invalidated just after + * checking the 'invalidated' flag here and we could end up + * overwriting 'invalidated' flag to remote_slot's value. See + * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly + * if the slot is not acquired by other processes. + * + * XXX: If it ever turns out that slot acquire/release is costly for + * cases when none of the slot properties is changed then we can do a + * pre-check to ensure that at least one of the slot properties is + * changed before acquiring the slot. + */ + ReplicationSlotAcquire(remote_slot->name, true, false); + + Assert(slot == MyReplicationSlot); - slot_updated = true; - } + /* + * Copy the invalidation cause from remote only if local slot is not + * invalidated locally, we don't want to overwrite existing one. + */ + if (slot->data.invalidated == RS_INVAL_NONE && + remote_slot->invalidated != RS_INVAL_NONE) + { + SpinLockAcquire(&slot->mutex); + slot->data.invalidated = remote_slot->invalidated; + SpinLockRelease(&slot->mutex); - /* Skip the sync of an invalidated slot */ - if (slot->data.invalidated != RS_INVAL_NONE) - { - ReplicationSlotRelease(); - return slot_updated; - } + /* Make sure the invalidated state persists across server restart */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); - /* Slot not ready yet, let's attempt to make it sync-ready now. 
*/ - if (slot->data.persistency == RS_TEMPORARY) - { - slot_updated = update_and_persist_local_synced_slot(remote_slot, - remote_dbid); - } + slot_updated = true; + } - /* Slot ready for sync, so sync it. */ - else - { - /* - * Sanity check: As long as the invalidations are handled - * appropriately as above, this should never happen. - * - * We don't need to check restart_lsn here. See the comments in - * update_local_synced_slot() for details. - */ - if (remote_slot->confirmed_lsn < slot->data.confirmed_flush) - ereport(ERROR, - errmsg_internal("cannot synchronize local slot \"%s\"", - remote_slot->name), - errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).", - LSN_FORMAT_ARGS(slot->data.confirmed_flush), - LSN_FORMAT_ARGS(remote_slot->confirmed_lsn))); + /* Skip the sync of an invalidated slot */ + if (slot->data.invalidated != RS_INVAL_NONE) + { + ReplicationSlotRelease(); + return slot_updated; + } - slot_updated = update_local_synced_slot(remote_slot, remote_dbid, - NULL, NULL); + /* Slot not ready yet, let's attempt to make it sync-ready now. */ + if (slot->data.persistency == RS_TEMPORARY) + { + slot_updated = update_and_persist_local_synced_slot(remote_slot, + remote_dbid); + } + + /* Slot ready for sync, so sync it. */ + else + { + /* + * Sanity check: As long as the invalidations are handled + * appropriately as above, this should never happen. + * + * We don't need to check restart_lsn here. See the comments in + * update_local_synced_slot() for details. 
+ */ + if (remote_slot->confirmed_lsn < slot->data.confirmed_flush) + ereport(ERROR, + errmsg_internal("cannot synchronize local slot \"%s\"", + remote_slot->name), + errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).", + LSN_FORMAT_ARGS(slot->data.confirmed_flush), + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn))); + + slot_updated = update_local_synced_slot(remote_slot, remote_dbid, + NULL, NULL); + } } } /* Otherwise create the slot first. */ @@ -760,6 +828,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, remote_slot->two_phase, remote_slot->failover, + true, true); /* For shorter lines. */ diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index fd0fdb9..fcba4d4 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -351,7 +351,7 @@ IsSlotForConflictCheck(const char *name) void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover, bool synced) + bool two_phase, bool failover, bool allow_overwrite, bool synced) { ReplicationSlot *slot = NULL; int i; @@ -445,6 +445,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; + slot->data.allow_overwrite = allow_overwrite; slot->data.synced = synced; /* and then data only present in shared memory */ diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index b8f2115..1a50244 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -41,7 +41,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, temporary ? 
RS_TEMPORARY : RS_PERSISTENT, false, - false, false); + false, false, false); if (immediately_reserve) { @@ -116,7 +116,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, - bool failover, + bool failover, bool allow_overwrite, XLogRecPtr restart_lsn, bool find_startpoint) { @@ -134,7 +134,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, - failover, false); + failover, allow_overwrite, false); /* * Create logical decoding context to find start point or, if we don't @@ -173,6 +173,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); bool failover = PG_GETARG_BOOL(4); + bool allow_overwrite = PG_GETARG_BOOL(5); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -191,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) temporary, two_phase, failover, + allow_overwrite, InvalidXLogRecPtr, true); @@ -726,6 +728,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) temporary, false, false, + false, src_restart_lsn, false); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 59822f2..40586b2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1202,7 +1202,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false, false); + false, false, false, false); if (reserve_wal) { @@ -1233,7 +1233,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? 
RS_TEMPORARY : RS_EPHEMERAL, - two_phase, failover, false); + two_phase, failover, false, false); /* * Do options check early so that we can bail before calling the diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 01eba3b..fbaacfb 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11510,10 +11510,10 @@ { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool bool', - proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', + proargtypes => 'name name bool bool bool bool', + proallargtypes => '{name,name,bool,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,allow_overwrite,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index fe62162..ca50b79 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -141,6 +141,12 @@ typedef struct ReplicationSlotPersistentData * for logical slots on the primary server. */ bool failover; + + /* + * Allow Postgres to drop logical replication slot on standby server to ensure + * creation of new failover slot when remote sync_replication_slots is set to true. 
+ */ + bool allow_overwrite; } ReplicationSlotPersistentData; /* @@ -301,7 +307,7 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover, + bool two_phase, bool failover, bool allow_overwrite, bool synced); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); -- 2.47.3
