On Sat, Oct 11, 2025, at 09:43, Joel Jacobson wrote:
> On Sat, Oct 11, 2025, at 08:43, Joel Jacobson wrote:
>> In addition to previously suggested optimization, there is another major
...
>> I'm not entirely sure this approach is correct though

Having investigated this, the "direct advancement" approach seems
correct to me.

(The exclusive lock on NotifyQueueLock in PreCommit_Notify is still
needed because it modifies QUEUE_HEAD, and other operations that do not
acquire the heavyweight lock take NotifyQueueLock in shared or exclusive
mode to read or modify QUEUE_HEAD.)

Given all the experiments since my earlier message, here is a fresh,
self-contained write-up:

This series has two patches:

* 0001-optimize_listen_notify-v16.patch:
Improve test coverage of async.c. Adds isolation specs covering
previously untested paths (subxact LISTEN reparenting/merge/abort,
simple NOTIFY reparenting, notification_match dedup, and an array-growth
case used by the follow-on patch).

* 0002-optimize_listen_notify-v16.patch:
Optimize LISTEN/NOTIFY by maintaining a shared channel map and using
direct advancement to avoid useless wakeups.

Problem
-------

Today SignalBackends wakes all listeners in the same database, with no
knowledge of which backends listen on which channels. When backends
listen on different channels, each NOTIFY wakes backends that have no
interest in it, and the resulting wakeups and context switches can
become the bottleneck in workloads with many listeners.

Overview of the solution (patch 0002)
-------------------------------------

* Introduce a lazily-created DSA+dshash map (dboid, channel) ->
  [ProcNumber] (channelHash). AtCommit_Notify maintains it for
  LISTEN/UNLISTEN, and SignalBackends consults it to signal only
  listeners on the channels notified within the transaction.
* Add a per-backend wakeupPending flag to suppress duplicate signals.
* Direct advancement: while queuing, PreCommit_Notify records the queue
  head before and after our writes. Writers are globally serialized, so
  the interval [oldHead, newHead) contains only our entries.
  SignalBackends advances any backend still at oldHead directly to
  newHead, avoiding a pointless wakeup.
* Keep the queue healthy by signaling backends that have fallen too far
  behind (lag >= QUEUE_CLEANUP_DELAY) so the global tail can advance.
* pg_listening_channels and IsListeningOn now read from channelHash.
* Add LWLock tranche NOTIFY_CHANNEL_HASH and wait event
  NotifyChannelHash.

No user-visible semantic changes are intended; this is an internal
performance improvement.

Benchmark
---------

Measured with a patched pgbench (adds --listen-notify-benchmark;
attached as .txt to avoid confusing cfbot). Each iteration performs
10,000 round trips between two connections, then adds 100 idle
listeners, each listening on its own channel.
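
For reference, the two reported columns are derived from the
per-round-trip latencies of one iteration. A minimal sketch of that
arithmetic, separate from the attached pgbench patch (SketchSummary and
sketch_summarize are hypothetical names used only here):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct
{
	double		round_trips_per_sec;
	int64_t		max_latency_usec;
} SketchSummary;

/* Summarize n round-trip latencies given in microseconds. */
SketchSummary
sketch_summarize(const int64_t *latency_usec, size_t n)
{
	SketchSummary s;
	int64_t		total = 0;
	int64_t		max = 0;

	assert(n > 0);
	for (size_t i = 0; i < n; i++)
	{
		total += latency_usec[i];
		if (latency_usec[i] > max)
			max = latency_usec[i];
	}
	assert(total > 0);

	/* Throughput is round trips divided by total elapsed seconds. */
	s.round_trips_per_sec = 1000000.0 * (double) n / (double) total;
	s.max_latency_usec = max;
	return s;
}
```

Because the round trips run back to back on a single pair of
connections, round_trips_per_sec is the inverse of the mean round-trip
latency.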

master (HEAD):

% ./pgbench_patched --listen-notify-benchmark --notify-round-trips=10000 --notify-idle-step=100

idle_listeners  round_trips_per_sec     max_latency_usec
             0              32123.7                  893
           100               1952.5                 1465
           200                991.4                 3438
           300                663.5                 2454
           400                494.6                 2950
           500                398.6                 3394
           600                332.8                 4272
           700                287.1                 4692
           800                252.6                 5208
           900                225.4                 5614
          1000                202.5                 6212

0002-optimize_listen_notify-v16.patch:

% ./pgbench_patched --listen-notify-benchmark --notify-round-trips=10000 --notify-idle-step=100

idle_listeners  round_trips_per_sec     max_latency_usec
             0              31832.6                 1067
           100              32341.0                 1035
           200              31562.5                 1054
           300              30040.1                 1057
           400              29287.1                 1023
           500              28191.9                 1201
           600              28166.5                 1019
           700              26994.3                 1094
           800              26501.0                 1043
           900              25974.2                 1005
          1000              25720.6                 1008

Benchmarked on a MacBook Pro (Apple M3 Max).
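
To read the two tables side by side, the throughput ratio per row can be
computed as below. The numbers are copied from the tables above; the
helper name sketch_speedup is hypothetical. By this arithmetic the
patched build is within about 1% of master at 0 idle listeners and
roughly 127x faster at 1000.

```c
#include <assert.h>
#include <stddef.h>

/* round_trips_per_sec for 0, 100, ..., 1000 idle listeners */
static const double master_rps[] = {
	32123.7, 1952.5, 991.4, 663.5, 494.6, 398.6,
	332.8, 287.1, 252.6, 225.4, 202.5
};
static const double patched_rps[] = {
	31832.6, 32341.0, 31562.5, 30040.1, 29287.1, 28191.9,
	28166.5, 26994.3, 26501.0, 25974.2, 25720.6
};

#define SKETCH_ROWS (sizeof(master_rps) / sizeof(master_rps[0]))

/* Ratio of patched to master throughput for one table row. */
double
sketch_speedup(size_t row)
{
	assert(row < SKETCH_ROWS);
	return patched_rps[row] / master_rps[row];
}
```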

Files
-----

* 0001-optimize_listen_notify-v16.patch - tests only.
* 0002-optimize_listen_notify-v16.patch - implementation.
* pgbench-listen-notify-benchmark-patch.txt - adds --listen-notify-benchmark.

Feedback and review are very welcome.

/Joel

Attachment: 0001-optimize_listen_notify-v16.patch
Description: Binary data

Attachment: 0002-optimize_listen_notify-v16.patch
Description: Binary data

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1515ed405ba..3f47c50847d 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -35,6 +35,7 @@
 
 #include <ctype.h>
 #include <float.h>
+#include <inttypes.h>
 #include <limits.h>
 #include <math.h>
 #include <signal.h>
@@ -237,6 +238,11 @@ static const char *const PARTITION_METHOD[] = {"none", "range", "hash"};
 /* random seed used to initialize base_random_sequence */
 static int64 random_seed = -1;
 
+/* LISTEN/NOTIFY benchmark mode parameters */
+static bool listen_notify_mode = false;        /* enable LISTEN/NOTIFY benchmark */
+static int     notify_round_trips = 100;       /* number of round-trips per iteration */
+static int     notify_idle_step = 10;          /* idle listeners to add per iteration */
+
 /*
  * end of configurable parameters
  *********************************************************************/
@@ -930,6 +936,10 @@ usage(void)
                   "                           (same as \"-b simple-update\")\n"
                   "  -S, --select-only        perform SELECT-only transactions\n"
                   "                           (same as \"-b select-only\")\n"
+                  "  --listen-notify-benchmark\n"
+                  "                           run LISTEN/NOTIFY round-trip benchmark\n"
+                  "  --notify-round-trips=NUM number of round-trips per iteration (default: 100)\n"
+                  "  --notify-idle-step=NUM   idle listeners to add per iteration (default: 10)\n"
                   "\nBenchmarking options:\n"
                   "  -c, --client=NUM         number of concurrent database clients (default: 1)\n"
                   "  -C, --connect            establish new connection for each transaction\n"
@@ -6689,6 +6699,216 @@ set_random_seed(const char *seed)
        return true;
 }
 
+/*
+ * Run LISTEN/NOTIFY round-trip benchmark
+ *
+ * This benchmark measures the round-trip time between two processes that
+ * ping-pong NOTIFY messages while adding idle listening connections.
+ */
+static void
+runListenNotifyBenchmark(void)
+{
+       PGconn     *conn1 = NULL;
+       PGconn     *conn2 = NULL;
+       PGconn    **idle_conns = NULL;
+       int                     num_idle = 0;
+       int                     max_idle = 100000;      /* reasonable upper limit */
+       PGresult   *res;
+       char            channel1[] = "pgbench_channel_1";
+       char            channel2[] = "pgbench_channel_2";
+       char            notify_cmd[256];
+       bool            first_failure = false;
+
+       pg_log_info("starting LISTEN/NOTIFY round-trip benchmark");
+       pg_log_info("round-trips per iteration: %d", notify_round_trips);
+       pg_log_info("idle listeners added per iteration: %d", notify_idle_step);
+       printf("\n%14s  %19s  %19s\n", "idle_listeners", "round_trips_per_sec", "max_latency_usec");
+
+       /* Allocate array for idle connections */
+       idle_conns = (PGconn **) pg_malloc0(max_idle * sizeof(PGconn *));
+
+       /* Create two active connections for ping-pong */
+       conn1 = doConnect();
+       if (conn1 == NULL)
+               pg_fatal("failed to create connection 1");
+
+       conn2 = doConnect();
+       if (conn2 == NULL)
+               pg_fatal("failed to create connection 2");
+
+       /* Set up LISTEN on both connections */
+       snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel1);
+       res = PQexec(conn1, notify_cmd);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("LISTEN failed on connection 1: %s", PQerrorMessage(conn1));
+       PQclear(res);
+
+       snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel2);
+       res = PQexec(conn2, notify_cmd);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("LISTEN failed on connection 2: %s", PQerrorMessage(conn2));
+       PQclear(res);
+
+       /* Main benchmark loop: measure round-trips then add idle connections */
+       while (num_idle < max_idle)
+       {
+               int                     i;
+               int64           total_latency = 0;
+               int64           max_latency = 0;
+
+               /* Perform round-trip measurements */
+               for (i = 0; i < notify_round_trips; i++)
+               {
+                       pg_time_usec_t start_time,
+                                               end_time;
+                       int64           latency;
+                       PGnotify   *notify;
+                       int                     sock;
+                       fd_set          input_mask;
+                       struct timeval tv;
+
+                       /* Clear any pending notifications */
+                       PQconsumeInput(conn1);
+                       while ((notify = PQnotifies(conn1)) != NULL)
+                               PQfreemem(notify);
+                       PQconsumeInput(conn2);
+                       while ((notify = PQnotifies(conn2)) != NULL)
+                               PQfreemem(notify);
+
+                       /* Start timer and send notification from conn1 */
+                       start_time = pg_time_now();
+                       snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", channel2);
+                       res = PQexec(conn1, notify_cmd);
+                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                               pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn1));
+                       PQclear(res);
+
+                       /* Wait for notification on conn2 */
+                       sock = PQsocket(conn2);
+                       notify = NULL;
+                       while (notify == NULL)
+                       {
+                               PQconsumeInput(conn2);
+                               notify = PQnotifies(conn2);
+                               if (notify == NULL)
+                               {
+                                       /* Wait for data on socket */
+                                       FD_ZERO(&input_mask);
+                                       FD_SET(sock, &input_mask);
+                                       tv.tv_sec = 10; /* 10 second timeout */
+                                       tv.tv_usec = 0;
+                                       if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+                                               pg_fatal("select() failed: %m");
+                               }
+                       }
+                       PQfreemem(notify);
+
+                       /* Send notification back from conn2 */
+                       snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", channel1);
+                       res = PQexec(conn2, notify_cmd);
+                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                               pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn2));
+                       PQclear(res);
+
+                       /* Wait for notification on conn1 */
+                       sock = PQsocket(conn1);
+                       notify = NULL;
+                       while (notify == NULL)
+                       {
+                               PQconsumeInput(conn1);
+                               notify = PQnotifies(conn1);
+                               if (notify == NULL)
+                               {
+                                       /* Wait for data on socket */
+                                       FD_ZERO(&input_mask);
+                                       FD_SET(sock, &input_mask);
+                                       tv.tv_sec = 10; /* 10 second timeout */
+                                       tv.tv_usec = 0;
+                                       if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+                                               pg_fatal("select() failed: %m");
+                               }
+                       }
+                       PQfreemem(notify);
+
+                       /* End timer */
+                       end_time = pg_time_now();
+
+                       /* Calculate individual round-trip latency */
+                       latency = end_time - start_time;
+
+                       /* Accumulate total latency and track maximum */
+                       total_latency += latency;
+                       if (latency > max_latency)
+                               max_latency = latency;
+               }
+
+               /* Calculate and report round-trips per second and max latency */
+               fprintf(stdout, "%14d  %19.1f  %19" PRId64 "\n",
+                               num_idle,
+                               1000000.0 * notify_round_trips / total_latency,
+                               max_latency);
+               fflush(stdout);
+
+               /* Stop if we hit connection limit */
+               if (first_failure)
+                       break;
+
+               /* Add idle listening connections */
+               for (i = 0; i < notify_idle_step && num_idle < max_idle; i++)
+               {
+                       PGconn     *idle_conn;
+                       char            idle_channel[256];
+
+                       idle_conn = doConnect();
+                       if (idle_conn == NULL)
+                       {
+                               if (!first_failure)
+                               {
+                                       pg_log_info("reached max_connections at %d idle listeners", num_idle);
+                                       first_failure = true;
+                               }
+                               break;
+                       }
+
+                       /* Each idle connection listens on a unique channel */
+                       snprintf(idle_channel, sizeof(idle_channel), "idle_%d", num_idle);
+                       snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", idle_channel);
+
+                       res = PQexec(idle_conn, notify_cmd);
+                       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       {
+                               pg_log_warning("LISTEN failed on idle connection %d: %s",
+                                                          num_idle, PQerrorMessage(idle_conn));
+                               PQfinish(idle_conn);
+                               PQclear(res);
+                               first_failure = true;
+                               break;
+                       }
+                       PQclear(res);
+
+                       idle_conns[num_idle] = idle_conn;
+                       num_idle++;
+               }
+
+               /* Stop if we couldn't add any connections */
+               if (first_failure && i == 0)
+                       break;
+       }
+
+       /* Clean up */
+       pg_log_info("cleaning up connections");
+       PQfinish(conn1);
+       PQfinish(conn2);
+       for (int i = 0; i < num_idle; i++)
+       {
+               if (idle_conns[i])
+                       PQfinish(idle_conns[i]);
+       }
+       pg_free(idle_conns);
+
+       pg_log_info("LISTEN/NOTIFY benchmark completed");
+}
+
 int
 main(int argc, char **argv)
 {
@@ -6739,6 +6959,9 @@ main(int argc, char **argv)
                {"verbose-errors", no_argument, NULL, 15},
                {"exit-on-abort", no_argument, NULL, 16},
                {"debug", no_argument, NULL, 17},
+               {"listen-notify-benchmark", no_argument, NULL, 18},
+               {"notify-round-trips", required_argument, NULL, 19},
+               {"notify-idle-step", required_argument, NULL, 20},
                {NULL, 0, NULL, 0}
        };
 
@@ -7092,6 +7315,22 @@ main(int argc, char **argv)
                        case 17:                        /* debug */
                                pg_logging_increase_verbosity();
                                break;
+                       case 18:                        /* listen-notify-benchmark */
+                               listen_notify_mode = true;
+                               benchmarking_option_set = true;
+                               break;
+                       case 19:                        /* notify-round-trips */
+                               benchmarking_option_set = true;
+                               if (!option_parse_int(optarg, "--notify-round-trips", 1, INT_MAX,
+                                                                         &notify_round_trips))
+                                       exit(1);
+                               break;
+                       case 20:                        /* notify-idle-step */
+                               benchmarking_option_set = true;
+                               if (!option_parse_int(optarg, "--notify-idle-step", 1, INT_MAX,
+                                                                         &notify_idle_step))
+                                       exit(1);
+                               break;
                        default:
                                /* getopt_long already emitted a complaint */
                                pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -7210,6 +7449,20 @@ main(int argc, char **argv)
                        pg_fatal("some of the specified options cannot be used in benchmarking mode");
        }
 
+       /* Handle LISTEN/NOTIFY benchmark mode */
+       if (listen_notify_mode)
+       {
+               /* Establish a database connection for setup */
+               if ((con = doConnect()) == NULL)
+                       pg_fatal("could not connect to database");
+
+               /* Run the LISTEN/NOTIFY benchmark */
+               runListenNotifyBenchmark();
+
+               PQfinish(con);
+               exit(0);
+       }
+
        if (nxacts > 0 && duration > 0)
                pg_fatal("specify either a number of transactions (-t) or a duration (-T), not both");
 
