On 2020-Apr-06, Alvaro Herrera wrote:

> I think there's a race condition in this: if we kill a walsender and it
> restarts immediately, before we (the checkpointer) can acquire the slot,
> we will end up waiting for it to terminate on its own.  Fixing this
> requires changing the ReplicationSlotAcquire API so that it can be told
> neither to wait nor to raise an error (so that we can use an infinite
> loop: "acquire; if busy, send signal").

I think this should fix it, but I didn't test it very carefully, and the
usage of the condition variable is not entirely kosher.

-- 
Álvaro Herrera                https://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
commit af47f2e05316e7e89ee7e5b59b5f5fe6ae421508
Author:     Alvaro Herrera <alvhe...@alvh.no-ip.org>
AuthorDate: Mon Apr 6 19:29:56 2020 -0400
CommitDate: Mon Apr 6 19:51:06 2020 -0400

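    Retry slot acquisition in a loop in InvalidateObsoleteReplicationSlots
    
    This fixes a race condition on slot acquisition: if a terminated
    walsender restarted before the slot could be acquired, we would block
    waiting for it instead of signaling it again.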

diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 04510094a8..f5384f1df8 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	(void) ReplicationSlotAcquire(NameStr(*name), SAB_Error);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 31e12e4043..e7960cb48e 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -325,9 +325,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
 /*
  * Find a previously created slot and mark it as used by this backend.
+ *
+ * The return value is only useful if behavior is SAB_Inquire, in which
+ * it's zero if we successfully acquired the slot, or the PID of the
+ * owning process otherwise.  If behavior is SAB_Error, then trying to
+ * acquire an owned slot is an error.  If SAB_Block, we sleep until the
+ * slot is released by the owning process.
  */
-void
-ReplicationSlotAcquire(const char *name, bool nowait)
+int
+ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
 {
 	ReplicationSlot *slot;
 	int			active_pid;
@@ -392,11 +398,13 @@ retry:
 	 */
 	if (active_pid != MyProcPid)
 	{
-		if (nowait)
+		if (behavior == SAB_Error)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_IN_USE),
 					 errmsg("replication slot \"%s\" is active for PID %d",
 							name, active_pid)));
+		else if (behavior == SAB_Inquire)
+			return active_pid;
 
 		/* Wait here until we get signaled, and then restart */
 		ConditionVariableSleep(&slot->active_cv,
@@ -412,6 +420,9 @@ retry:
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
+
+	/* success */
+	return 0;
 }
 
 /*
@@ -518,7 +529,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	(void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
 
 	ReplicationSlotDropAcquired();
 }
@@ -1097,7 +1108,6 @@ restart:
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
 		char	   *slotname;
-		int			wspid;
 
 		if (!s->in_use)
 			continue;
@@ -1112,21 +1122,27 @@ restart:
 
 		slotname = pstrdup(NameStr(s->data.name));
 		restart_lsn = s->data.restart_lsn;
-		wspid = s->active_pid;
 
 		SpinLockRelease(&s->mutex);
 		LWLockRelease(ReplicationSlotControlLock);
 
-		if (wspid != 0)
+		for (;;)
 		{
-			ereport(LOG,
-					(errmsg("terminating walsender %d because replication slot is too far behind",
-						   wspid)));
-			(void) kill(wspid, SIGTERM);
-		}
+			int		wspid = ReplicationSlotAcquire(slotname, SAB_Inquire);
 
-		/* Here we wait until the walsender is gone */
-		ReplicationSlotAcquire(slotname, false);
+			/* no walsender? success! */
+			if (wspid == 0)
+				break;
+
+			ereport(LOG,
+					(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
+						   wspid, slotname)));
+			(void) kill(wspid, SIGTERM);
+
+			ConditionVariableTimedSleep(&s->active_cv,
+										10, WAIT_EVENT_REPLICATION_SLOT_DROP);
+			ConditionVariableCancelSleep();
+		}
 
 		ereport(LOG,
 				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 91a5d0f290..f8336129d9 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -592,7 +592,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	(void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9e5611574c..06e8b79036 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -595,7 +595,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		(void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1132,7 +1132,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	(void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 6e469ea749..f984bfd7a6 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -36,6 +36,14 @@ typedef enum ReplicationSlotPersistency
 	RS_TEMPORARY
 } ReplicationSlotPersistency;
 
+/* For ReplicationSlotAcquire, q.v. */
+typedef enum SlotAcquireBehavior
+{
+	SAB_Error,
+	SAB_Block,
+	SAB_Inquire
+} SlotAcquireBehavior;
+
 /*
  * On-Disk data of a replication slot, preserved across restarts.
  */
@@ -184,7 +192,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
 extern void ReplicationSlotSave(void);

Reply via email to