From 3fe37ec554905d69f71a05e9dec26d5b3ac7fd23 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Sat, 11 Oct 2025 07:28:57 +0200
Subject: [PATCH 3/3] Optimize LISTEN/NOTIFY by advancing idle backends
 directly

Building on the previous channel-specific listener tracking
optimization, this patch further reduces context switching by detecting
idle listening backends that don't listen to any of the channels being
notified and advancing their queue positions directly without waking
them up.

When a backend commits notifications, it now saves the queue head
position both before and after writing them. In SignalBackends(),
backends in the same database that are at the old queue head and weren't
marked for wakeup (meaning they don't listen to any of the notified
channels) are advanced directly to the new queue head. This eliminates
unnecessary wakeups for these backends, which would otherwise wake up,
scan through all the notifications, skip each one, and advance to the
same position anyway.

The implementation carefully handles the race condition where other
backends may write notifications after the heavyweight lock is released
but before SignalBackends() is called. By saving queueHeadAfterWrite in
PreCommit_Notify() immediately after writing, while the heavyweight lock
is still held, we ensure backends are only advanced over the exact
notifications we wrote, not notifications from other concurrent
backends.
---
 src/backend/commands/async.c | 79 ++++++++++++++++++++++++++++--------
 1 file changed, 62 insertions(+), 17 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index bb5ebfab26d..5570f73dd13 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -500,6 +500,8 @@ typedef struct NotificationList
 	int			nestingLevel;	/* current transaction nesting depth */
 	List	   *events;			/* list of Notification structs */
 	HTAB	   *hashtab;		/* hash of NotificationHash structs, or NULL */
+	QueuePosition queueHeadBeforeWrite; /* QUEUE_HEAD before writing notifies */
+	QueuePosition queueHeadAfterWrite;	/* QUEUE_HEAD after writing notifies */
 	struct NotificationList *upper; /* details for upper transaction levels */
 } NotificationList;
 
@@ -1048,6 +1050,7 @@ PreCommit_Notify(void)
 	if (pendingNotifies)
 	{
 		ListCell   *nextNotify;
+		bool		firstIteration = true;
 
 		/*
 		 * Make sure that we have an XID assigned to the current transaction.
@@ -1076,6 +1079,9 @@ PreCommit_Notify(void)
 		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
 						 AccessExclusiveLock);
 
+		/* Initialize queueHeadBeforeWrite to a safe default */
+		SET_QUEUE_POS(pendingNotifies->queueHeadBeforeWrite, 0, 0);
+
 		/* Now push the notifications into the queue */
 		nextNotify = list_head(pendingNotifies->events);
 		while (nextNotify != NULL)
@@ -1093,6 +1099,19 @@ PreCommit_Notify(void)
 			 * point in time we can still roll the transaction back.
 			 */
 			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+			/*
+			 * On the first iteration, save the queue head position before we
+			 * write any notifications.  This is used by SignalBackends() to
+			 * identify backends that can be advanced directly without waking
+			 * them up.
+			 */
+			if (firstIteration)
+			{
+				pendingNotifies->queueHeadBeforeWrite = QUEUE_HEAD;
+				firstIteration = false;
+			}
+
 			asyncQueueFillWarning();
 			if (asyncQueueIsFull())
 				ereport(ERROR,
@@ -1102,6 +1121,18 @@ PreCommit_Notify(void)
 			LWLockRelease(NotifyQueueLock);
 		}
 
+		/*
+		 * Save the queue head after writing all our notifications.  This is
+		 * used by SignalBackends() to know where to advance idle backends to.
+		 * We must save this now because other backends may write their own
+		 * notifications after we release the heavyweight lock but before we
+		 * call SignalBackends(), and we must not advance backends over those
+		 * other notifications.
+		 */
+		LWLockAcquire(NotifyQueueLock, LW_SHARED);
+		pendingNotifies->queueHeadAfterWrite = QUEUE_HEAD;
+		LWLockRelease(NotifyQueueLock);
+
 		/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
 	}
 }
@@ -1934,14 +1965,43 @@ SignalBackends(void)
 		dshash_release_lock(channelHash, entry);
 	}
 
+	/*
+	 * Listening backends that are still at the old queue head (its position
+	 * before we wrote our notifications) and have not been marked for wakeup
+	 * by now are not interested in any of our notifications.  Rather than
+	 * waking them only to have them skip every entry, advance them directly
+	 * to the new queue head.
+	 */
+	if (pendingNotifies != NULL)
+	{
+		QueuePosition oldHead = pendingNotifies->queueHeadBeforeWrite;
+		QueuePosition newHead = pendingNotifies->queueHeadAfterWrite;
+
+		for (ProcNumber i = QUEUE_FIRST_LISTENER;
+			 i != INVALID_PROC_NUMBER;
+			 i = QUEUE_NEXT_LISTENER(i))
+		{
+			QueuePosition pos;
+
+			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+				continue;
+
+			pos = QUEUE_BACKEND_POS(i);
+
+			if (QUEUE_POS_EQUAL(pos, oldHead) &&
+				QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+			{
+				QUEUE_BACKEND_POS(i) = newHead;
+			}
+		}
+	}
+
 	queue_length = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
 									  QUEUE_POS_PAGE(QUEUE_TAIL));
 
 	/* Check for lagging backends when the queue spans multiple pages */
 	if (queue_length > 0)
 	{
-		bool		tail_woken = false;
-
 		for (ProcNumber i = QUEUE_FIRST_LISTENER;
 			 i != INVALID_PROC_NUMBER;
 			 i = QUEUE_NEXT_LISTENER(i))
@@ -1955,21 +2015,6 @@ SignalBackends(void)
 
 			pos = QUEUE_BACKEND_POS(i);
 
-			/* Signal one backend positioned at the global tail */
-			if (!tail_woken && asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_TAIL),
-												  QUEUE_POS_PAGE(pos)) == 0)
-			{
-				pid = QUEUE_BACKEND_PID(i);
-				Assert(pid != InvalidPid);
-
-				QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
-				pids[count] = pid;
-				procnos[count] = i;
-				count++;
-				tail_woken = true;
-				continue;
-			}
-
 			lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
 									 QUEUE_POS_PAGE(pos));
 
-- 
2.50.1

