On Tue, Sep 30, 2025 at 12:17 PM Fabrice Chapuis
<[email protected]> wrote:
>
> Hi,
>
> Here the generated v2 of the Patch.
>

Thanks. I have refactored the code for synchronize_one_slot(), as it
contained some repeated code. Please include it if it looks okay to you.

Also, I felt that when we create a slot through slot synchronization,
we should create it with allow_overwrite set to false, so I have
changed that part as well in the attached patch.

It is a top-up patch. I have attached it as a txt file; please rename it
before applying it atop your changes. I have also attached the steps for
your reference.

thanks
Shveta
From e99ef98faad4a2ff32daa4db36daceddf8225cf7 Mon Sep 17 00:00:00 2001
From: Shveta Malik <[email protected]>
Date: Tue, 30 Sep 2025 14:54:41 +0530
Subject: [PATCH] refactored synchronize_one_slot

---
 src/backend/replication/logical/slotsync.c | 209 ++++++++-------------
 1 file changed, 82 insertions(+), 127 deletions(-)

diff --git a/src/backend/replication/logical/slotsync.c 
b/src/backend/replication/logical/slotsync.c
index dbf9380cb39..a8d44fe6b45 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -649,6 +649,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid 
remote_dbid)
                return false;
        }
 
+retry:
        /* Search for the named slot */
        if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
        {
@@ -666,146 +667,100 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid 
remote_dbid)
                        /* Check if we need to overwrite an existing logical 
slot */
                        if (allow_overwrite)
                        {
-                               /* Recreate the failover slot and synchronize 
it */
-                               NameData        plugin_name;
-                               TransactionId xmin_horizon = 
InvalidTransactionId;
-
                                ereport(LOG,
-                                       
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                       errmsg("slot \"%s\" already exists"
-                                       " on the standby but try to recreate it 
because "
-                                       "flag allow_overwrite is set to true",
-                                               remote_slot->name));
+                                               
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                               errmsg("slot \"%s\" already 
exists"
+                                                          " on the standby but 
will be overwritten as"
+                                                          " allow_overwrite is 
set to true",
+                                                          remote_slot->name));
 
                                /* Get rid of a replication slot that is no 
longer wanted */
-                               ReplicationSlotDrop(remote_slot->name,true);
-
-                               /* Skip creating the local slot if remote_slot 
is invalidated already */
-                               if (remote_slot->invalidated != RS_INVAL_NONE)
-                                       return false;
-
-                               /*
-                               * We create temporary slots instead of 
ephemeral slots here because
-                               * we want the slots to survive after releasing 
them. This is done to
-                               * avoid dropping and re-creating the slots in 
each synchronization
-                               * cycle if the restart_lsn or catalog_xmin of 
the remote slot has not
-                               * caught up.
-                               */
-                               ReplicationSlotCreate(remote_slot->name, true, 
RS_TEMPORARY,
-                                                                       
remote_slot->two_phase,
-                                                                       
remote_slot->failover,
-                                                                       true,
-                                                                       true);
-
-                               /* For shorter lines. */
-                               slot = MyReplicationSlot;
-
-                               /* Avoid expensive operations while holding a 
spinlock. */
-                               namestrcpy(&plugin_name, remote_slot->plugin);
-
-                               SpinLockAcquire(&slot->mutex);
-                               slot->data.database = remote_dbid;
-                               slot->data.plugin = plugin_name;
-                               SpinLockRelease(&slot->mutex);
-
-                               
reserve_wal_for_local_slot(remote_slot->restart_lsn);
-
-                               LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-                               xmin_horizon = 
GetOldestSafeDecodingTransactionId(true);
-                               SpinLockAcquire(&slot->mutex);
-                               slot->effective_catalog_xmin = xmin_horizon;
-                               slot->data.catalog_xmin = xmin_horizon;
-                               SpinLockRelease(&slot->mutex);
-                               ReplicationSlotsComputeRequiredXmin(true);
-                               LWLockRelease(ProcArrayLock);
-
-                               
update_and_persist_local_synced_slot(remote_slot, remote_dbid);
-
-                               slot_updated = true;
+                               ReplicationSlotAcquire(remote_slot->name, true, 
false);
+                               ReplicationSlotDropAcquired();
+                               goto retry;
                        }
                        else
+                       {
                                ereport(ERROR,
-                                       
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                       errmsg("exiting from slot 
synchronization because same"
-                                                  " name slot \"%s\" already 
exists on the standby",
-                                                  remote_slot->name));
+                                               
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                               errmsg("exiting from slot 
synchronization because same"
+                                                          " name slot \"%s\" 
already exists on the standby",
+                                                          remote_slot->name));
+                       }
                }
-               else
-               {
 
-                       /*
-                       * The slot has been synchronized before.
-                       *
-                       * It is important to acquire the slot here before 
checking
-                       * invalidation. If we don't acquire the slot first, 
there could be a
-                       * race condition that the local slot could be 
invalidated just after
-                       * checking the 'invalidated' flag here and we could end 
up
-                       * overwriting 'invalidated' flag to remote_slot's 
value. See
-                       * InvalidatePossiblyObsoleteSlot() where it invalidates 
slot directly
-                       * if the slot is not acquired by other processes.
-                       *
-                       * XXX: If it ever turns out that slot acquire/release 
is costly for
-                       * cases when none of the slot properties is changed 
then we can do a
-                       * pre-check to ensure that at least one of the slot 
properties is
-                       * changed before acquiring the slot.
-                       */
-                       ReplicationSlotAcquire(remote_slot->name, true, false);
-
-                       Assert(slot == MyReplicationSlot);
+               /*
+                * The slot has been synchronized before.
+                *
+                * It is important to acquire the slot here before checking
+                * invalidation. If we don't acquire the slot first, there 
could be a
+                * race condition that the local slot could be invalidated just 
after
+                * checking the 'invalidated' flag here and we could end up
+                * overwriting 'invalidated' flag to remote_slot's value. See
+                * InvalidatePossiblyObsoleteSlot() where it invalidates slot 
directly
+                * if the slot is not acquired by other processes.
+                *
+                * XXX: If it ever turns out that slot acquire/release is 
costly for
+                * cases when none of the slot properties is changed then we 
can do a
+                * pre-check to ensure that at least one of the slot properties 
is
+                * changed before acquiring the slot.
+                */
+               ReplicationSlotAcquire(remote_slot->name, true, false);
 
-                       /*
-                       * Copy the invalidation cause from remote only if local 
slot is not
-                       * invalidated locally, we don't want to overwrite 
existing one.
-                       */
-                       if (slot->data.invalidated == RS_INVAL_NONE &&
-                               remote_slot->invalidated != RS_INVAL_NONE)
-                       {
-                               SpinLockAcquire(&slot->mutex);
-                               slot->data.invalidated = 
remote_slot->invalidated;
-                               SpinLockRelease(&slot->mutex);
+               Assert(slot == MyReplicationSlot);
 
-                               /* Make sure the invalidated state persists 
across server restart */
-                               ReplicationSlotMarkDirty();
-                               ReplicationSlotSave();
+               /*
+                * Copy the invalidation cause from remote only if local slot 
is not
+                * invalidated locally, we don't want to overwrite existing one.
+                */
+               if (slot->data.invalidated == RS_INVAL_NONE &&
+                       remote_slot->invalidated != RS_INVAL_NONE)
+               {
+                       SpinLockAcquire(&slot->mutex);
+                       slot->data.invalidated = remote_slot->invalidated;
+                       SpinLockRelease(&slot->mutex);
 
-                               slot_updated = true;
-                       }
+                       /* Make sure the invalidated state persists across 
server restart */
+                       ReplicationSlotMarkDirty();
+                       ReplicationSlotSave();
 
-                       /* Skip the sync of an invalidated slot */
-                       if (slot->data.invalidated != RS_INVAL_NONE)
-                       {
-                               ReplicationSlotRelease();
-                               return slot_updated;
-                       }
+                       slot_updated = true;
+               }
 
-                       /* Slot not ready yet, let's attempt to make it 
sync-ready now. */
-                       if (slot->data.persistency == RS_TEMPORARY)
-                       {
-                               slot_updated = 
update_and_persist_local_synced_slot(remote_slot,
-                                                                               
                                                        remote_dbid);
-                       }
+               /* Skip the sync of an invalidated slot */
+               if (slot->data.invalidated != RS_INVAL_NONE)
+               {
+                       ReplicationSlotRelease();
+                       return slot_updated;
+               }
 
-                       /* Slot ready for sync, so sync it. */
-                       else
-                       {
-                               /*
-                               * Sanity check: As long as the invalidations 
are handled
-                               * appropriately as above, this should never 
happen.
-                               *
-                               * We don't need to check restart_lsn here. See 
the comments in
-                               * update_local_synced_slot() for details.
-                               */
-                               if (remote_slot->confirmed_lsn < 
slot->data.confirmed_flush)
-                                       ereport(ERROR,
-                                                       errmsg_internal("cannot 
synchronize local slot \"%s\"",
-                                                                               
        remote_slot->name),
-                                                       
errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead 
of remote slot's LSN(%X/%08X).",
-                                                                               
        LSN_FORMAT_ARGS(slot->data.confirmed_flush),
-                                                                               
        LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
-
-                               slot_updated = 
update_local_synced_slot(remote_slot, remote_dbid,
-                                                                               
                                NULL, NULL);
-                       }
+               /* Slot not ready yet, let's attempt to make it sync-ready now. 
*/
+               if (slot->data.persistency == RS_TEMPORARY)
+               {
+                       slot_updated = 
update_and_persist_local_synced_slot(remote_slot,
+                                                                               
                                                remote_dbid);
+               }
+
+               /* Slot ready for sync, so sync it. */
+               else
+               {
+                       /*
+                        * Sanity check: As long as the invalidations are 
handled
+                        * appropriately as above, this should never happen.
+                        *
+                        * We don't need to check restart_lsn here. See the 
comments in
+                        * update_local_synced_slot() for details.
+                        */
+                       if (remote_slot->confirmed_lsn < 
slot->data.confirmed_flush)
+                               ereport(ERROR,
+                                               errmsg_internal("cannot 
synchronize local slot \"%s\"",
+                                                                               
remote_slot->name),
+                                               errdetail_internal("Local 
slot's start streaming location LSN(%X/%08X) is ahead of remote slot's 
LSN(%X/%08X).",
+                                                                               
   LSN_FORMAT_ARGS(slot->data.confirmed_flush),
+                                                                               
   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
+
+                       slot_updated = update_local_synced_slot(remote_slot, 
remote_dbid,
+                                                                               
                        NULL, NULL);
                }
        }
        /* Otherwise create the slot first. */
@@ -828,7 +783,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid 
remote_dbid)
                ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
                                                          
remote_slot->two_phase,
                                                          remote_slot->failover,
-                                                         true,
+                                                         false,
                                                          true);
 
                /* For shorter lines. */
-- 
2.34.1

--Apply the changes 
patch -p1 < 0001-refactored-synchronize_one_slot.patch 

--Review the changes first. If they look okay, merge them into your patch
(i.e. amend the previous commit):
git add src/backend/replication/logical/slotsync.c
git commit --amend

Reply via email to