Alvaro Herrera wrote: > Alvaro Herrera wrote: > > I just noticed that Jacana failed again in the subscription test, and it > > looks like it's related to this. I'll take a look tomorrow if no one > > beats me to it. > > https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=jacana&dt=2017-07-26%2014%3A39%3A54 > > Ahh, I misread the message. This one is actually about the replication > *origin* being still active, not the replication *slot*. I suppose > origins need the same treatment as we just did for slots. Anybody wants > to volunteer a patch?
So here's a patch. I was able to reproduce the issue locally by adding a couple of sleeps, just like Tom did in the case of replication slots. With this patch the problem seems to be solved. The mechanism is the same one Petr used to fix replication slots -- if an origin is busy, sleep on its condition variable until someone else signals us; and each time the acquirer PID changes, broadcast a signal. As before, this is likely to be a problem in older branches too, but those branches have no condition variables, so we cannot backpatch this fix. BTW, I noticed that the PG_WAIT_LOCK value we're using as the wait event here (and in the replication slot case) is bogus. We probably need a new wait event for this. I found four instances of this problem in the buildfarm: https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=jacana&dt=2017-07-26%2014%3A39%3A54 https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=jacana&dt=2017-07-28%2021%3A00%3A15 https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=jacana&dt=2017-07-31%2007%3A03%3A20 https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=nightjar&dt=2017-08-04%2004%3A34%3A04 Jacana only stopped showing it because it broke for unrelated reasons. I bet we'll see another failure soon ... -- Álvaro Herrera https://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 3593712791..ae40f7164d 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -939,7 +939,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) snprintf(originname, sizeof(originname), "pg_%u", subid); originid = replorigin_by_name(originname, true); if (originid != InvalidRepOriginId) - replorigin_drop(originid); + replorigin_drop(originid, false); /* * If there is no slot associated with the subscription, we can finish diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 1c665312a4..4f32e7861c 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -79,15 +79,15 @@ #include "access/xact.h" #include "catalog/indexing.h" - #include "nodes/execnodes.h" #include "replication/origin.h" #include "replication/logical.h" - +#include "pgstat.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/condition_variable.h" #include "storage/copydir.h" #include "utils/builtins.h" @@ -125,6 +125,11 @@ typedef struct ReplicationState int acquired_by; /* + * Condition variable that's signalled when acquired_by changes. + */ + ConditionVariable origin_cv; + + /* * Lock protecting remote_lsn and local_lsn. */ LWLock lock; @@ -324,9 +329,9 @@ replorigin_create(char *roname) * Needs to be called in a transaction. 
*/ void -replorigin_drop(RepOriginId roident) +replorigin_drop(RepOriginId roident, bool nowait) { - HeapTuple tuple = NULL; + HeapTuple tuple; Relation rel; int i; @@ -334,6 +339,8 @@ replorigin_drop(RepOriginId roident) rel = heap_open(ReplicationOriginRelationId, ExclusiveLock); +restart: + tuple = NULL; /* cleanup the slot state info */ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); @@ -346,11 +353,21 @@ replorigin_drop(RepOriginId roident) { if (state->acquired_by != 0) { - ereport(ERROR, - (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("could not drop replication origin with OID %d, in use by PID %d", - state->roident, - state->acquired_by))); + ConditionVariable *cv; + + if (nowait) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("could not drop replication origin with OID %d, in use by PID %d", + state->roident, + state->acquired_by))); + cv = &state->origin_cv; + + LWLockRelease(ReplicationOriginLock); + ConditionVariablePrepareToSleep(cv); + ConditionVariableSleep(cv, PG_WAIT_LOCK); + ConditionVariableCancelSleep(); + goto restart; } /* first WAL log */ @@ -476,8 +493,11 @@ ReplicationOriginShmemInit(void) MemSet(replication_states, 0, ReplicationOriginShmemSize()); for (i = 0; i < max_replication_slots; i++) + { LWLockInitialize(&replication_states[i].lock, replication_states_ctl->tranche_id); + ConditionVariableInit(&replication_states[i].origin_cv); + } } LWLockRegisterTranche(replication_states_ctl->tranche_id, @@ -957,16 +977,23 @@ replorigin_get_progress(RepOriginId node, bool flush) static void ReplicationOriginExitCleanup(int code, Datum arg) { + ConditionVariable *cv = NULL; + LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); if (session_replication_state != NULL && session_replication_state->acquired_by == MyProcPid) { + cv = &session_replication_state->origin_cv; + session_replication_state->acquired_by = 0; session_replication_state = NULL; } LWLockRelease(ReplicationOriginLock); + + if (cv) + ConditionVariableBroadcast(cv); } 
/* @@ -1056,6 +1083,9 @@ replorigin_session_setup(RepOriginId node) session_replication_state->acquired_by = MyProcPid; LWLockRelease(ReplicationOriginLock); + + /* probably this one is pointless */ + ConditionVariableBroadcast(&session_replication_state->origin_cv); } /* @@ -1067,6 +1097,8 @@ replorigin_session_setup(RepOriginId node) void replorigin_session_reset(void) { + ConditionVariable *cv; + Assert(max_replication_slots != 0); if (session_replication_state == NULL) @@ -1077,9 +1109,12 @@ replorigin_session_reset(void) LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); session_replication_state->acquired_by = 0; + cv = &session_replication_state->origin_cv; session_replication_state = NULL; LWLockRelease(ReplicationOriginLock); + + ConditionVariableBroadcast(cv); } /* @@ -1170,7 +1205,7 @@ pg_replication_origin_drop(PG_FUNCTION_ARGS) roident = replorigin_by_name(name, false); Assert(OidIsValid(roident)); - replorigin_drop(roident); + replorigin_drop(roident, false); pfree(name); diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index ca56c01469..a9595c3c3d 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -41,7 +41,7 @@ extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; /* API for querying & manipulating replication origins */ extern RepOriginId replorigin_by_name(char *name, bool missing_ok); extern RepOriginId replorigin_create(char *name); -extern void replorigin_drop(RepOriginId roident); +extern void replorigin_drop(RepOriginId roident, bool nowait); extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname);
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers