On 06/07/17 18:20, Petr Jelinek wrote:
> On 06/07/17 17:33, Petr Jelinek wrote:
>> On 03/07/17 01:54, Tom Lane wrote:
>>> I noticed a recent failure that looked suspiciously like a race condition:
>>>
>>> https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=hornet&dt=2017-07-02%2018%3A02%3A07
>>>
>>> The critical bit in the log file is
>>>
>>> error running SQL: 'psql:<stdin>:1: ERROR: could not drop the replication
>>> slot "tap_sub" on publisher
>>> DETAIL: The error was: ERROR: replication slot "tap_sub" is active for
>>> PID 3866790'
>>> while running 'psql -XAtq -d port=59543 host=/tmp/QpCJtafT7R
>>> dbname='postgres' -f - -v ON_ERROR_STOP=1' with sql 'DROP SUBSCRIPTION
>>> tap_sub' at
>>> /home/nm/farm/xlc64/HEAD/pgsql.build/src/test/subscription/../../../src/test/perl/PostgresNode.pm
>>> line 1198.
>>>
>>> After poking at it a bit, I found that I can cause several different
>>> failures of this ilk in the subscription tests by injecting delays at
>>> the points where a slot's active_pid is about to be cleared, as in the
>>> attached patch (which also adds some extra printouts for debugging
>>> purposes; none of that is meant for commit). It seems clear that there
>>> is inadequate interlocking going on when we kill and restart a logical
>>> rep worker: we're trying to start a new one before the old one has
>>> gotten out of the slot.
>>>
>>
>> Thanks for the test case.
>>
>> It's not actually that we start new worker fast. It's that we try to
>> drop the slot right after worker process was killed but if the code that
>> clears slot's active_pid takes too long, it still looks like it's being
>> used. I am quite sure it's possible to make this happen for physical
>> replication as well when using slots.
>>
>> This is not something that can be solved by locking on subscriber. ISTM
>> we need to make pg_drop_replication_slot behave more nicely, like making
>> it wait for the slot to become available (either by default or as an
>> option).
>>
>> I'll have to think about how to do it without rewriting half of
>> replication slots or reimplementing lock queue though because the
>> replication slots don't use normal catalog access so there is no object
>> locking with wait queue. We could use some latch wait with small timeout
>> but that seems ugly as that function can be called by user without
>> having dropped the slot before so the wait can be quite long (as in
>> "forever").
>>
>
> Naive fix would be something like attached. But as I said, it's not
> exactly pretty.
>
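The "latch wait with small timeout" idea quoted above boils down to a bounded polling loop. What follows is only a standalone sketch of that pattern, not PostgreSQL code: DemoPollSlot and demo_acquire_polling() are hypothetical names, and C11 atomics plus nanosleep() stand in for the shared-memory slot array and the backend's latch machinery.

```c
#define _POSIX_C_SOURCE 200809L	/* for clock_gettime() and nanosleep() */
#include <errno.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical stand-in for a shared slot; active_pid == 0 means "free". */
typedef struct DemoPollSlot
{
	atomic_int	active_pid;
} DemoPollSlot;

/* Milliseconds on the monotonic clock, so wall-clock jumps don't matter. */
static long long
demo_now_ms(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return (long long) ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

/*
 * Try to claim the slot for my_pid, napping poll_ms between attempts.
 * Returns true once claimed, false if timeout_ms elapsed first.
 * A negative timeout_ms means "keep polling forever".
 */
static bool
demo_acquire_polling(DemoPollSlot *slot, int my_pid, int poll_ms,
					 long long timeout_ms)
{
	long long	deadline = demo_now_ms() + timeout_ms;

	for (;;)
	{
		int			expected = 0;	/* reset: a failed CAS overwrites it */
		struct timespec nap;

		/* Atomically take the slot only if nobody holds it. */
		if (atomic_compare_exchange_strong(&slot->active_pid, &expected, my_pid))
			return true;

		if (timeout_ms >= 0 && demo_now_ms() >= deadline)
			return false;

		nap.tv_sec = poll_ms / 1000;
		nap.tv_nsec = (long) (poll_ms % 1000) * 1000000L;
		/* Finish the nap even if a signal interrupts it. */
		while (nanosleep(&nap, &nap) == -1 && errno == EINTR)
			;
	}
}
```

A negative timeout_ms keeps polling forever, which matches the "forever" case described above; a finite one trades that for a spurious failure when the holder is merely slow, and every poll costs a wakeup even when nothing has changed.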
So the best idea I could come up with is to make use of the new condition
variable API. That lets us wait on a per-slot variable.

It's not backportable, however. I am not sure how big a problem that is,
considering the lack of complaints until now (maybe we could backport
the ugly timeout version?).

The attached patch is a prototype of such a solution, and it still has
some race conditions (the variable can get signaled before the waiting
process starts sleeping on it). I am mainly sending it to get feedback on
the approach.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
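The race mentioned above is the classic lost wakeup: the waiter sees active_pid still set, the holder then clears it and broadcasts, and only after that does the waiter go to sleep, so nobody wakes it. Below is a minimal sketch of the usual cure, with POSIX pthread_cond_t standing in for PostgreSQL's ConditionVariable. DemoCvSlot, demo_cv_slot_acquire() and demo_cv_slot_release() are hypothetical names, and nowait mirrors the flag the patch adds to ReplicationSlotAcquire(). The test of active_pid and the decision to sleep happen under the same mutex the releaser holds while broadcasting.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stdbool.h>
#include <sys/types.h>

/* Hypothetical stand-in for one shared slot. */
typedef struct DemoCvSlot
{
	pthread_mutex_t mutex;		/* protects active_pid */
	pthread_cond_t active_cv;	/* broadcast whenever active_pid is cleared */
	pid_t		active_pid;		/* 0 means nobody holds the slot */
} DemoCvSlot;

static void
demo_cv_slot_init(DemoCvSlot *slot)
{
	pthread_mutex_init(&slot->mutex, NULL);
	pthread_cond_init(&slot->active_cv, NULL);
	slot->active_pid = 0;
}

/*
 * Claim the slot for my_pid.  Returns 0 on success.  With nowait, returns
 * the current holder's PID instead of sleeping (a caller would then raise
 * the "is active for PID %d" error); otherwise sleeps until it is free.
 */
static pid_t
demo_cv_slot_acquire(DemoCvSlot *slot, pid_t my_pid, bool nowait)
{
	pid_t		holder = 0;

	pthread_mutex_lock(&slot->mutex);

	/*
	 * pthread_cond_wait() releases the mutex atomically as it starts
	 * sleeping, so a broadcast cannot fall between the test and the sleep.
	 * Looping also copes with spurious wakeups and with another waiter
	 * claiming the slot first.
	 */
	while (!nowait && slot->active_pid != 0)
		pthread_cond_wait(&slot->active_cv, &slot->mutex);

	if (slot->active_pid == 0)
		slot->active_pid = my_pid;
	else
		holder = slot->active_pid;

	pthread_mutex_unlock(&slot->mutex);
	return holder;
}

/* Mark the slot free and wake every waiter so each can re-check. */
static void
demo_cv_slot_release(DemoCvSlot *slot)
{
	pthread_mutex_lock(&slot->mutex);
	slot->active_pid = 0;
	pthread_cond_broadcast(&slot->active_cv);
	pthread_mutex_unlock(&slot->mutex);
}
```

The PostgreSQL counterpart would presumably be to call ConditionVariablePrepareToSleep() on the slot's active_cv before testing active_pid, then loop on ConditionVariableSleep() until the slot is free, and call ConditionVariableCancelSleep() only once the loop exits.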
From b72ea52c865b2d7f0d7d29d0834d71e1ec33d54a Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Thu, 6 Jul 2017 18:16:44 +0200
Subject: [PATCH] Wait for slot to become free when dropping it

---
 src/backend/replication/logical/logicalfuncs.c |  2 +-
 src/backend/replication/slot.c                 | 43 +++++++++++++++++++++-----
 src/backend/replication/slotfuncs.c            |  2 +-
 src/backend/replication/walsender.c            |  6 ++--
 src/include/replication/slot.h                 |  8 +++--
 5 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 363ca82..a3ba2b1 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -244,7 +244,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-	ReplicationSlotAcquire(NameStr(*name));
+	ReplicationSlotAcquire(NameStr(*name), true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index dc7de20..2993bb9 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -46,6 +46,7 @@
 #include "pgstat.h"
 #include "replication/slot.h"
 #include "storage/fd.h"
+#include "storage/ipc.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
@@ -157,6 +158,7 @@ ReplicationSlotsShmemInit(void)
 			/* everything else is zeroed by the memset above */
 			SpinLockInit(&slot->mutex);
 			LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
+			ConditionVariableInit(&slot->active_cv);
 		}
 	}
 }
@@ -323,7 +325,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
  * Find a previously created slot and mark it as used by this backend.
  */
 void
-ReplicationSlotAcquire(const char *name)
+ReplicationSlotAcquire(const char *name, bool nowait)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
@@ -331,6 +333,8 @@ ReplicationSlotAcquire(const char *name)
 
 	Assert(MyReplicationSlot == NULL);
 
+retry:
+
 	/* Search for the named slot and mark it active if we find it. */
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
@@ -342,7 +346,10 @@ ReplicationSlotAcquire(const char *name)
 			SpinLockAcquire(&s->mutex);
 			active_pid = s->active_pid;
 			if (active_pid == 0)
+			{
 				active_pid = s->active_pid = MyProcPid;
+				ConditionVariableBroadcast(&s->active_cv);
+			}
 			SpinLockRelease(&s->mutex);
 			slot = s;
 			break;
@@ -350,16 +357,33 @@ ReplicationSlotAcquire(const char *name)
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
-	/* If we did not find the slot or it was already active, error out. */
+	/* If we did not find the slot, error out. */
 	if (slot == NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
+
+	/*
+	 * If we did find the slot but it's already acquired by another backend,
+	 * we either error out or retry after short wait, depending on what was
+	 * the behavior requested by caller.
+	 */
 	if (active_pid != MyProcPid)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_IN_USE),
-				 errmsg("replication slot \"%s\" is active for PID %d",
-						name, active_pid)));
+	{
+		if (nowait)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_IN_USE),
+					 errmsg("replication slot \"%s\" is active for PID %d",
+							name, active_pid)));
+
+		/* Wait for condition variable signal from ReplicationSlotRelease. */
+		ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK);
+		ConditionVariableCancelSleep();
+
+		goto retry;
+	}
+
+	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
@@ -393,6 +417,7 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
+		ConditionVariableBroadcast(&slot->active_cv);
 		SpinLockRelease(&slot->mutex);
 	}
 
@@ -451,11 +476,11 @@ ReplicationSlotCleanup(void)
  * Permanently drop replication slot identified by the passed in name.
  */
 void
-ReplicationSlotDrop(const char *name)
+ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name);
+	ReplicationSlotAcquire(name, nowait);
 
 	ReplicationSlotDropAcquired();
 }
@@ -525,6 +550,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
+		ConditionVariableBroadcast(&slot->active_cv);
 		SpinLockRelease(&slot->mutex);
 
 		ereport(fail_softly ? WARNING : ERROR,
@@ -543,6 +569,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	slot->active_pid = 0;
 	slot->in_use = false;
+	ConditionVariableBroadcast(&slot->active_cv);
 	LWLockRelease(ReplicationSlotControlLock);
 
 	/*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6dc8088..a5ecc85 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -171,7 +171,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 
 	CheckSlotRequirements();
 
-	ReplicationSlotDrop(NameStr(*name));
+	ReplicationSlotDrop(NameStr(*name), false);
 
 	PG_RETURN_VOID();
 }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 002143b..9a2babe 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -541,7 +541,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname);
+		ReplicationSlotAcquire(cmd->slotname, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1028,7 +1028,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 static void
 DropReplicationSlot(DropReplicationSlotCmd *cmd)
 {
-	ReplicationSlotDrop(cmd->slotname);
+	ReplicationSlotDrop(cmd->slotname, false);
 	EndCommand("DROP_REPLICATION_SLOT", DestRemote);
 }
 
@@ -1046,7 +1046,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname);
+	ReplicationSlotAcquire(cmd->slotname, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a283f4e..f97679e 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -12,6 +12,7 @@
 #include "fmgr.h"
 #include "access/xlog.h"
 #include "access/xlogreader.h"
+#include "storage/condition_variable.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
@@ -93,6 +94,9 @@ typedef struct ReplicationSlot
 	/* Who is streaming out changes for this slot? 0 in unused slots. */
 	pid_t		active_pid;
 
+	/* Condition variable which is signalled when the above changes. */
+	ConditionVariable active_cv;
+
 	/* any outstanding modifications? */
 	bool		just_dirtied;
 	bool		dirty;
@@ -162,9 +166,9 @@ extern void ReplicationSlotsShmemInit(void);
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
 					  ReplicationSlotPersistency p);
 extern void ReplicationSlotPersist(void);
-extern void ReplicationSlotDrop(const char *name);
+extern void ReplicationSlotDrop(const char *name, bool nowait);
 
-extern void ReplicationSlotAcquire(const char *name);
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
-- 
2.7.4
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers