On 06/07/17 18:20, Petr Jelinek wrote:
> On 06/07/17 17:33, Petr Jelinek wrote:
>> On 03/07/17 01:54, Tom Lane wrote:
>>> I noticed a recent failure that looked suspiciously like a race condition:
>>>
>>> https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=hornet&dt=2017-07-02%2018%3A02%3A07
>>>
>>> The critical bit in the log file is
>>>
>>> error running SQL: 'psql:<stdin>:1: ERROR:  could not drop the replication 
>>> slot "tap_sub" on publisher
>>> DETAIL:  The error was: ERROR:  replication slot "tap_sub" is active for 
>>> PID 3866790'
>>> while running 'psql -XAtq -d port=59543 host=/tmp/QpCJtafT7R 
>>> dbname='postgres' -f - -v ON_ERROR_STOP=1' with sql 'DROP SUBSCRIPTION 
>>> tap_sub' at 
>>> /home/nm/farm/xlc64/HEAD/pgsql.build/src/test/subscription/../../../src/test/perl/PostgresNode.pm
>>>  line 1198.
>>>
>>> After poking at it a bit, I found that I can cause several different
>>> failures of this ilk in the subscription tests by injecting delays at
>>> the points where a slot's active_pid is about to be cleared, as in the
>>> attached patch (which also adds some extra printouts for debugging
>>> purposes; none of that is meant for commit).  It seems clear that there
>>> is inadequate interlocking going on when we kill and restart a logical
>>> rep worker: we're trying to start a new one before the old one has
>>> gotten out of the slot.
>>>
>>
>> Thanks for the test case.
>>
>> It's not actually that we start the new worker too fast. It's that we
>> try to drop the slot right after the worker process was killed, but if
>> the code that clears the slot's active_pid takes too long, the slot
>> still looks like it's in use. I am quite sure it's possible to make
>> this happen for physical replication as well when using slots.
>>
>> This is not something that can be solved by locking on the subscriber.
>> ISTM we need to make pg_drop_replication_slot behave more nicely, e.g.
>> by making it wait for the slot to become available (either by default
>> or as an option).
>>
>> I'll have to think about how to do it without rewriting half of
>> replication slots or reimplementing a lock queue, though, because
>> replication slots don't use normal catalog access, so there is no
>> object locking with a wait queue. We could use a latch wait with a
>> small timeout, but that seems ugly, as that function can be called by
>> a user without having dropped the slot before, so the wait can be
>> quite long (as in "forever").
>>
> 
> A naive fix would be something like the attached. But as I said, it's
> not exactly pretty.
> 

So the best idea I could come up with is to make use of the new condition
variable API. That lets us wait on a condition variable that is kept per
slot and signaled whenever the slot's active_pid changes.

It's not backportable, however. I am not sure how big a problem that is,
considering the lack of complaints until now (maybe we could backport
the ugly timeout version instead?).
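
For comparison, the timeout approach boils down to a polling loop along
the following lines. This is only a standalone sketch using a pthread
mutex and POSIX nanosleep(), not the earlier attached patch; PollSlot and
the poll_slot_* functions are made-up names, and a real backend version
would presumably sleep on a latch and check for interrupts instead.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical stand-in for a slot; not the PostgreSQL ReplicationSlot. */
typedef struct PollSlot
{
    pthread_mutex_t mutex;
    long            active_pid;     /* 0 means nobody holds the slot */
} PollSlot;

static void
poll_slot_init(PollSlot *slot)
{
    pthread_mutex_init(&slot->mutex, NULL);
    slot->active_pid = 0;
}

/* Claim the slot if it is free; never blocks. */
static bool
poll_slot_try_acquire(PollSlot *slot, long my_pid)
{
    bool        acquired = false;

    pthread_mutex_lock(&slot->mutex);
    if (slot->active_pid == 0)
    {
        slot->active_pid = my_pid;
        acquired = true;
    }
    pthread_mutex_unlock(&slot->mutex);
    return acquired;
}

static void
poll_slot_release(PollSlot *slot)
{
    pthread_mutex_lock(&slot->mutex);
    slot->active_pid = 0;
    pthread_mutex_unlock(&slot->mutex);
}

/* Try up to max_attempts times, sleeping 10 ms between attempts. */
static bool
poll_slot_acquire(PollSlot *slot, long my_pid, int max_attempts)
{
    struct timespec delay = {0, 10 * 1000 * 1000};  /* 10 ms */
    int         i;

    for (i = 0; i < max_attempts; i++)
    {
        if (poll_slot_try_acquire(slot, my_pid))
            return true;
        if (i + 1 < max_attempts)
            nanosleep(&delay, NULL);
    }
    return false;
}
```

The attempt limit is what makes this ugly: any bound is arbitrary, and
without one the caller can spin for as long as the slot stays in use.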

The attached patch is a prototype of such a solution and it still has
some race conditions (the variable can get signaled before the waiting
process starts sleeping on it, in which case the wakeup is lost). I am
mainly sending it to get feedback on the approach.
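
To make the race concrete: the usual way to avoid a lost wakeup is to
test the predicate and go to sleep under the same lock that the releasing
side takes before changing it. Below is a standalone pthread sketch of
that pattern, not the patch itself; DemoSlot and the demo_slot_* functions
are made-up names. With PostgreSQL's condition variable API the
equivalent would presumably be to call ConditionVariablePrepareToSleep()
before checking active_pid and to loop around ConditionVariableSleep(),
but that is an assumption and not what the prototype does yet.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

/* Hypothetical stand-in for a slot; not the PostgreSQL ReplicationSlot. */
typedef struct DemoSlot
{
    pthread_mutex_t mutex;
    pthread_cond_t  active_cv;      /* signaled when active_pid changes */
    long            active_pid;     /* 0 means nobody holds the slot */
} DemoSlot;

static void
demo_slot_init(DemoSlot *slot)
{
    pthread_mutex_init(&slot->mutex, NULL);
    pthread_cond_init(&slot->active_cv, NULL);
    slot->active_pid = 0;
}

/*
 * Acquire the slot for my_pid. With nowait the call returns false if the
 * slot is busy; otherwise it sleeps until the holder releases it.
 */
static bool
demo_slot_acquire(DemoSlot *slot, long my_pid, bool nowait)
{
    bool        acquired = true;

    pthread_mutex_lock(&slot->mutex);
    while (slot->active_pid != 0)
    {
        if (nowait)
        {
            acquired = false;
            break;
        }
        /* Atomically drops the mutex and sleeps; re-check on wakeup. */
        pthread_cond_wait(&slot->active_cv, &slot->mutex);
    }
    if (acquired)
        slot->active_pid = my_pid;
    pthread_mutex_unlock(&slot->mutex);
    return acquired;
}

static void
demo_slot_release(DemoSlot *slot)
{
    pthread_mutex_lock(&slot->mutex);
    slot->active_pid = 0;
    pthread_mutex_unlock(&slot->mutex);
    /* Wake every waiter; each one re-checks active_pid under the mutex. */
    pthread_cond_broadcast(&slot->active_cv);
}

/* Thread body imitating a worker that lets go of the slot on exit. */
static void *
demo_release_thread(void *arg)
{
    demo_slot_release((DemoSlot *) arg);
    return NULL;
}
```

Because the waiter re-checks active_pid while holding the mutex, it does
not matter whether the release happens before or after the waiter
reaches pthread_cond_wait().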

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From b72ea52c865b2d7f0d7d29d0834d71e1ec33d54a Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Thu, 6 Jul 2017 18:16:44 +0200
Subject: [PATCH] Wait for slot to become free when dropping it

---
 src/backend/replication/logical/logicalfuncs.c |  2 +-
 src/backend/replication/slot.c                 | 43 +++++++++++++++++++++-----
 src/backend/replication/slotfuncs.c            |  2 +-
 src/backend/replication/walsender.c            |  6 ++--
 src/include/replication/slot.h                 |  8 +++--
 5 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 363ca82..a3ba2b1 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -244,7 +244,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-	ReplicationSlotAcquire(NameStr(*name));
+	ReplicationSlotAcquire(NameStr(*name), true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index dc7de20..2993bb9 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -46,6 +46,7 @@
 #include "pgstat.h"
 #include "replication/slot.h"
 #include "storage/fd.h"
+#include "storage/ipc.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
@@ -157,6 +158,7 @@ ReplicationSlotsShmemInit(void)
 			/* everything else is zeroed by the memset above */
 			SpinLockInit(&slot->mutex);
 			LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
+			ConditionVariableInit(&slot->active_cv);
 		}
 	}
 }
@@ -323,7 +325,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
  * Find a previously created slot and mark it as used by this backend.
  */
 void
-ReplicationSlotAcquire(const char *name)
+ReplicationSlotAcquire(const char *name, bool nowait)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
@@ -331,6 +333,8 @@ ReplicationSlotAcquire(const char *name)
 
 	Assert(MyReplicationSlot == NULL);
 
+retry:
+
 	/* Search for the named slot and mark it active if we find it. */
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 	for (i = 0; i < max_replication_slots; i++)
@@ -342,7 +346,10 @@ ReplicationSlotAcquire(const char *name)
 			SpinLockAcquire(&s->mutex);
 			active_pid = s->active_pid;
 			if (active_pid == 0)
+			{
 				active_pid = s->active_pid = MyProcPid;
+				ConditionVariableBroadcast(&s->active_cv);
+			}
 			SpinLockRelease(&s->mutex);
 			slot = s;
 			break;
@@ -350,16 +357,33 @@ ReplicationSlotAcquire(const char *name)
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 
-	/* If we did not find the slot or it was already active, error out. */
+	/* If we did not find the slot, error out. */
 	if (slot == NULL)
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_OBJECT),
 				 errmsg("replication slot \"%s\" does not exist", name)));
+
+	/*
+	 * If we did find the slot but it's already acquired by another backend,
+	 * we either error out or retry after short wait, depending on what was
+	 * the behavior requested by caller.
+	 */
 	if (active_pid != MyProcPid)
-		ereport(ERROR,
-				(errcode(ERRCODE_OBJECT_IN_USE),
-				 errmsg("replication slot \"%s\" is active for PID %d",
-						name, active_pid)));
+	{
+		if (nowait)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_IN_USE),
+					 errmsg("replication slot \"%s\" is active for PID %d",
+							name, active_pid)));
+
+		/* Wait for condition variable signal from ReplicationSlotRelease. */
+		ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK);
+		ConditionVariableCancelSleep();
+
+		goto retry;
+	}
+
+
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
@@ -393,6 +417,7 @@ ReplicationSlotRelease(void)
 		 */
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
+		ConditionVariableBroadcast(&slot->active_cv);
 		SpinLockRelease(&slot->mutex);
 	}
 
@@ -451,11 +476,11 @@ ReplicationSlotCleanup(void)
  * Permanently drop replication slot identified by the passed in name.
  */
 void
-ReplicationSlotDrop(const char *name)
+ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name);
+	ReplicationSlotAcquire(name, nowait);
 
 	ReplicationSlotDropAcquired();
 }
@@ -525,6 +550,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
+		ConditionVariableBroadcast(&slot->active_cv);
 		SpinLockRelease(&slot->mutex);
 
 		ereport(fail_softly ? WARNING : ERROR,
@@ -543,6 +569,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	slot->active_pid = 0;
 	slot->in_use = false;
+	ConditionVariableBroadcast(&slot->active_cv);
 	LWLockRelease(ReplicationSlotControlLock);
 
 	/*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6dc8088..a5ecc85 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -171,7 +171,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 
 	CheckSlotRequirements();
 
-	ReplicationSlotDrop(NameStr(*name));
+	ReplicationSlotDrop(NameStr(*name), false);
 
 	PG_RETURN_VOID();
 }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 002143b..9a2babe 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -541,7 +541,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname);
+		ReplicationSlotAcquire(cmd->slotname, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1028,7 +1028,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 static void
 DropReplicationSlot(DropReplicationSlotCmd *cmd)
 {
-	ReplicationSlotDrop(cmd->slotname);
+	ReplicationSlotDrop(cmd->slotname, false);
 	EndCommand("DROP_REPLICATION_SLOT", DestRemote);
 }
 
@@ -1046,7 +1046,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname);
+	ReplicationSlotAcquire(cmd->slotname, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a283f4e..f97679e 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -12,6 +12,7 @@
 #include "fmgr.h"
 #include "access/xlog.h"
 #include "access/xlogreader.h"
+#include "storage/condition_variable.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/spin.h"
@@ -93,6 +94,9 @@ typedef struct ReplicationSlot
 	/* Who is streaming out changes for this slot? 0 in unused slots. */
 	pid_t		active_pid;
 
+	/* Condition variable which is signalled when the above changes. */
+	ConditionVariable active_cv;
+
 	/* any outstanding modifications? */
 	bool		just_dirtied;
 	bool		dirty;
@@ -162,9 +166,9 @@ extern void ReplicationSlotsShmemInit(void);
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
 					  ReplicationSlotPersistency p);
 extern void ReplicationSlotPersist(void);
-extern void ReplicationSlotDrop(const char *name);
+extern void ReplicationSlotDrop(const char *name, bool nowait);
 
-extern void ReplicationSlotAcquire(const char *name);
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);
-- 
2.7.4
