On Sat, Oct 11, 2025, at 09:43, Joel Jacobson wrote:
> On Sat, Oct 11, 2025, at 08:43, Joel Jacobson wrote:
>> In addition to previously suggested optimization, there is another major ...
>> I'm not entirely sure this approach is correct though
Having investigated this, I believe the "direct advancement" approach is
correct.
(I understand that the exclusive lock on NotifyQueueLock in
PreCommit_Notify is still needed: other operations that don't acquire
the heavyweight lock take shared or exclusive locks on NotifyQueueLock
to read or modify QUEUE_HEAD, and PreCommit_Notify modifies QUEUE_HEAD.)
Given all the experiments since my earlier message, here is a fresh,
self-contained write-up:
This series has two patches:
* 0001-optimize_listen_notify-v16.patch:
Improve test coverage of async.c. Add isolation specs covering
previously untested paths (subxact LISTEN reparenting/merge/abort,
simple NOTIFY reparenting, notification_match dedup, and an array-growth
case used by the follow-on patch).
* 0002-optimize_listen_notify-v16.patch:
Optimize LISTEN/NOTIFY by maintaining a shared channel map and using
direct advancement to avoid useless wakeups.
Problem
-------
Today SignalBackends wakes all listeners in the same database, with no
knowledge of which backends listen on which channels. When backends
listen on different channels, each NOTIFY therefore causes unnecessary
wakeups and context switches, which can become the bottleneck in
workloads with many listeners.
Overview of the solution (patch 0002)
-------------------------------------
* Introduce a lazily created DSA+dshash map (dboid, channel) ->
[ProcNumber] (channelHash). AtCommit_Notify maintains it for
LISTEN/UNLISTEN, and SignalBackends consults it to signal only
listeners on the channels notified within the transaction.
* Add a per-backend wakeupPending flag to suppress duplicate signals.
* Direct advancement: while queuing, PreCommit_Notify records the queue
head before and after our writes. Writers are globally serialized, so
the interval [oldHead, newHead) contains only our entries.
SignalBackends advances any backend still at oldHead directly to
newHead, avoiding a pointless wakeup.
* Keep the queue healthy by signaling backends that have fallen too far
behind (lag >= QUEUE_CLEANUP_DELAY) so the global tail can advance.
* pg_listening_channels and IsListeningOn now read from channelHash.
* Add LWLock tranche NOTIFY_CHANNEL_HASH and wait event
NotifyChannelHash.
No user-visible semantic changes are intended; this is an internal
performance improvement.
Benchmark
---------
The benchmark uses a patched pgbench (adds --listen-notify-benchmark;
attached as .txt to avoid confusing cfbot). Each iteration measures
10 000 round trips between two connections, then adds 100 more idle
listeners, each listening on its own channel.
master (HEAD):
% ./pgbench_patched --listen-notify-benchmark --notify-round-trips=10000 --notify-idle-step=100
idle_listeners round_trips_per_sec max_latency_usec
0 32123.7 893
100 1952.5 1465
200 991.4 3438
300 663.5 2454
400 494.6 2950
500 398.6 3394
600 332.8 4272
700 287.1 4692
800 252.6 5208
900 225.4 5614
1000 202.5 6212
0002-optimize_listen_notify-v16.patch:
% ./pgbench_patched --listen-notify-benchmark --notify-round-trips=10000 --notify-idle-step=100
idle_listeners round_trips_per_sec max_latency_usec
0 31832.6 1067
100 32341.0 1035
200 31562.5 1054
300 30040.1 1057
400 29287.1 1023
500 28191.9 1201
600 28166.5 1019
700 26994.3 1094
800 26501.0 1043
900 25974.2 1005
1000 25720.6 1008
Benchmarked on MacBook Pro Apple M3 Max.
Files
-----
* 0001-optimize_listen_notify-v16.patch - tests only.
* 0002-optimize_listen_notify-v16.patch - implementation.
* pgbench-listen-notify-benchmark-patch.txt - adds --listen-notify-benchmark.
Feedback and review much welcomed.
/Joel
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1515ed405ba..3f47c50847d 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -35,6 +35,7 @@
#include <ctype.h>
#include <float.h>
+#include <inttypes.h>
#include <limits.h>
#include <math.h>
#include <signal.h>
@@ -237,6 +238,11 @@ static const char *const PARTITION_METHOD[] = {"none",
"range", "hash"};
/* random seed used to initialize base_random_sequence */
static int64 random_seed = -1;
+/* LISTEN/NOTIFY benchmark mode parameters */
+static bool listen_notify_mode = false; /* enable LISTEN/NOTIFY benchmark */
+static int notify_round_trips = 100; /* number of round-trips per iteration */
+static int notify_idle_step = 10; /* idle listeners to add per iteration */
+
/*
* end of configurable parameters
*********************************************************************/
@@ -930,6 +936,10 @@ usage(void)
" (same as \"-b simple-update\")\n"
" -S, --select-only perform SELECT-only transactions\n"
" (same as \"-b select-only\")\n"
+ " --listen-notify-benchmark\n"
+ " run LISTEN/NOTIFY round-trip benchmark\n"
+ " --notify-round-trips=NUM number of round-trips per iteration (default: 100)\n"
+ " --notify-idle-step=NUM idle listeners to add per iteration (default: 10)\n"
"\nBenchmarking options:\n"
" -c, --client=NUM number of concurrent database clients (default: 1)\n"
" -C, --connect establish new connection for each transaction\n"
@@ -6689,6 +6699,216 @@ set_random_seed(const char *seed)
return true;
}
+/*
+ * Run LISTEN/NOTIFY round-trip benchmark
+ *
+ * This benchmark measures the round-trip time between two processes that
+ * ping-pong NOTIFY messages while adding idle listening connections.
+ */
+static void
+runListenNotifyBenchmark(void)
+{
+ PGconn *conn1 = NULL;
+ PGconn *conn2 = NULL;
+ PGconn **idle_conns = NULL;
+ int num_idle = 0;
+ int max_idle = 100000; /* reasonable upper limit */
+ PGresult *res;
+ char channel1[] = "pgbench_channel_1";
+ char channel2[] = "pgbench_channel_2";
+ char notify_cmd[256];
+ bool first_failure = false;
+
+ pg_log_info("starting LISTEN/NOTIFY round-trip benchmark");
+ pg_log_info("round-trips per iteration: %d", notify_round_trips);
+ pg_log_info("idle listeners added per iteration: %d", notify_idle_step);
+ printf("\n%14s %19s %19s\n", "idle_listeners", "round_trips_per_sec", "max_latency_usec");
+
+ /* Allocate array for idle connections */
+ idle_conns = (PGconn **) pg_malloc0(max_idle * sizeof(PGconn *));
+
+ /* Create two active connections for ping-pong */
+ conn1 = doConnect();
+ if (conn1 == NULL)
+ pg_fatal("failed to create connection 1");
+
+ conn2 = doConnect();
+ if (conn2 == NULL)
+ pg_fatal("failed to create connection 2");
+
+ /* Set up LISTEN on both connections */
+ snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel1);
+ res = PQexec(conn1, notify_cmd);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("LISTEN failed on connection 1: %s", PQerrorMessage(conn1));
+ PQclear(res);
+
+ snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel2);
+ res = PQexec(conn2, notify_cmd);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("LISTEN failed on connection 2: %s", PQerrorMessage(conn2));
+ PQclear(res);
+
+ /* Main benchmark loop: measure round-trips then add idle connections */
+ while (num_idle < max_idle)
+ {
+ int i;
+ int64 total_latency = 0;
+ int64 max_latency = 0;
+
+ /* Perform round-trip measurements */
+ for (i = 0; i < notify_round_trips; i++)
+ {
+ pg_time_usec_t start_time,
+ end_time;
+ int64 latency;
+ PGnotify *notify;
+ int sock;
+ fd_set input_mask;
+ struct timeval tv;
+
+ /* Clear any pending notifications */
+ PQconsumeInput(conn1);
+ while ((notify = PQnotifies(conn1)) != NULL)
+ PQfreemem(notify);
+ PQconsumeInput(conn2);
+ while ((notify = PQnotifies(conn2)) != NULL)
+ PQfreemem(notify);
+
+ /* Start timer and send notification from conn1 */
+ start_time = pg_time_now();
+ snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", channel2);
+ res = PQexec(conn1, notify_cmd);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn1));
+ PQclear(res);
+
+ /* Wait for notification on conn2 */
+ sock = PQsocket(conn2);
+ notify = NULL;
+ while (notify == NULL)
+ {
+ PQconsumeInput(conn2);
+ notify = PQnotifies(conn2);
+ if (notify == NULL)
+ {
+ /* Wait for data on socket */
+ FD_ZERO(&input_mask);
+ FD_SET(sock, &input_mask);
+ tv.tv_sec = 10; /* 10 second timeout */
+ tv.tv_usec = 0;
+ if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+ pg_fatal("select() failed: %m");
+ }
+ }
+ PQfreemem(notify);
+
+ /* Send notification back from conn2 */
+ snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", channel1);
+ res = PQexec(conn2, notify_cmd);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn2));
+ PQclear(res);
+
+ /* Wait for notification on conn1 */
+ sock = PQsocket(conn1);
+ notify = NULL;
+ while (notify == NULL)
+ {
+ PQconsumeInput(conn1);
+ notify = PQnotifies(conn1);
+ if (notify == NULL)
+ {
+ /* Wait for data on socket */
+ FD_ZERO(&input_mask);
+ FD_SET(sock, &input_mask);
+ tv.tv_sec = 10; /* 10 second timeout */
+ tv.tv_usec = 0;
+ if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+ pg_fatal("select() failed: %m");
+ }
+ }
+ PQfreemem(notify);
+
+ /* End timer */
+ end_time = pg_time_now();
+
+ /* Calculate individual round-trip latency */
+ latency = end_time - start_time;
+
+ /* Accumulate total latency and track maximum */
+ total_latency += latency;
+ if (latency > max_latency)
+ max_latency = latency;
+ }
+
+ /* Calculate and report round-trips per second and max latency */
+ fprintf(stdout, "%14d %19.1f %19" PRId64 "\n",
+ num_idle,
+ 1000000.0 * notify_round_trips / total_latency,
+ max_latency);
+ fflush(stdout);
+
+ /* Stop if we hit connection limit */
+ if (first_failure)
+ break;
+
+ /* Add idle listening connections */
+ for (i = 0; i < notify_idle_step && num_idle < max_idle; i++)
+ {
+ PGconn *idle_conn;
+ char idle_channel[256];
+
+ idle_conn = doConnect();
+ if (idle_conn == NULL)
+ {
+ if (!first_failure)
+ {
+ pg_log_info("reached max_connections at %d idle listeners", num_idle);
+ first_failure = true;
+ }
+ break;
+ }
+
+ /* Each idle connection listens on a unique channel */
+ snprintf(idle_channel, sizeof(idle_channel), "idle_%d", num_idle);
+ snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", idle_channel);
+
+ res = PQexec(idle_conn, notify_cmd);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ pg_log_warning("LISTEN failed on idle connection %d: %s",
+ num_idle, PQerrorMessage(idle_conn));
+ PQfinish(idle_conn);
+ PQclear(res);
+ first_failure = true;
+ break;
+ }
+ PQclear(res);
+
+ idle_conns[num_idle] = idle_conn;
+ num_idle++;
+ }
+
+ /* Stop if we couldn't add any connections */
+ if (first_failure && i == 0)
+ break;
+ }
+
+ /* Clean up */
+ pg_log_info("cleaning up connections");
+ PQfinish(conn1);
+ PQfinish(conn2);
+ for (int i = 0; i < num_idle; i++)
+ {
+ if (idle_conns[i])
+ PQfinish(idle_conns[i]);
+ }
+ pg_free(idle_conns);
+
+ pg_log_info("LISTEN/NOTIFY benchmark completed");
+}
+
int
main(int argc, char **argv)
{
@@ -6739,6 +6959,9 @@ main(int argc, char **argv)
{"verbose-errors", no_argument, NULL, 15},
{"exit-on-abort", no_argument, NULL, 16},
{"debug", no_argument, NULL, 17},
+ {"listen-notify-benchmark", no_argument, NULL, 18},
+ {"notify-round-trips", required_argument, NULL, 19},
+ {"notify-idle-step", required_argument, NULL, 20},
{NULL, 0, NULL, 0}
};
@@ -7092,6 +7315,22 @@ main(int argc, char **argv)
case 17: /* debug */
pg_logging_increase_verbosity();
break;
+ case 18: /* listen-notify-benchmark */
+ listen_notify_mode = true;
+ benchmarking_option_set = true;
+ break;
+ case 19: /* notify-round-trips */
+ benchmarking_option_set = true;
+ if (!option_parse_int(optarg, "--notify-round-trips", 1, INT_MAX,
+ &notify_round_trips))
+ exit(1);
+ break;
+ case 20: /* notify-idle-step */
+ benchmarking_option_set = true;
+ if (!option_parse_int(optarg, "--notify-idle-step", 1, INT_MAX,
+ &notify_idle_step))
+ exit(1);
+ break;
default:
/* getopt_long already emitted a complaint */
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -7210,6 +7449,20 @@ main(int argc, char **argv)
pg_fatal("some of the specified options cannot be used in benchmarking mode");
}
+ /* Handle LISTEN/NOTIFY benchmark mode */
+ if (listen_notify_mode)
+ {
+ /* Establish a database connection for setup */
+ if ((con = doConnect()) == NULL)
+ pg_fatal("could not connect to database");
+
+ /* Run the LISTEN/NOTIFY benchmark */
+ runListenNotifyBenchmark();
+
+ PQfinish(con);
+ exit(0);
+ }
+
if (nxacts > 0 && duration > 0)
pg_fatal("specify either a number of transactions (-t) or a duration (-T), not both");
