Alvaro Herrera wrote:
> Alvaro Herrera wrote:

> > I just noticed that Jacana failed again in the subscription test, and it
> > looks like it's related to this.  I'll take a look tomorrow if no one
> > beats me to it.
> > https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=jacana&dt=2017-07-26%2014%3A39%3A54
> 
> Ahh, I misread the message.  This one is actually about the replication
> *origin* being still active, not the replication *slot*.  I suppose
> origins need the same treatment as we just did for slots.  Anybody wants
> to volunteer a patch?

So here's a patch.  I was able to reproduce the issue locally by adding
a couple of sleeps, just like Tom did in the case of replication slots.
With this patch it seems the problem is solved.  The mechanism is the
same as Petr used to fix replication slots -- if an origin is busy,
sleep on the CV until someone else signals us; and each time the
acquirer PID changes, broadcast a signal.  Like before, this is likely
to be a problem in older branches too, but those branches have no CVs,
so we cannot backpatch this fix there.

BTW, I noticed that the PG_WAIT_LOCK value that we're using as the wait
event here (and in the replication slot case) is bogus.  We probably
need a new wait event for this.

I found four instances of this problem in the buildfarm:
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=jacana&dt=2017-07-26%2014%3A39%3A54
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=jacana&dt=2017-07-28%2021%3A00%3A15
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=jacana&dt=2017-07-31%2007%3A03%3A20
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=nightjar&dt=2017-08-04%2004%3A34%3A04

Jacana only stopped having it because it broke for unrelated reasons.  I
bet we'll see another failure soon ...

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/commands/subscriptioncmds.c 
b/src/backend/commands/subscriptioncmds.c
index 3593712791..ae40f7164d 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -939,7 +939,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool 
isTopLevel)
        snprintf(originname, sizeof(originname), "pg_%u", subid);
        originid = replorigin_by_name(originname, true);
        if (originid != InvalidRepOriginId)
-               replorigin_drop(originid);
+               replorigin_drop(originid, false);
 
        /*
         * If there is no slot associated with the subscription, we can finish
diff --git a/src/backend/replication/logical/origin.c 
b/src/backend/replication/logical/origin.c
index 1c665312a4..4f32e7861c 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -79,15 +79,15 @@
 #include "access/xact.h"
 
 #include "catalog/indexing.h"
-
 #include "nodes/execnodes.h"
 
 #include "replication/origin.h"
 #include "replication/logical.h"
-
+#include "pgstat.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/copydir.h"
 
 #include "utils/builtins.h"
@@ -125,6 +125,11 @@ typedef struct ReplicationState
        int                     acquired_by;
 
        /*
+        * Condition variable that's signalled when acquired_by changes.
+        */
+       ConditionVariable origin_cv;
+
+       /*
         * Lock protecting remote_lsn and local_lsn.
         */
        LWLock          lock;
@@ -324,9 +329,9 @@ replorigin_create(char *roname)
  * Needs to be called in a transaction.
  */
 void
-replorigin_drop(RepOriginId roident)
+replorigin_drop(RepOriginId roident, bool nowait)
 {
-       HeapTuple       tuple = NULL;
+       HeapTuple       tuple;
        Relation        rel;
        int                     i;
 
@@ -334,6 +339,8 @@ replorigin_drop(RepOriginId roident)
 
        rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
 
+restart:
+       tuple = NULL;
        /* cleanup the slot state info */
        LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
@@ -346,11 +353,21 @@ replorigin_drop(RepOriginId roident)
                {
                        if (state->acquired_by != 0)
                        {
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_OBJECT_IN_USE),
-                                                errmsg("could not drop 
replication origin with OID %d, in use by PID %d",
-                                                               state->roident,
-                                                               
state->acquired_by)));
+                               ConditionVariable  *cv;
+
+                               if (nowait)
+                                       ereport(ERROR,
+                                                       
(errcode(ERRCODE_OBJECT_IN_USE),
+                                                        errmsg("could not drop 
replication origin with OID %d, in use by PID %d",
+                                                                       
state->roident,
+                                                                       
state->acquired_by)));
+                               cv = &state->origin_cv;
+
+                               LWLockRelease(ReplicationOriginLock);
+                               ConditionVariablePrepareToSleep(cv);
+                               ConditionVariableSleep(cv, PG_WAIT_LOCK);
+                               ConditionVariableCancelSleep();
+                               goto restart;
                        }
 
                        /* first WAL log */
@@ -476,8 +493,11 @@ ReplicationOriginShmemInit(void)
                MemSet(replication_states, 0, ReplicationOriginShmemSize());
 
                for (i = 0; i < max_replication_slots; i++)
+               {
                        LWLockInitialize(&replication_states[i].lock,
                                                         
replication_states_ctl->tranche_id);
+                       ConditionVariableInit(&replication_states[i].origin_cv);
+               }
        }
 
        LWLockRegisterTranche(replication_states_ctl->tranche_id,
@@ -957,16 +977,23 @@ replorigin_get_progress(RepOriginId node, bool flush)
 static void
 ReplicationOriginExitCleanup(int code, Datum arg)
 {
+       ConditionVariable   *cv = NULL;
+
        LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
        if (session_replication_state != NULL &&
                session_replication_state->acquired_by == MyProcPid)
        {
+               cv = &session_replication_state->origin_cv;
+
                session_replication_state->acquired_by = 0;
                session_replication_state = NULL;
        }
 
        LWLockRelease(ReplicationOriginLock);
+
+       if (cv)
+               ConditionVariableBroadcast(cv);
 }
 
 /*
@@ -1056,6 +1083,9 @@ replorigin_session_setup(RepOriginId node)
        session_replication_state->acquired_by = MyProcPid;
 
        LWLockRelease(ReplicationOriginLock);
+
+       /* probably this one is pointless */
+       ConditionVariableBroadcast(&session_replication_state->origin_cv);
 }
 
 /*
@@ -1067,6 +1097,8 @@ replorigin_session_setup(RepOriginId node)
 void
 replorigin_session_reset(void)
 {
+       ConditionVariable  *cv;
+
        Assert(max_replication_slots != 0);
 
        if (session_replication_state == NULL)
@@ -1077,9 +1109,12 @@ replorigin_session_reset(void)
        LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
        session_replication_state->acquired_by = 0;
+       cv = &session_replication_state->origin_cv;
        session_replication_state = NULL;
 
        LWLockRelease(ReplicationOriginLock);
+
+       ConditionVariableBroadcast(cv);
 }
 
 /*
@@ -1170,7 +1205,7 @@ pg_replication_origin_drop(PG_FUNCTION_ARGS)
        roident = replorigin_by_name(name, false);
        Assert(OidIsValid(roident));
 
-       replorigin_drop(roident);
+       replorigin_drop(roident, false);
 
        pfree(name);
 
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
index ca56c01469..a9595c3c3d 100644
--- a/src/include/replication/origin.h
+++ b/src/include/replication/origin.h
@@ -41,7 +41,7 @@ extern PGDLLIMPORT TimestampTz 
replorigin_session_origin_timestamp;
 /* API for querying & manipulating replication origins */
 extern RepOriginId replorigin_by_name(char *name, bool missing_ok);
 extern RepOriginId replorigin_create(char *name);
-extern void replorigin_drop(RepOriginId roident);
+extern void replorigin_drop(RepOriginId roident, bool nowait);
 extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
                                  char **roname);
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to