Hoi Tom,

On Fri, 13 Sep 2019 at 22:04, Tom Lane <t...@sss.pgh.pa.us> wrote:
>
> This throws multiple compiler warnings for me:

Fixed.

> Also, I don't exactly believe this bit:
[snip]
> It seems unlikely that insertion would stop exactly at a page boundary,
> but that seems to be what this is looking for.

This is how asyncQueueAddEntries() works. Entries are never split over
pages. If there is not enough room, then it advances to the beginning
of the next page and returns. Hence here the offset is zero. I could
set the global inside asyncQueueAddEntries() but that seems icky.
Another alternative is to have asyncQueueAddEntries() return a boolean
"moved to new page", but that's just a long-winded way of doing what
it is now.

> But, really ... do we need the backendTryAdvanceTail flag at all?
> I'm dubious, because it seems like asyncQueueReadAllNotifications
> would have already covered the case if we're listening.  If we're
> not listening, but we signalled some other listeners, it falls
> to them to kick us if we're the slowest backend.  If we're not the
> slowest backend then doing asyncQueueAdvanceTail isn't useful.

There are multiple issues here. asyncQueueReadAllNotifications() is
going to be called by each listener simultaneously, so each listener
is going to come to the same conclusion. On the other side, there is
no guarantee we wake up anyone as a result of the NOTIFY, e.g. if
there are no listeners in the current database. To be sure you try to
advance the tail, you have to trigger on the sending side. The global
is there because at the point we are inserting entries we are still in
a user transaction, potentially holding many table locks (the issue we
were running into in the first place). By setting
backendTryAdvanceTail we can move the work to
ProcessCompletedNotifies() which is after the transaction has
committed and the locks released.

> I agree with getting rid of the asyncQueueAdvanceTail call in
> asyncQueueUnregister; on reflection doing that there seems pretty unsafe,
> because we're not necessarily in a transaction and hence anything that
> could possibly error is a bad idea.  However, it'd be good to add a
> comment explaining that we're not doing that and why it's ok not to.

Comment added.

> I'm fairly unimpressed with the "kick a random slow backend" logic.
> There can be no point in kicking any but the slowest backend, ie
> one whose pointer is exactly the oldest.  Since we're already computing
> the min pointer in that loop, it would actually take *less* logic inside
> the loop to remember the/a backend that had that pointer value, and then
> decide afterwards whether it's slow enough to merit a kick.

Adjusted this. I'm not sure it's actually clearer this way, but it is
less work inside the loop. A small change is that now it won't signal
anyone if this backend is the slowest, which more correct.

Thanks for the feedback. Attached is version 3.

Have a nice weekend,
-- 
Martijn van Oosterhout <klep...@gmail.com> http://svana.org/kleptog/
From 539d97b47c4535314c23df22e5e87ecc43149f3a Mon Sep 17 00:00:00 2001
From: Martijn van Oosterhout <klep...@svana.org>
Date: Sat, 14 Sep 2019 11:01:11 +0200
Subject: [PATCH 1/2] Improve performance of async notifications

Advancing the tail pointer requires an exclusive lock which can block
backends from other databases, so it's worth keeping these attempts to a
minimum.

Instead of tracking the slowest backend exactly we update the queue more
lazily, only checking when we switch to a new SLRU page.  Additionally,
instead of waking up every slow backend at once, we do them one at a time.
---
 src/backend/commands/async.c | 167 ++++++++++++++++++++++++++---------
 1 file changed, 124 insertions(+), 43 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index f26269b5ea..ffd7c7e90b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -73,10 +73,11 @@
  *	  Finally, after we are out of the transaction altogether, we check if
  *	  we need to signal listening backends.  In SignalBackends() we scan the
  *	  list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
- *	  to every listening backend (we don't know which backend is listening on
- *	  which channel so we must signal them all). We can exclude backends that
- *	  are already up to date, though.  We don't bother with a self-signal
- *	  either, but just process the queue directly.
+ *	  to every listening backend for the relavent database (we don't know
+ *	  which backend is listening on which channel so we must signal them
+ *	  all).  We can exclude backends that are already up to date, though.
+ *	  We don't bother with a self-signal either, but just process the queue
+ *	  directly.
  *
  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
  *	  sets the process's latch, which triggers the event to be processed
@@ -89,13 +90,25 @@
  *	  Inbound-notify processing consists of reading all of the notifications
  *	  that have arrived since scanning last time. We read every notification
  *	  until we reach either a notification from an uncommitted transaction or
- *	  the head pointer's position. Then we check if we were the laziest
- *	  backend: if our pointer is set to the same position as the global tail
- *	  pointer is set, then we move the global tail pointer ahead to where the
- *	  second-laziest backend is (in general, we take the MIN of the current
- *	  head position and all active backends' new tail pointers). Whenever we
- *	  move the global tail pointer we also truncate now-unused pages (i.e.,
- *	  delete files in pg_notify/ that are no longer used).
+ *	  the head pointer's position.
+ *
+ * 6. To avoid SLRU wraparound and minimize disk space the tail pointer
+ *	  needs to be advanced so that old pages can be truncated.  This
+ *	  however requires an exclusive lock and as such should be done
+ *	  infrequently.
+ *
+ *	  When a new notification is added, the writer checks to see if the
+ *	  tail pointer is more than QUEUE_CLEANUP_DELAY pages behind.  If
+ *	  so, it attempts to advance the tail, and if there are slow
+ *	  backends (perhaps because all the notifications were for other
+ *	  databases), wake the slowest by sending it a signal.
+ *
+ *	  When the slow backend processes the queue it notes it was behind
+ *	  and so also tries to advance the tail, possibly waking up another
+ *	  slow backend.  Eventually all backends will have processed the
+ *	  queue and the global tail pointer is move to a new page and we
+ *	  also truncate now-unused pages (i.e., delete files in pg_notify/
+ *	  that are no longer used).
  *
  * An application that listens on the same channel it notifies will get
  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
@@ -199,6 +212,12 @@ typedef struct QueuePosition
 #define QUEUE_POS_EQUAL(x,y) \
 	 ((x).page == (y).page && (x).offset == (y).offset)
 
+/* compare QueuePositions */
+#define QUEUE_POS_LT(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) ? (1) : \
+	 (x).page != (y).page ? (0) : \
+	 (x).offset < (y).offset ? (1) : (0))
+
 /* choose logically smaller QueuePosition */
 #define QUEUE_POS_MIN(x,y) \
 	(asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
@@ -211,6 +230,12 @@ typedef struct QueuePosition
 	 (x).page != (y).page ? (x) : \
 	 (x).offset > (y).offset ? (x) : (y))
 
+/* how many pages does a backend need to be behind before it needs to be signalled */
+#define QUEUE_CLEANUP_DELAY 4
+
+/* is a backend so far behind it needs to be signalled? */
+#define QUEUE_SLOW_BACKEND(i) \
+	(asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), QUEUE_POS_PAGE(QUEUE_BACKEND_POS(i))) > QUEUE_CLEANUP_DELAY)
 /*
  * Struct describing a listening backend's status
  */
@@ -252,7 +277,7 @@ typedef struct QueueBackendStatus
 typedef struct AsyncQueueControl
 {
 	QueuePosition head;			/* head points to the next free location */
-	QueuePosition tail;			/* the global tail is equivalent to the pos of
+	QueuePosition tail;			/* the global tail is some place older than the
 								 * the "slowest" backend */
 	BackendId	firstListener;	/* id of first listener, or InvalidBackendId */
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
@@ -402,10 +427,15 @@ static bool amRegisteredListener = false;
 /* has this backend sent notifications in the current transaction? */
 static bool backendHasSentNotifications = false;
 
+/* has this backend switched to new page, and so should attempt to advance
+ * the queue tail?  */
+static bool backendTryAdvanceTail = false;
+
 /* GUC parameter */
 bool		Trace_notify = false;
 
 /* local function prototypes */
+static int asyncQueuePageDiff(int p, int q);
 static bool asyncQueuePagePrecedes(int p, int q);
 static void queue_listen(ListenActionKind action, const char *channel);
 static void Async_UnlistenOnExit(int code, Datum arg);
@@ -421,7 +451,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
 static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
 static double asyncQueueUsage(void);
 static void asyncQueueFillWarning(void);
-static bool SignalBackends(void);
+static bool SignalMyDBBackends(void);
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
@@ -438,8 +468,8 @@ static void ClearPendingActionsAndNotifies(void);
 /*
  * We will work on the page range of 0..QUEUE_MAX_PAGE.
  */
-static bool
-asyncQueuePagePrecedes(int p, int q)
+static int
+asyncQueuePageDiff(int p, int q)
 {
 	int			diff;
 
@@ -455,7 +485,13 @@ asyncQueuePagePrecedes(int p, int q)
 		diff -= QUEUE_MAX_PAGE + 1;
 	else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
 		diff += QUEUE_MAX_PAGE + 1;
-	return diff < 0;
+	return diff;
+}
+
+static bool
+asyncQueuePagePrecedes(int p, int q)
+{
+	return asyncQueuePageDiff(p, q) < 0;
 }
 
 /*
@@ -905,6 +941,12 @@ PreCommit_Notify(void)
 						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 						 errmsg("too many notifications in the NOTIFY queue")));
 			nextNotify = asyncQueueAddEntries(nextNotify);
+
+			/* If we are advancing to a new page, remember this so after the
+			 * transaction commits we can attempt to advance the tail
+			 * pointer, see ProcessCompletedNotifies() */
+			if (QUEUE_POS_OFFSET(QUEUE_HEAD) == 0)
+				backendTryAdvanceTail = true;
 			LWLockRelease(AsyncQueueLock);
 		}
 	}
@@ -1051,8 +1093,6 @@ Exec_ListenPreCommit(void)
 	 * notification to the frontend.  Also, although our transaction might
 	 * have executed NOTIFY, those message(s) aren't queued yet so we can't
 	 * see them in the queue.
-	 *
-	 * This will also advance the global tail pointer if possible.
 	 */
 	if (!QUEUE_POS_EQUAL(max, head))
 		asyncQueueReadAllNotifications();
@@ -1185,7 +1225,7 @@ ProcessCompletedNotifies(void)
 	StartTransactionCommand();
 
 	/* Send signals to other backends */
-	signalled = SignalBackends();
+	signalled = SignalMyDBBackends();
 
 	if (listenChannels != NIL)
 	{
@@ -1203,6 +1243,16 @@ ProcessCompletedNotifies(void)
 		 * harmless.)
 		 */
 		asyncQueueAdvanceTail();
+		backendTryAdvanceTail = false;
+	}
+
+	if (backendTryAdvanceTail)
+	{
+		/* We switched to a new page while writing our notifies to the
+		 * queue, so we try to advance the tail ourselves, possibly waking
+		 * up another backend if it is running behind */
+		backendTryAdvanceTail = false;
+		asyncQueueAdvanceTail();
 	}
 
 	CommitTransactionCommand();
@@ -1242,8 +1292,6 @@ IsListeningOn(const char *channel)
 static void
 asyncQueueUnregister(void)
 {
-	bool		advanceTail;
-
 	Assert(listenChannels == NIL);	/* else caller error */
 
 	if (!amRegisteredListener)	/* nothing to do */
@@ -1253,10 +1301,7 @@ asyncQueueUnregister(void)
 	 * Need exclusive lock here to manipulate list links.
 	 */
 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
-	/* check if entry is valid and oldest ... */
-	advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
-		QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
-	/* ... then mark it invalid */
+	/* Mark our entry as invalid */
 	QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
 	QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
 	/* and remove it from the list */
@@ -1279,9 +1324,9 @@ asyncQueueUnregister(void)
 	/* mark ourselves as no longer listed in the global array */
 	amRegisteredListener = false;
 
-	/* If we were the laziest backend, try to advance the tail pointer */
-	if (advanceTail)
-		asyncQueueAdvanceTail();
+	/* Don't try to advance the tail.  We're possibly not in a
+	   transaction to handle errors, and it'll get cleaned up later
+	   anyway. */
 }
 
 /*
@@ -1570,7 +1615,7 @@ asyncQueueFillWarning(void)
 }
 
 /*
- * Send signals to all listening backends (except our own).
+ * Send signals to all listening backends (except our own) for our database.
  *
  * Returns true if we sent at least one signal.
  *
@@ -1583,7 +1628,7 @@ asyncQueueFillWarning(void)
  * Since we know the BackendId and the Pid the signalling is quite cheap.
  */
 static bool
-SignalBackends(void)
+SignalMyDBBackends(void)
 {
 	bool		signalled = false;
 	int32	   *pids;
@@ -1592,9 +1637,9 @@ SignalBackends(void)
 	int32		pid;
 
 	/*
-	 * Identify all backends that are listening and not already up-to-date. We
-	 * don't want to send signals while holding the AsyncQueueLock, so we just
-	 * build a list of target PIDs.
+	 * Identify all backends with MyDatabaseId that are listening and not
+	 * already up-to-date.  We don't want to send signals while holding the
+	 * AsyncQueueLock, so we just build a list of target PIDs.
 	 *
 	 * XXX in principle these pallocs could fail, which would be bad. Maybe
 	 * preallocate the arrays?	But in practice this is only run in trivial
@@ -1609,7 +1654,7 @@ SignalBackends(void)
 	{
 		pid = QUEUE_BACKEND_PID(i);
 		Assert(pid != InvalidPid);
-		if (pid != MyProcPid)
+		if (pid != MyProcPid && QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
 		{
 			QueuePosition pos = QUEUE_BACKEND_POS(i);
 
@@ -1859,6 +1904,9 @@ asyncQueueReadAllNotifications(void)
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
 	pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
 	head = QUEUE_HEAD;
+	/* If we're behind, we possibly got signalled to catchup.  Remember
+	 * this so we attempt to advance the tail later */
+	advanceTail = QUEUE_SLOW_BACKEND(MyBackendId);
 	LWLockRelease(AsyncQueueLock);
 
 	if (QUEUE_POS_EQUAL(pos, head))
@@ -1966,12 +2014,9 @@ asyncQueueReadAllNotifications(void)
 		/* Update shared state */
 		LWLockAcquire(AsyncQueueLock, LW_SHARED);
 		QUEUE_BACKEND_POS(MyBackendId) = pos;
-		advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
 		LWLockRelease(AsyncQueueLock);
 
-		/* If we were the laziest backend, try to advance the tail pointer */
-		if (advanceTail)
-			asyncQueueAdvanceTail();
+		/* We don't try to advance the tail here. */
 
 		PG_RE_THROW();
 	}
@@ -1980,10 +2025,10 @@ asyncQueueReadAllNotifications(void)
 	/* Update shared state */
 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
 	QUEUE_BACKEND_POS(MyBackendId) = pos;
-	advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
 	LWLockRelease(AsyncQueueLock);
 
-	/* If we were the laziest backend, try to advance the tail pointer */
+	/* We were behind, so try to advance the tail pointer, possibly
+	 * signalling another backend if necessary */
 	if (advanceTail)
 		asyncQueueAdvanceTail();
 
@@ -2093,8 +2138,8 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 }
 
 /*
- * Advance the shared queue tail variable to the minimum of all the
- * per-backend tail pointers.  Truncate pg_notify space if possible.
+ * Advance the shared queue tail variable if possible.  If a slow backend is
+ * holding everything up, signal it.  Truncate pg_notify space if possible.
  */
 static void
 asyncQueueAdvanceTail(void)
@@ -2103,18 +2148,54 @@ asyncQueueAdvanceTail(void)
 	int			oldtailpage;
 	int			newtailpage;
 	int			boundary;
+	int			slowestbackendid = InvalidBackendId;
+	int			slowestbackendpid;
 
+	/* Advance the tail as far as possible, noting if there is a slow
+	 * backend we could kick */
 	LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
 	min = QUEUE_HEAD;
 	for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
 	{
 		Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+		if (QUEUE_POS_LT(QUEUE_BACKEND_POS(i), min))
+		{
+			/* this finds the tail of the queue and remembers who */
+			min = QUEUE_BACKEND_POS(i);
+			slowestbackendid = i;
+		}
 	}
 	oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
 	QUEUE_TAIL = min;
+	/* if the we weren't the slowest, get the pid so we can kick it */
+	if (slowestbackendid != InvalidBackendId)
+	{
+		if (QUEUE_SLOW_BACKEND(slowestbackendid) &&
+			QUEUE_BACKEND_PID(slowestbackendid) != MyProcPid)
+		{
+			slowestbackendpid = QUEUE_BACKEND_PID(slowestbackendid);
+		}
+		else
+		{
+			slowestbackendid = InvalidBackendId;
+		}
+	}
 	LWLockRelease(AsyncQueueLock);
 
+	/* Wake up the backend furthest behind, if it is considered "slow".
+	 * It should in turn call this function to signal the next, see
+	 * asyncQueueReadAllNotifications() */
+	if (slowestbackendid != InvalidBackendId) {
+
+		/* Note: assuming things aren't broken, a signal failure here could
+		 * only occur if the target backend exited since we released
+		 * AsyncQueueLock; which is unlikely but certainly possible. So we
+		 * just log a low-level debug message if it happens.
+		 */
+		if (SendProcSignal(slowestbackendpid, PROCSIG_NOTIFY_INTERRUPT, slowestbackendid) < 0)
+			elog(DEBUG3, "could not signal backend with PID %d: %m", slowestbackendpid);
+	}
+
 	/*
 	 * We can truncate something if the global tail advanced across an SLRU
 	 * segment boundary.
-- 
2.17.1

Reply via email to