Alvaro Herrera wrote:
> I'm back at looking into this again, after a rather exhausting week.  I
> think I have a better grasp of what is going on in this code now,

Here's an updated patch, which I intend to push tomorrow.

> and it
> appears to me that we should change the locking so that active_pid is
> protected by ReplicationSlotControlLock instead of the slot's spinlock;
> but I haven't analyzed the idea fully yet and I don't have the patch
> done yet either.

This doesn't seem to be a good idea after all.
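
To illustrate what the patch does instead: active_pid stays protected by the
slot's spinlock, and the new active_cv condition variable is broadcast
whenever active_pid changes, so a waiter can sleep instead of polling.  Below
is a minimal sketch of the acquire-or-wait loop; the real code in
ReplicationSlotAcquire() re-scans the slot array on each retry (the slot
could be dropped while we sleep), so treat this as an outline rather than the
actual hunk:

    /* assumes the usual backend headers: storage/spin.h,
     * storage/condition_variable.h, pgstat.h (PG_WAIT_LOCK),
     * miscadmin.h (MyProcPid) */
    for (;;)
    {
        int         active_pid;

        /* active_pid is still read and set under the per-slot spinlock */
        SpinLockAcquire(&slot->mutex);
        active_pid = slot->active_pid;
        if (active_pid == 0)
            active_pid = slot->active_pid = MyProcPid;
        SpinLockRelease(&slot->mutex);

        if (active_pid == MyProcPid)
            break;              /* slot is ours now */

        if (nowait)
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_IN_USE),
                     errmsg("replication slot \"%s\" is active for PID %d",
                            name, active_pid)));

        /* sleep until the current owner broadcasts active_cv, then retry */
        ConditionVariablePrepareToSleep(&slot->active_cv);
        ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK);
        ConditionVariableCancelSleep();
    }

    /* tell anyone else waiting on this slot that active_pid changed */
    ConditionVariableBroadcast(&slot->active_cv);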

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 33f08678bf20eed3a4cb3d10960bb06543a1b3db Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 12 Jul 2017 18:38:33 -0400
Subject: [PATCH v4] Wait for slot to become free when dropping it

---
 src/backend/replication/logical/logicalfuncs.c |   2 +-
 src/backend/replication/slot.c                 | 115 ++++++++++++++++++-------
 src/backend/replication/slotfuncs.c            |  32 ++++---
 src/backend/replication/walsender.c            |   6 +-
 src/include/replication/slot.h                 |  10 ++-
 5 files changed, 110 insertions(+), 55 deletions(-)

diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 363ca82cb0..a3ba2b1266 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -244,7 +244,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
        else
                end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-       ReplicationSlotAcquire(NameStr(*name));
+       ReplicationSlotAcquire(NameStr(*name), true);
 
        PG_TRY();
        {
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index dc7de20e11..ea9cd1f22b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -157,6 +157,7 @@ ReplicationSlotsShmemInit(void)
                        /* everything else is zeroed by the memset above */
                        SpinLockInit(&slot->mutex);
                        LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
+                       ConditionVariableInit(&slot->active_cv);
                }
        }
 }
@@ -313,25 +314,35 @@ ReplicationSlotCreate(const char *name, bool db_specific,
        LWLockRelease(ReplicationSlotControlLock);
 
        /*
-        * Now that the slot has been marked as in_use and in_active, it's safe to
+        * Now that the slot has been marked as in_use and active, it's safe to
         * let somebody else try to allocate a slot.
         */
        LWLockRelease(ReplicationSlotAllocationLock);
+
+       /* Let everybody know we've modified this slot */
+       ConditionVariableBroadcast(&slot->active_cv);
 }
 
 /*
  * Find a previously created slot and mark it as used by this backend.
  */
 void
-ReplicationSlotAcquire(const char *name)
+ReplicationSlotAcquire(const char *name, bool nowait)
 {
-       ReplicationSlot *slot = NULL;
+       ReplicationSlot *slot;
+       int                     active_pid;
        int                     i;
-       int                     active_pid = 0; /* Keep compiler quiet */
 
+retry:
        Assert(MyReplicationSlot == NULL);
 
-       /* Search for the named slot and mark it active if we find it. */
+       /*
+        * Search for the named slot and mark it active if we find it.  If the
+        * slot is already active, we exit the loop with active_pid set to the PID
+        * of the backend that owns it.
+        */
+       active_pid = 0;
+       slot = NULL;
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
        for (i = 0; i < max_replication_slots; i++)
        {
@@ -339,35 +350,59 @@ ReplicationSlotAcquire(const char *name)
 
                if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
                {
+                       /* Found the slot we want -- can we activate it? */
                        SpinLockAcquire(&s->mutex);
+
                        active_pid = s->active_pid;
                        if (active_pid == 0)
                                active_pid = s->active_pid = MyProcPid;
+
                        SpinLockRelease(&s->mutex);
                        slot = s;
+
                        break;
                }
        }
        LWLockRelease(ReplicationSlotControlLock);
 
-       /* If we did not find the slot or it was already active, error out. */
+       /* If we did not find the slot, error out. */
        if (slot == NULL)
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
                                 errmsg("replication slot \"%s\" does not 
exist", name)));
+
+       /*
+        * If we found the slot but it's already active in another backend, we
+        * either error out or retry after a short wait, as caller specified.
+        */
        if (active_pid != MyProcPid)
-               ereport(ERROR,
-                               (errcode(ERRCODE_OBJECT_IN_USE),
-                                errmsg("replication slot \"%s\" is active for PID %d",
-                                               name, active_pid)));
+       {
+               if (nowait)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_OBJECT_IN_USE),
+                                        errmsg("replication slot \"%s\" is active for PID %d",
+                                                       name, active_pid)));
+
+               /* Wait here until we get signaled by whoever is active */
+               ConditionVariablePrepareToSleep(&slot->active_cv);
+               ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK);
+               ConditionVariableCancelSleep();
+
+               goto retry;
+       }
+
+       /* Let everybody know we've modified this slot */
+       ConditionVariableBroadcast(&slot->active_cv);
 
        /* We made this slot active, so it's ours now. */
        MyReplicationSlot = slot;
 }
 
 /*
- * Release a replication slot, this or another backend can ReAcquire it
- * later. Resources this slot requires will be preserved.
+ * Release the replication slot that this backend considers to own.
+ *
+ * This or another backend can re-acquire the slot later.
+ * Resources this slot requires will be preserved.
  */
 void
 ReplicationSlotRelease(void)
@@ -385,17 +420,6 @@ ReplicationSlotRelease(void)
                 */
                ReplicationSlotDropAcquired();
        }
-       else if (slot->data.persistency == RS_PERSISTENT)
-       {
-               /*
-                * Mark persistent slot inactive.  We're not freeing it, just
-                * disconnecting.
-                */
-               SpinLockAcquire(&slot->mutex);
-               slot->active_pid = 0;
-               SpinLockRelease(&slot->mutex);
-       }
-
 
        /*
         * If slot needed to temporarily restrain both data and catalog xmin to
@@ -412,6 +436,18 @@ ReplicationSlotRelease(void)
                ReplicationSlotsComputeRequiredXmin(false);
        }
 
+       if (slot->data.persistency == RS_PERSISTENT)
+       {
+               /*
+                * Mark persistent slot inactive.  We're not freeing it, just
+                * disconnecting, but wake up others that may be waiting for it.
+                */
+               SpinLockAcquire(&slot->mutex);
+               slot->active_pid = 0;
+               SpinLockRelease(&slot->mutex);
+               ConditionVariableBroadcast(&slot->active_cv);
+       }
+
        MyReplicationSlot = NULL;
 
        /* might not have been set when we've been a plain slot */
@@ -430,32 +466,43 @@ ReplicationSlotCleanup(void)
 
        Assert(MyReplicationSlot == NULL);
 
-       /*
-        * No need for locking as we are only interested in slots active in
-        * current process and those are not touched by other processes.
-        */
+restart:
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
        for (i = 0; i < max_replication_slots; i++)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
+               if (!s->in_use)
+                       continue;
+
+               SpinLockAcquire(&s->mutex);
                if (s->active_pid == MyProcPid)
                {
-                       Assert(s->in_use && s->data.persistency == RS_TEMPORARY);
+                       Assert(s->data.persistency == RS_TEMPORARY);
+                       SpinLockRelease(&s->mutex);
+                       LWLockRelease(ReplicationSlotControlLock);      /* avoid deadlock */
 
                        ReplicationSlotDropPtr(s);
+
+                       ConditionVariableBroadcast(&s->active_cv);
+                       goto restart;
                }
+               else
+                       SpinLockRelease(&s->mutex);
        }
+
+       LWLockRelease(ReplicationSlotControlLock);
 }
 
 /*
  * Permanently drop replication slot identified by the passed in name.
  */
 void
-ReplicationSlotDrop(const char *name)
+ReplicationSlotDrop(const char *name, bool nowait)
 {
        Assert(MyReplicationSlot == NULL);
 
-       ReplicationSlotAcquire(name);
+       ReplicationSlotAcquire(name, nowait);
 
        ReplicationSlotDropAcquired();
 }
@@ -527,6 +574,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
                slot->active_pid = 0;
                SpinLockRelease(&slot->mutex);
 
+               /* wake up anyone waiting on this slot */
+               ConditionVariableBroadcast(&slot->active_cv);
+
                ereport(fail_softly ? WARNING : ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not rename file \"%s\" to 
\"%s\": %m",
@@ -535,15 +585,18 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 
        /*
         * The slot is definitely gone.  Lock out concurrent scans of the array
-        * long enough to kill it.  It's OK to clear the active flag here without
+        * long enough to kill it.  It's OK to clear the active PID here without
         * grabbing the mutex because nobody else can be scanning the array here,
         * and nobody can be attached to this slot and thus access it without
         * scanning the array.
+        *
+        * Also wake up processes waiting for it.
         */
        LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
        slot->active_pid = 0;
        slot->in_use = false;
        LWLockRelease(ReplicationSlotControlLock);
+       ConditionVariableBroadcast(&slot->active_cv);
 
        /*
         * Slot is dead and doesn't prevent resource removal anymore, recompute
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6dc808874d..d4cbd83bde 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -171,7 +171,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 
        CheckSlotRequirements();
 
-       ReplicationSlotDrop(NameStr(*name));
+       ReplicationSlotDrop(NameStr(*name), false);
 
        PG_RETURN_VOID();
 }
@@ -221,6 +221,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
        MemoryContextSwitchTo(oldcontext);
 
+       LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
        for (slotno = 0; slotno < max_replication_slots; slotno++)
        {
                ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
@@ -238,25 +239,21 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                NameData        plugin;
                int                     i;
 
-               SpinLockAcquire(&slot->mutex);
                if (!slot->in_use)
-               {
-                       SpinLockRelease(&slot->mutex);
                        continue;
-               }
-               else
-               {
-                       xmin = slot->data.xmin;
-                       catalog_xmin = slot->data.catalog_xmin;
-                       database = slot->data.database;
-                       restart_lsn = slot->data.restart_lsn;
-                       confirmed_flush_lsn = slot->data.confirmed_flush;
-                       namecpy(&slot_name, &slot->data.name);
-                       namecpy(&plugin, &slot->data.plugin);
 
-                       active_pid = slot->active_pid;
-                       persistency = slot->data.persistency;
-               }
+               SpinLockAcquire(&slot->mutex);
+
+               xmin = slot->data.xmin;
+               catalog_xmin = slot->data.catalog_xmin;
+               database = slot->data.database;
+               restart_lsn = slot->data.restart_lsn;
+               confirmed_flush_lsn = slot->data.confirmed_flush;
+               namecpy(&slot_name, &slot->data.name);
+               namecpy(&plugin, &slot->data.plugin);
+               active_pid = slot->active_pid;
+               persistency = slot->data.persistency;
+
                SpinLockRelease(&slot->mutex);
 
                memset(nulls, 0, sizeof(nulls));
@@ -309,6 +306,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
                tuplestore_putvalues(tupstore, tupdesc, values, nulls);
        }
+       LWLockRelease(ReplicationSlotControlLock);
 
        tuplestore_donestoring(tupstore);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 002143b26a..9a2babef1e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -541,7 +541,7 @@ StartReplication(StartReplicationCmd *cmd)
 
        if (cmd->slotname)
        {
-               ReplicationSlotAcquire(cmd->slotname);
+               ReplicationSlotAcquire(cmd->slotname, true);
                if (SlotIsLogical(MyReplicationSlot))
                        ereport(ERROR,
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1028,7 +1028,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 static void
 DropReplicationSlot(DropReplicationSlotCmd *cmd)
 {
-       ReplicationSlotDrop(cmd->slotname);
+       ReplicationSlotDrop(cmd->slotname, false);
        EndCommand("DROP_REPLICATION_SLOT", DestRemote);
 }
 
@@ -1046,7 +1046,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
        Assert(!MyReplicationSlot);
 
-       ReplicationSlotAcquire(cmd->slotname);
+       ReplicationSlotAcquire(cmd->slotname, true);
 
        /*
         * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a283f4e2b8..0bf2611fe9 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -12,6 +12,7 @@
 #include "fmgr.h"
 #include "access/xlog.h"
 #include "access/xlogreader.h"
+#include "storage/condition_variable.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
@@ -19,7 +20,7 @@
 /*
  * Behaviour of replication slots, upon release or crash.
  *
- * Slots marked as PERSISTENT are crashsafe and will not be dropped when
+ * Slots marked as PERSISTENT are crash-safe and will not be dropped when
  * released. Slots marked as EPHEMERAL will be dropped when released or after
  * restarts.
  *
@@ -117,6 +118,9 @@ typedef struct ReplicationSlot
        /* is somebody performing io on this slot? */
        LWLock          io_in_progress_lock;
 
+       /* Condition variable signalled when active_pid changes */
+       ConditionVariable active_cv;
+
        /* all the remaining data is only used for logical slots */
 
        /*
@@ -162,9 +166,9 @@ extern void ReplicationSlotsShmemInit(void);
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
                                          ReplicationSlotPersistency p);
 extern void ReplicationSlotPersist(void);
-extern void ReplicationSlotDrop(const char *name);
+extern void ReplicationSlotDrop(const char *name, bool nowait);
 
-extern void ReplicationSlotAcquire(const char *name);
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
-- 
2.11.0
