Petr Jelinek wrote:

> So the best idea I could come up with is to make use of the new condition
> variable API. That lets us wait on a variable which can be set per slot.

Here's a cleaned-up version of that patch, which I intend to get into the
tree tomorrow.  I verified that this allows the subscription tests to
pass with Tom's sleep additions.

I noticed one place where we read active_pid without holding a lock;
I marked it with an XXX comment.  I'm not yet sure whether this is a bug.


I noticed something odd in CREATE_REPLICATION: apparently we first
create a slot and mark it active, and then we activate it again by name.
With the current coding this seems to work fine, because we're careful to
make activation idempotent, but it looks convoluted.  I don't think this
is new, though.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
>From 55533aa3cdc2fecbf7b6b6c661649342a204e73b Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 12 Jul 2017 18:38:33 -0400
Subject: [PATCH v3 1/1] Wait for slot to become free when dropping it

---
 src/backend/replication/logical/logicalfuncs.c |  2 +-
 src/backend/replication/slot.c                 | 79 +++++++++++++++++++-------
 src/backend/replication/slotfuncs.c            |  2 +-
 src/backend/replication/walsender.c            |  6 +-
 src/include/replication/slot.h                 |  8 ++-
 5 files changed, 69 insertions(+), 28 deletions(-)

diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 363ca82cb0..a3ba2b1266 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -244,7 +244,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
        else
                end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-       ReplicationSlotAcquire(NameStr(*name));
+       ReplicationSlotAcquire(NameStr(*name), true);
 
        PG_TRY();
        {
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index dc7de20e11..76198a627d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -157,6 +157,7 @@ ReplicationSlotsShmemInit(void)
                        /* everything else is zeroed by the memset above */
                        SpinLockInit(&slot->mutex);
                        LWLockInitialize(&slot->io_in_progress_lock, 
LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
+                       ConditionVariableInit(&slot->active_cv);
                }
        }
 }
@@ -313,24 +314,27 @@ ReplicationSlotCreate(const char *name, bool db_specific,
        LWLockRelease(ReplicationSlotControlLock);
 
        /*
-        * Now that the slot has been marked as in_use and in_active, it's safe 
to
+        * Now that the slot has been marked as in_use and active, it's safe to
         * let somebody else try to allocate a slot.
         */
        LWLockRelease(ReplicationSlotAllocationLock);
+
+       ConditionVariableBroadcast(&slot->active_cv);
 }
 
 /*
  * Find a previously created slot and mark it as used by this backend.
  */
 void
-ReplicationSlotAcquire(const char *name)
+ReplicationSlotAcquire(const char *name, bool nowait)
 {
        ReplicationSlot *slot = NULL;
+       int                     active_pid = 0;
        int                     i;
-       int                     active_pid = 0; /* Keep compiler quiet */
 
        Assert(MyReplicationSlot == NULL);
 
+retry:
        /* Search for the named slot and mark it active if we find it. */
        LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
        for (i = 0; i < max_replication_slots; i++)
@@ -339,27 +343,52 @@ ReplicationSlotAcquire(const char *name)
 
                if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
                {
+                       /* Found the slot we want -- can we activate it? */
                        SpinLockAcquire(&s->mutex);
+
                        active_pid = s->active_pid;
                        if (active_pid == 0)
                                active_pid = s->active_pid = MyProcPid;
+
                        SpinLockRelease(&s->mutex);
                        slot = s;
+
                        break;
                }
        }
        LWLockRelease(ReplicationSlotControlLock);
 
-       /* If we did not find the slot or it was already active, error out. */
+       /* If we did not find the slot, error out. */
        if (slot == NULL)
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_OBJECT),
                                 errmsg("replication slot \"%s\" does not 
exist", name)));
+
+       /*
+        * If we found the slot but it's already active in another backend, we
+        * either error out or retry after a short wait, as caller specified.
+        */
        if (active_pid != MyProcPid)
-               ereport(ERROR,
-                               (errcode(ERRCODE_OBJECT_IN_USE),
-                                errmsg("replication slot \"%s\" is active for 
PID %d",
-                                               name, active_pid)));
+       {
+               if (nowait)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_OBJECT_IN_USE),
+                                        errmsg("replication slot \"%s\" is 
active for PID %d",
+                                                       name, active_pid)));
+
+               /* Wait here until we get signaled by whoever is active */
+               ConditionVariablePrepareToSleep(&slot->active_cv);
+               ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK);
+               ConditionVariableCancelSleep();
+
+               goto retry;
+       }
+
+       /*
+        * If another process lost the race to set the slot active, it's now
+        * sleeping; wake it up so that it can continue and fail properly.
+        */
+       ConditionVariableBroadcast(&slot->active_cv);
 
        /* We made this slot active, so it's ours now. */
        MyReplicationSlot = slot;
@@ -385,17 +414,6 @@ ReplicationSlotRelease(void)
                 */
                ReplicationSlotDropAcquired();
        }
-       else if (slot->data.persistency == RS_PERSISTENT)
-       {
-               /*
-                * Mark persistent slot inactive.  We're not freeing it, just
-                * disconnecting.
-                */
-               SpinLockAcquire(&slot->mutex);
-               slot->active_pid = 0;
-               SpinLockRelease(&slot->mutex);
-       }
-
 
        /*
         * If slot needed to temporarily restrain both data and catalog xmin to
@@ -412,6 +430,18 @@ ReplicationSlotRelease(void)
                ReplicationSlotsComputeRequiredXmin(false);
        }
 
+       if (slot->data.persistency == RS_PERSISTENT)
+       {
+               /*
+                * Mark persistent slot inactive.  We're not freeing it, just
+                * disconnecting, but wake up others that may be waiting for it.
+                */
+               SpinLockAcquire(&slot->mutex);
+               slot->active_pid = 0;
+               SpinLockRelease(&slot->mutex);
+               ConditionVariableBroadcast(&slot->active_cv);
+       }
+
        MyReplicationSlot = NULL;
 
        /* might not have been set when we've been a plain slot */
@@ -438,6 +468,7 @@ ReplicationSlotCleanup(void)
        {
                ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
+               /* XXX why is it okay to read unlocked here? */
                if (s->active_pid == MyProcPid)
                {
                        Assert(s->in_use && s->data.persistency == 
RS_TEMPORARY);
@@ -451,11 +482,11 @@ ReplicationSlotCleanup(void)
  * Permanently drop replication slot identified by the passed in name.
  */
 void
-ReplicationSlotDrop(const char *name)
+ReplicationSlotDrop(const char *name, bool nowait)
 {
        Assert(MyReplicationSlot == NULL);
 
-       ReplicationSlotAcquire(name);
+       ReplicationSlotAcquire(name, nowait);
 
        ReplicationSlotDropAcquired();
 }
@@ -527,6 +558,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
                slot->active_pid = 0;
                SpinLockRelease(&slot->mutex);
 
+               /* wake up anyone waiting on this slot */
+               ConditionVariableBroadcast(&slot->active_cv);
+
                ereport(fail_softly ? WARNING : ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not rename file \"%s\" to 
\"%s\": %m",
@@ -539,11 +573,14 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
         * grabbing the mutex because nobody else can be scanning the array 
here,
         * and nobody can be attached to this slot and thus access it without
         * scanning the array.
+        *
+        * Also wake up processes waiting for it.
         */
        LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
        slot->active_pid = 0;
        slot->in_use = false;
        LWLockRelease(ReplicationSlotControlLock);
+       ConditionVariableBroadcast(&slot->active_cv);
 
        /*
         * Slot is dead and doesn't prevent resource removal anymore, recompute
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6dc808874d..a5ecc85ba5 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -171,7 +171,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 
        CheckSlotRequirements();
 
-       ReplicationSlotDrop(NameStr(*name));
+       ReplicationSlotDrop(NameStr(*name), false);
 
        PG_RETURN_VOID();
 }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 002143b26a..9a2babef1e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -541,7 +541,7 @@ StartReplication(StartReplicationCmd *cmd)
 
        if (cmd->slotname)
        {
-               ReplicationSlotAcquire(cmd->slotname);
+               ReplicationSlotAcquire(cmd->slotname, true);
                if (SlotIsLogical(MyReplicationSlot))
                        ereport(ERROR,
                                        
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1028,7 +1028,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 static void
 DropReplicationSlot(DropReplicationSlotCmd *cmd)
 {
-       ReplicationSlotDrop(cmd->slotname);
+       ReplicationSlotDrop(cmd->slotname, false);
        EndCommand("DROP_REPLICATION_SLOT", DestRemote);
 }
 
@@ -1046,7 +1046,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
        Assert(!MyReplicationSlot);
 
-       ReplicationSlotAcquire(cmd->slotname);
+       ReplicationSlotAcquire(cmd->slotname, true);
 
        /*
         * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a283f4e2b8..f52e988a6d 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -12,6 +12,7 @@
 #include "fmgr.h"
 #include "access/xlog.h"
 #include "access/xlogreader.h"
+#include "storage/condition_variable.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
@@ -117,6 +118,9 @@ typedef struct ReplicationSlot
        /* is somebody performing io on this slot? */
        LWLock          io_in_progress_lock;
 
+       /* Condition variable signalled when active_pid changes */
+       ConditionVariable active_cv;
+
        /* all the remaining data is only used for logical slots */
 
        /*
@@ -162,9 +166,9 @@ extern void ReplicationSlotsShmemInit(void);
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
                                          ReplicationSlotPersistency p);
 extern void ReplicationSlotPersist(void);
-extern void ReplicationSlotDrop(const char *name);
+extern void ReplicationSlotDrop(const char *name, bool nowait);
 
-extern void ReplicationSlotAcquire(const char *name);
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
-- 
2.11.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to