From 72a6252a504f0dc90aa1236a0bc8f560fb75a227 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Sat, 16 Aug 2025 19:28:18 +0200
Subject: [PATCH] LISTEN/NOTIFY: make the latency/throughput trade-off tunable

Background: Currently, listeners are signaled on every NOTIFY as soon as
possible. That minimizes perceived latency, but under bursty traffic it
leads to many redundant wakeups, heavy context switching, and degraded
throughput.

This patch adds listener-side wakeup coalescing controlled by a new GUC,
notify_latency_target.  The setting defines the maximum additional
latency that is acceptable, allowing redundant wakeups to be coalesced
within the specified interval.

Each listener has a shared "wakeup pending" flag.  Senders that observe
the flag is already set do nothing, effectively coalescing their NOTIFY
with the pending wakeup.  The listener records the start time of each
processing cycle; if it is awakened again too soon, it defers work and
arms a timeout to re-awaken after the configured delay.  The flag is
cleared when entering asyncQueueReadAllNotifications().  A new timeout
reason, NOTIFY_DEFERRED_WAKEUP_TIMEOUT, is registered at backend
startup.

This makes the inherent latency/throughput trade-off explicit and
administrator-controlled.  Larger delays increase batching and reduce
wakeup churn, improving throughput at the cost of additional per-notify
latency; a delay of 0 preserves the previous behavior.  Queue ordering,
visibility, and cross-database semantics are unchanged.

User-visible change: new GUC notify_latency_target (ms, default 0).
---
 doc/src/sgml/config.sgml                      | 29 ++++++++++++
 src/backend/commands/async.c                  | 47 ++++++++++++++++++-
 src/backend/utils/init/postinit.c             |  2 +
 src/backend/utils/misc/guc_parameters.dat     | 10 ++++
 src/backend/utils/misc/postgresql.conf.sample |  1 +
 src/include/commands/async.h                  |  1 +
 src/include/utils/timeout.h                   |  1 +
 7 files changed, 90 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e9b420f3ddb..f0156b52a0c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -10267,6 +10267,35 @@ COPY postgres_log FROM '/full/path/to/logfile.csv' WITH csv;
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-notify-min-wakeup-delay" xreflabel="notify_latency_target">
+      <term><varname>notify_latency_target</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>notify_latency_target</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Sets the maximum acceptable additional latency for delivering
+        <command>LISTEN</command>/<command>NOTIFY</command>
+        notifications. During bursty periods, notifications that arrive
+        within this interval are coalesced and delivered together,
+        trading bounded extra latency for fewer wakeups and higher
+        throughput.
+       </para>
+
+       <para>
+        After a listening backend has been idle, the first
+        <command>NOTIFY</command> causes an immediately wakeup.
+        If additional notifications happen before
+        <varname>notify_latency_target</varname> has elapsed since the
+        start of that processing cycle, wakeup is deferred by one full
+        <varname>notify_latency_target</varname> interval from the point
+        of deferral. When that interval expires, the listening backend
+        wakes and catches up in a single wakeup.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-bytea-output" xreflabel="bytea_output">
       <term><varname>bytea_output</varname> (<type>enum</type>)
       <indexterm>
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..c2d97f731a7 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -150,6 +150,7 @@
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
+#include "utils/timeout.h"
 
 
 /*
@@ -246,6 +247,7 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	bool		wakeup_pending_flag;	/* for listener wakeup throttling */
 } QueueBackendStatus;
 
 /*
@@ -293,6 +295,8 @@ typedef struct AsyncQueueControl
 
 static AsyncQueueControl *asyncQueueControl;
 
+static TimestampTz last_wakeup_start_time = 0;
+
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
 #define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
@@ -301,6 +305,9 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_WAKEUP_PENDING_FLAG(i) \
+	(asyncQueueControl->backend[i].wakeup_pending_flag)
+
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -423,6 +430,7 @@ static bool tryAdvanceTail = false;
 
 /* GUC parameters */
 bool		Trace_notify = false;
+int			notify_latency_target = 0;
 
 /* For 8 KB pages this gives 8 GB of disk space */
 int			max_notify_queue_pages = 1048576;
@@ -527,6 +535,7 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			QUEUE_BACKEND_WAKEUP_PENDING_FLAG(i) = false;
 		}
 	}
 
@@ -1603,7 +1612,18 @@ SignalBackends(void)
 		QueuePosition pos;
 
 		Assert(pid != InvalidPid);
+
+		/*
+		 * If a wakeup is already pending for this listener, do nothing. The
+		 * pending signal guarantees it will wake up and process all messages
+		 * up to the current queue head, including the one we just wrote. This
+		 * coalesces multiple wakeups into one.
+		 */
+		if (QUEUE_BACKEND_WAKEUP_PENDING_FLAG(i))
+			continue;
+
 		pos = QUEUE_BACKEND_POS(i);
+
 		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
 		{
 			/*
@@ -1624,6 +1644,7 @@ SignalBackends(void)
 				continue;
 		}
 		/* OK, need to signal this one */
+		QUEUE_BACKEND_WAKEUP_PENDING_FLAG(i) = true;
 		pids[count] = pid;
 		procnos[count] = i;
 		count++;
@@ -1861,10 +1882,13 @@ asyncQueueReadAllNotifications(void)
 		AsyncQueueEntry align;
 	}			page_buffer;
 
-	/* Fetch current state */
+	last_wakeup_start_time = GetCurrentTimestamp();
+
+	/* Fetch current state and clear wakeup-pending flag */
 	LWLockAcquire(NotifyQueueLock, LW_SHARED);
 	/* Assert checks that we have a valid state entry */
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
+	QUEUE_BACKEND_WAKEUP_PENDING_FLAG(MyProcNumber) = false;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
 	LWLockRelease(NotifyQueueLock);
@@ -2189,6 +2213,27 @@ ProcessIncomingNotify(bool flush)
 	if (listenChannels == NIL)
 		return;
 
+	/*
+	 * Throttling check: if we were last active too recently, defer. This
+	 * check is safe without a lock because it's based on a backend-local
+	 * timestamp.
+	 */
+	if (notify_latency_target > 0 &&
+		!TimestampDifferenceExceeds(last_wakeup_start_time,
+									GetCurrentTimestamp(),
+									notify_latency_target))
+	{
+		/*
+		 * Too soon. We leave wakeup_pending_flag untouched (it must be true,
+		 * or we wouldn't have been signaled) to tell senders we are
+		 * intentionally delaying. Arm a timer to re-awaken and process the
+		 * backlog later.
+		 */
+		enable_timeout_after(NOTIFY_DEFERRED_WAKEUP_TIMEOUT,
+							 notify_latency_target);
+		return;
+	}
+
 	if (Trace_notify)
 		elog(DEBUG1, "ProcessIncomingNotify");
 
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 641e535a73c..4afd6eb7441 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -33,6 +33,7 @@
 #include "catalog/pg_database.h"
 #include "catalog/pg_db_role_setting.h"
 #include "catalog/pg_tablespace.h"
+#include "commands/async.h"
 #include "libpq/auth.h"
 #include "libpq/libpq-be.h"
 #include "mb/pg_wchar.h"
@@ -764,6 +765,7 @@ InitPostgres(const char *in_dbname, Oid dboid,
 		RegisterTimeout(TRANSACTION_TIMEOUT, TransactionTimeoutHandler);
 		RegisterTimeout(IDLE_SESSION_TIMEOUT, IdleSessionTimeoutHandler);
 		RegisterTimeout(CLIENT_CONNECTION_CHECK_TIMEOUT, ClientCheckTimeoutHandler);
+		RegisterTimeout(NOTIFY_DEFERRED_WAKEUP_TIMEOUT, HandleNotifyInterrupt);
 		RegisterTimeout(IDLE_STATS_UPDATE_TIMEOUT,
 						IdleStatsUpdateTimeoutHandler);
 	}
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 6bc6be13d2a..2b23a9520bf 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -1567,6 +1567,16 @@
   max => 'INT_MAX',
 },
 
+{ name => 'notify_latency_target', type => 'int', context => 'PGC_SUSET', group => 'CLIENT_CONN_OTHER',
+  short_desc => 'Latency target for waking listeners to process NOTIFY.',
+  long_desc => 'First notify after idle wakes immediately; arrivals within the interval defer the next wakeup by one full interval and are coalesced. 0 disables.',
+  flags => 'GUC_UNIT_MS',
+  variable => 'notify_latency_target',
+  boot_val => '0',
+  min => '0',
+  max => 'INT_MAX',
+},
+
 { name => 'wal_decode_buffer_size', type => 'int', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY',
   short_desc => 'Buffer size for reading ahead in the WAL during recovery.',
   long_desc => 'Maximum distance to read ahead in the WAL to prefetch referenced data blocks.',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c36fcb9ab61..fd2150b66f9 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -766,6 +766,7 @@ autovacuum_worker_slots = 16	# autovacuum worker slots to allocate
 #lock_timeout = 0				# in milliseconds, 0 is disabled
 #idle_in_transaction_session_timeout = 0	# in milliseconds, 0 is disabled
 #idle_session_timeout = 0			# in milliseconds, 0 is disabled
+#notify_latency_target = 0		# in milliseconds, 0 is disabled
 #bytea_output = 'hex'			# hex, escape
 #xmlbinary = 'base64'
 #xmloption = 'content'
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..ed27456e487 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -16,6 +16,7 @@
 #include <signal.h>
 
 extern PGDLLIMPORT bool Trace_notify;
+extern PGDLLIMPORT int notify_latency_target;
 extern PGDLLIMPORT int max_notify_queue_pages;
 extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h
index 7b19beafdc9..ea720b05043 100644
--- a/src/include/utils/timeout.h
+++ b/src/include/utils/timeout.h
@@ -36,6 +36,7 @@ typedef enum TimeoutId
 	IDLE_STATS_UPDATE_TIMEOUT,
 	CLIENT_CONNECTION_CHECK_TIMEOUT,
 	STARTUP_PROGRESS_TIMEOUT,
+	NOTIFY_DEFERRED_WAKEUP_TIMEOUT,
 	/* First user-definable timeout reason */
 	USER_TIMEOUT,
 	/* Maximum number of timeout reasons */
-- 
2.50.1

