From 18004e66974fc9d4a93e00b0183959ac306c7218 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <joel@compiler.org>
Date: Sun, 15 Jun 2025 00:09:43 +0200
Subject: [PATCH] Optimize LISTEN/NOTIFY signaling for scalability

Previously, the implementation would signal every backend listening on any
channel in the database for every NOTIFY. While robust, this broadcast
approach limits the scalability of application patterns that rely on
targeted notifications to distinct channels. This commit improves
scalability for such workloads by introducing an optimization for
single-listener channels.

A new shared hash table is introduced to track channels that have exactly
one listener. When a NOTIFY is issued, this table is consulted; if a
single listener is found for the target channel, only that backend is
signaled. The system gracefully falls back to the original broadcast
behavior for channels with multiple listeners or if the hash table runs
out of memory.

To avoid introducing a new contention point on a global lock, the hash
table's locking is partitioned. An array of lightweight locks protects
the hash table, with a custom hash function mapping channels to lock
partitions. This allows concurrent LISTEN/UNLISTEN operations on
different channels to proceed in parallel. For high-concurrency workloads
where many backends listen on the *same* channel, an optimistic
read-then-upgrade locking pattern is used to minimize serialization. A
strict lock ordering hierarchy (global NotifyQueueLock before any
partition lock) is observed to prevent deadlocks.

This also incorporates the "wake only tail" optimization to ensure the
global queue tail can always advance without causing a thundering herd
of signals.
---
 src/backend/commands/async.c | 674 +++++++++++++++++++++++++++++++++--
 1 file changed, 641 insertions(+), 33 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..a5b614e1a24 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -24,8 +24,11 @@
  *	  All notification messages are placed in the queue and later read out
  *	  by listening backends.
  *
- *	  There is no central knowledge of which backend listens on which channel;
- *	  every backend has its own list of interesting channels.
+ *	  In addition to each backend maintaining its own list of channels, we also
+ *	  maintain a central hash table that tracks channels with single listeners.
+ *	  When a channel has exactly one listening backend, we can signal just that
+ *	  backend. For channels with multiple listeners, we signal all listening
+ *	  backends.
  *
  *	  Although there is only one queue, notifications are treated as being
  *	  database-local; this is done by including the sender's database OID
@@ -71,13 +74,16 @@
  *	  make any actual updates to the effective listen state (listenChannels).
  *	  Then we signal any backends that may be interested in our messages
  *	  (including our own backend, if listening).  This is done by
- *	  SignalBackends(), which scans the list of listening backends and sends a
- *	  PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- *	  know which backend is listening on which channel so we must signal them
- *	  all).  We can exclude backends that are already up to date, though, and
- *	  we can also exclude backends that are in other databases (unless they
- *	  are way behind and should be kicked to make them advance their
- *	  pointers).
+ *	  SignalBackends(), which has two modes of operation, depending on
+ *	  whether every channel we notified has exactly one tracked listener:
+ *	  a) If so, a PROCSIG_NOTIFY_INTERRUPT signal is sent only to the single
+ *	  listening backend of each such channel.
+ *	  b) Otherwise, the signal is sent to every listening backend in our
+ *	  database.
+ *	  Additionally, we use a "wake only tail" optimization: we also signal
+ *	  one lagging backend whose position is on the queue's tail page, so that
+ *	  the tail can keep advancing without waking every listener at once.
+ *	  We can exclude backends that are already up to date, though.
  *
  *	  Finally, after we are out of the transaction altogether and about to go
  *	  idle, we scan the queue for messages that need to be sent to our
@@ -128,6 +134,7 @@
 #include <limits.h>
 #include <unistd.h>
 #include <signal.h>
+#include <string.h>
 
 #include "access/parallel.h"
 #include "access/slru.h"
@@ -146,6 +153,7 @@
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/guc_hooks.h"
+#include "utils/hsearch.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
@@ -162,6 +170,68 @@
  */
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
+/*
+ * Number of partitions for the channel hash table's locks.
+ * This must be a power of two.
+ */
+#define NUM_NOTIFY_PARTITIONS 128
+
+/*
+ * Channel hash table definitions
+ *
+ * This hash table provides an optimization by tracking which backend is
+ * listening on each channel. Channels are identified by database OID and
+ * channel name, making them database-specific.
+ *
+ * To improve scalability of concurrent LISTEN/UNLISTEN operations, the hash
+ * table is partitioned, with each partition protected by its own LWLock. This
+ * avoids serializing all operations on a single global lock.
+ *
+ * When exactly one backend listens on a channel, we signal that specific
+ * backend, avoiding unnecessary signals to all listening backends.
+ *
+ * We fall back to broadcast mode and signal all listening backends when:
+ * 1) Multiple backends listen on the same channel, OR
+ * 2) The hash table runs out of shared memory for new entries
+ *
+ * A partitioned dynahash table never splits buckets after creation, so its
+ * bucket count is fixed by the initial size. We therefore create the table
+ * at CHANNEL_HASH_MAX_SIZE up front; starting small would leave only a few
+ * hundred buckets and turn lookups into long chain walks as entries pile up.
+ * CHANNEL_HASH_MAX_SIZE is not a hard limit: more entries can be stored while
+ * shared memory lasts, and only allocation failure triggers the fallback.
+ *
+ * The maximum size is based on the typical OS port range, a reasonable upper
+ * bound for systems that use per-connection channels.
+ */
+#define CHANNEL_HASH_MAX_SIZE		65535
+#define CHANNEL_HASH_INIT_SIZE		CHANNEL_HASH_MAX_SIZE
+
+/*
+ * Key for the channel hash table: database OID plus channel name, because
+ * channel names are scoped to a database.  Build keys only through
+ * ChannelHashPrepareKey(), which zero-fills the struct before filling it in.
+ */
+typedef struct ChannelHashKey
+{
+	Oid			dboid;
+	char		channel[NAMEDATALEN];
+}			ChannelHashKey;
+
+/*
+ * One entry per tracked channel.  While has_multiple_listeners is false,
+ * "listener" is the channel's only listening backend; a second registration
+ * sets the flag and notifiers then broadcast.  NOTE(review): the removal path
+ * skips flagged entries, so the flag looks sticky -- confirm intended.
+ */
+typedef struct ChannelEntry
+{
+	ChannelHashKey key;
+	ProcNumber	listener;		/* single backend ID, or INVALID_PROC_NUMBER
+								 * if multiple */
+	bool		has_multiple_listeners;
+}			ChannelEntry;
+
 /*
  * Struct representing an entry in the global notify queue
  *
@@ -269,6 +339,11 @@ typedef struct QueueBackendStatus
  * In order to avoid deadlocks, whenever we need multiple locks, we first get
  * NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock.
  *
+ * The channel hash table is protected by a separate set of partitioned
+ * locks. To prevent deadlocks between these and NotifyQueueLock, the global
+ * lock-ordering rule is: always acquire NotifyQueueLock *before* acquiring
+ * any channel hash partition lock.
+ *
  * Each backend uses the backend[] array entry with index equal to its
  * ProcNumber.  We rely on this to make SendProcSignal fast.
  *
@@ -293,6 +368,60 @@ typedef struct AsyncQueueControl
 
 static AsyncQueueControl *asyncQueueControl;
 
+/* Backend-local shortcuts to the shared partition locks and tranche ID */
+static LWLock *channelHashLocks;
+static int	channelHashTrancheId = 0;
+
+/* Shared-memory layout: tranche ID followed by the partition LWLock array */
+typedef struct ChannelHashLockData
+{
+	int			trancheId;
+	LWLock		locks[FLEXIBLE_ARRAY_MEMBER];
+}			ChannelHashLockData;
+
+static ChannelHashLockData * channelHashLockData;
+
+/* Backend-local handle on the shared single-listener channel hash table */
+static HTAB *channelHash = NULL;
+
+/* Forward declaration needed by GetChannelHash */
+static uint32 channel_hash_func(const void *key, Size keysize);
+
+/*
+ * GetChannelHash
+ *		Return this backend's handle on the shared channel hash table.
+ *
+ * The handle is cached in channelHash; if it is still NULL we attach through
+ * ShmemInitHash() using the same name, sizes and HASHCTL settings that
+ * AsyncShmemInit() passes.  Use this rather than reading channelHash directly.
+ */
+static HTAB *
+GetChannelHash(void)
+{
+	if (channelHash == NULL)
+	{
+		HASHCTL		hash_ctl;
+
+		/*
+		 * Same key/entry sizes, hash function and partition count as in
+		 * AsyncShmemInit(); the two setups must be kept in sync by hand.
+		 */
+		MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+		hash_ctl.keysize = sizeof(ChannelHashKey);
+		hash_ctl.entrysize = sizeof(ChannelEntry);
+		hash_ctl.hash = channel_hash_func;
+		hash_ctl.num_partitions = NUM_NOTIFY_PARTITIONS;
+
+		channelHash = ShmemInitHash("Channel Hash",
+									CHANNEL_HASH_INIT_SIZE,
+									CHANNEL_HASH_MAX_SIZE,
+									&hash_ctl,
+									HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+	}
+
+	return channelHash;
+}
+
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
 #define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
@@ -458,6 +587,14 @@ static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
 
+/* Channel hash table management functions */
+static LWLock *GetChannelHashLock(const char *channel);
+static inline void ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel);
+static void ChannelHashAddListener(const char *channel, ProcNumber procno);
+static void ChannelHashRemoveListener(const char *channel, ProcNumber procno);
+static ChannelEntry * ChannelHashLookup(const char *channel);
+static List *GetPendingNotifyChannels(void);
+
 /*
  * Compute the difference between two queue page numbers.
  * Previously this function accounted for a wraparound.
@@ -492,6 +629,12 @@ AsyncShmemSize(void)
 
 	size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
 
+	size = add_size(size, hash_estimate_size(CHANNEL_HASH_MAX_SIZE,
+											 sizeof(ChannelEntry)));
+
+	size = add_size(size, offsetof(ChannelHashLockData, locks) +
+					mul_size(NUM_NOTIFY_PARTITIONS, sizeof(LWLock)));
+
 	return size;
 }
 
@@ -546,6 +689,49 @@ AsyncShmemInit(void)
 		 */
 		(void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
 	}
+
+	/*
+	 * Create or attach to the channel hash table.
+	 */
+	{
+		HASHCTL		hash_ctl;
+
+		MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+		hash_ctl.keysize = sizeof(ChannelHashKey);
+		hash_ctl.entrysize = sizeof(ChannelEntry);
+		hash_ctl.hash = channel_hash_func;
+		hash_ctl.num_partitions = NUM_NOTIFY_PARTITIONS;
+
+		channelHash = ShmemInitHash("Channel Hash",
+									CHANNEL_HASH_INIT_SIZE,
+									CHANNEL_HASH_MAX_SIZE,
+									&hash_ctl,
+									HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+	}
+
+	/* Initialize locks for the partitioned hash table */
+	size = offsetof(ChannelHashLockData, locks) +
+		mul_size(NUM_NOTIFY_PARTITIONS, sizeof(LWLock));
+	channelHashLockData = (ChannelHashLockData *)
+		ShmemInitStruct("Channel Hash Lock Data", size, &found);
+	if (!found)
+	{
+		/* First time through: initialize the locks and tranche ID */
+		channelHashLockData->trancheId = LWLockNewTrancheId();
+		for (int i = 0; i < NUM_NOTIFY_PARTITIONS; i++)
+		{
+			LWLockInitialize(&channelHashLockData->locks[i],
+							 channelHashLockData->trancheId);
+		}
+	}
+
+	/*
+	 * Set up local pointers for convenience. We must also register the
+	 * tranche ID in every backend that will use these locks.
+	 */
+	channelHashLocks = channelHashLockData->locks;
+	channelHashTrancheId = channelHashLockData->trancheId;
+	LWLockRegisterTranche(channelHashTrancheId, "ChannelHashPartition");
 }
 
 
@@ -1110,6 +1296,7 @@ Exec_ListenPreCommit(void)
 		QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
 		QUEUE_FIRST_LISTENER = MyProcNumber;
 	}
+
 	LWLockRelease(NotifyQueueLock);
 
 	/* Now we are listed in the global array, so remember we're listening */
@@ -1152,6 +1339,8 @@ Exec_ListenCommit(const char *channel)
 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
 	listenChannels = lappend(listenChannels, pstrdup(channel));
 	MemoryContextSwitchTo(oldcontext);
+
+	ChannelHashAddListener(channel, MyProcNumber);
 }
 
 /*
@@ -1175,6 +1364,7 @@ Exec_UnlistenCommit(const char *channel)
 		{
 			listenChannels = foreach_delete_current(listenChannels, q);
 			pfree(lchan);
+			ChannelHashRemoveListener(channel, MyProcNumber);
 			break;
 		}
 	}
@@ -1193,9 +1383,22 @@ Exec_UnlistenCommit(const char *channel)
 static void
 Exec_UnlistenAllCommit(void)
 {
+	ListCell   *p;
+
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
 
+	/*
+	 * Drop our registration for every channel from the shared hash table
+	 * first; the names live in listenChannels, which is freed just below.
+	 */
+	foreach(p, listenChannels)
+	{
+		char	   *channel = (char *) lfirst(p);
+
+		ChannelHashRemoveListener(channel, MyProcNumber);
+	}
+
 	list_free_deep(listenChannels);
 	listenChannels = NIL;
 }
@@ -1239,6 +1442,7 @@ asyncQueueUnregister(void)
 	 * Need exclusive lock here to manipulate list links.
 	 */
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
 	/* Mark our entry as invalid */
 	QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
@@ -1565,12 +1769,18 @@ asyncQueueFillWarning(void)
 /*
  * Send signals to listening backends.
  *
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send.  However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind.  Waken them anyway if they're far enough behind, so that they'll
- * advance their queue position pointers, allowing the global tail to advance.
+ * This function operates in two modes:
+ * 1. Selective mode: When all pending notification channels have exactly one
+ *    listener each, we signal only those specific backends that are listening
+ *    on the channels with pending notifications.
+ * 2. Broadcast mode: When any channel has multiple listeners (or we ran out
+ *    of shared memory for the channel hash table), we signal all listening
+ *    backends in our database.
+ *
+ * In addition to the channel-specific signaling, we also implement a "wake
+ * only tail" optimization: we signal one backend that sits on the queue's
+ * tail page and is not caught up, so the tail keeps advancing.  Waking one
+ * laggard per NOTIFY, rather than all of them, avoids a thundering herd.
  *
  * Since we know the ProcNumber and the Pid the signaling is quite cheap.
  *
@@ -1583,6 +1793,11 @@ SignalBackends(void)
 	int32	   *pids;
 	ProcNumber *procnos;
 	int			count;
+	List	   *channels;
+	ListCell   *p;
+	bool	   *signaled;
+	bool		broadcast_mode = false;
+	bool		tail_woken = false;
 
 	/*
 	 * Identify backends that we need to signal.  We don't want to send
@@ -1594,40 +1809,179 @@ SignalBackends(void)
 	 */
 	pids = (int32 *) palloc(MaxBackends * sizeof(int32));
 	procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
+	signaled = (bool *) palloc0(MaxBackends * sizeof(bool));
 	count = 0;
 
+	/* Get list of channels that have pending notifications */
+	channels = GetPendingNotifyChannels();
+
+	/*
+	 * To prevent deadlocks, we must always acquire locks in the same order:
+	 * global NotifyQueueLock first, then individual partition locks.
+	 */
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+
+	/*
+	 * Determine if we can use targeted signaling or must broadcast. This
+	 * check must be done while holding NotifyQueueLock to prevent deadlocks
+	 * against other backends that might be modifying the listener list and
+	 * hash table simultaneously (e.g., asyncQueueUnregister).
+	 */
+	foreach(p, channels)
 	{
-		int32		pid = QUEUE_BACKEND_PID(i);
-		QueuePosition pos;
+		char	   *channel = (char *) lfirst(p);
+		ChannelEntry *entry;
+		LWLock	   *lock = GetChannelHashLock(channel);
+
+		LWLockAcquire(lock, LW_SHARED);
+		entry = ChannelHashLookup(channel);
+
+		/*
+		 * If there is no entry, it could mean we ran out of shared memory
+		 * when trying to add this channel to the hash table, so we need to
+		 * broadcast in that case as well.
+		 */
+		if (!entry || entry->has_multiple_listeners)
+		{
+			broadcast_mode = true;
+			LWLockRelease(lock);
+			break;
+		}
+		LWLockRelease(lock);
+	}
 
-		Assert(pid != InvalidPid);
-		pos = QUEUE_BACKEND_POS(i);
-		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+	if (broadcast_mode)
+	{
+		/*
+		 * In broadcast mode, we iterate over all listening backends and
+		 * signal the ones in our database that are not already caught up.
+		 */
+		for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
 		{
+			int32		pid;
+			QueuePosition pos;
+
+			if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+				continue;
+
+			pos = QUEUE_BACKEND_POS(i);
+
 			/*
 			 * Always signal listeners in our own database, unless they're
-			 * already caught up (unlikely, but possible).
+			 * already caught up.
 			 */
 			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
 				continue;
+
+			pid = QUEUE_BACKEND_PID(i);
+			Assert(pid != InvalidPid);
+
+			/* OK, need to signal this one */
+			pids[count] = pid;
+			procnos[count] = i;
+			signaled[i] = true;
+			count++;
 		}
-		else
+	}
+	else
+	{
+		/*
+		 * In targeted mode, signal specific listening backends. We must
+		 * re-check the hash entries here inside the lock to avoid races.
+		 */
+		foreach(p, channels)
 		{
-			/*
-			 * Listeners in other databases should be signaled only if they
-			 * are far behind.
-			 */
-			if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
-								   QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
-				continue;
+			char	   *channel = (char *) lfirst(p);
+			ChannelEntry *entry;
+			LWLock	   *lock = GetChannelHashLock(channel);
+
+			LWLockAcquire(lock, LW_SHARED);
+			entry = ChannelHashLookup(channel);
+
+			if (entry && !entry->has_multiple_listeners)
+			{
+				ProcNumber	i = entry->listener;
+				int32		pid;
+				QueuePosition pos;
+
+				if (signaled[i])
+				{
+					LWLockRelease(lock);
+					continue;
+				}
+
+				pos = QUEUE_BACKEND_POS(i);
+
+				if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+				{
+					LWLockRelease(lock);
+					continue;
+				}
+
+				if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+				{
+					LWLockRelease(lock);
+					continue;
+				}
+
+				pid = QUEUE_BACKEND_PID(i);
+				Assert(pid != InvalidPid);
+
+				pids[count] = pid;
+				procnos[count] = i;
+				signaled[i] = true;
+				count++;
+			}
+			LWLockRelease(lock);
 		}
+	}
+
+	/*
+	 * Also check for any backends that are far behind. This ensures the
+	 * global tail can advance even if they're not actively receiving
+	 * notifications on their channels.
+	 */
+	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+	{
+		int32		pid;
+		QueuePosition pos;
+
+		/*
+		 * Skip if we've already decided to signal this one.
+		 */
+		if (signaled[i])
+			continue;
+
+		pos = QUEUE_BACKEND_POS(i);
+
+		/*
+		 * Skip signaling listeners if they already caught up.
+		 */
+		if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+			continue;
+
+		/*
+		 * Wake only tail optimization: Signal the backend that is furthest
+		 * behind to help prevent backends from getting far behind in the
+		 * first place. This finds the backend(s) on the same page as the
+		 * global tail, which are the ones holding up truncation. This creates
+		 * a chain reaction where each backend eventually wakes up the next
+		 * one as notifications are processed, avoiding thundering herd.
+		 */
+		if (!tail_woken && asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_TAIL),
+											  QUEUE_POS_PAGE(pos)) == 0)
+			tail_woken = true;
+		else
+			continue;
+
+		pid = QUEUE_BACKEND_PID(i);
+		Assert(pid != InvalidPid);
 		/* OK, need to signal this one */
 		pids[count] = pid;
 		procnos[count] = i;
 		count++;
 	}
+
 	LWLockRelease(NotifyQueueLock);
 
 	/* Now send signals */
@@ -1647,9 +2001,9 @@ SignalBackends(void)
 
 		/*
 		 * Note: assuming things aren't broken, a signal failure here could
-		 * only occur if the target backend exited since we released
-		 * NotifyQueueLock; which is unlikely but certainly possible. So we
-		 * just log a low-level debug message if it happens.
+		 * only occur if the target backend exited since we released the lock;
+		 * which is unlikely but certainly possible. So we just log a
+		 * low-level debug message if it happens.
 		 */
 		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
 			elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
@@ -1657,6 +2011,7 @@ SignalBackends(void)
 
 	pfree(pids);
 	pfree(procnos);
+	pfree(signaled);
 }
 
 /*
@@ -2395,3 +2750,256 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+/*
+ * Channel hash table management functions
+ */
+
+/*
+ * channel_hash_func
+ *     Hash a ChannelHashKey: hash_uint32() of the database OID XORed with
+ *     hash_any() over the channel name up to its terminating NUL.  keysize
+ *     is ignored.  The low bits of the result select the lock partition.
+ */
+static uint32
+channel_hash_func(const void *key, Size keysize)
+{
+	const		ChannelHashKey *k = (const ChannelHashKey *) key;
+	uint32		h;
+
+	/*
+	 * Fold the database OID into the hash so that identically named channels
+	 * in different databases do not all land in the same partition.  Only
+	 * the bytes before the NUL are hashed (bounded by NAMEDATALEN), so the
+	 * result does not depend on the padding that follows the name.
+	 */
+	h = DatumGetUInt32(hash_uint32(k->dboid));
+	h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+								 strnlen(k->channel, NAMEDATALEN)));
+
+	return h;
+}
+
+/*
+ * GetChannelHashLock
+ *     Return the partition LWLock covering this channel in MyDatabaseId.
+ */
+static LWLock *
+GetChannelHashLock(const char *channel)
+{
+	ChannelHashKey key;
+	uint32		hash;
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+	hash = get_hash_value(GetChannelHash(), &key);
+
+	return &channelHashLocks[hash % NUM_NOTIFY_PARTITIONS];
+}
+
+/*
+ * ChannelHashPrepareKey
+ *		Zero *key, then store dboid and a NAMEDATALEN-bounded copy of channel.
+ */
+static inline void
+ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel)
+{
+	memset(key, 0, sizeof(ChannelHashKey));
+	key->dboid = dboid;
+	strlcpy(key->channel, channel, NAMEDATALEN);
+}
+
+/*
+ * ChannelHashAddListener
+ *     Register backend procno as a listener on channel in MyDatabaseId.
+ *
+ * 1. Under a shared partition lock, look the channel up.  If it is already
+ *    flagged as multi-listener there is nothing to write, so return.  This
+ *    is the fast path for the 3rd, 4th, etc. listener on a channel.
+ *
+ * 2. Otherwise drop the shared lock, take the partition lock exclusively
+ *    and find-or-create the entry; another backend may have changed it in
+ *    between, so the state is evaluated afresh here.  A new entry records
+ *    procno as sole listener; an existing entry owned by a different
+ *    backend is flagged multi-listener and its listener is invalidated.
+ *
+ * 3. If no entry can be allocated, the channel is left untracked and
+ *    notifiers broadcast for it.  NOTE(review): a later LISTEN by another
+ *    backend could then create a single-listener entry that omits this
+ *    backend -- confirm how that case is meant to be handled.
+ */
+static void
+ChannelHashAddListener(const char *channel, ProcNumber procno)
+{
+	ChannelEntry *entry;
+	bool		found;
+	ChannelHashKey key;
+	LWLock	   *lock = GetChannelHashLock(channel);
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/*
+	 * FAST PATH: Optimistically take a shared lock. If the channel already
+	 * has multiple listeners, we don't need to do anything.
+	 */
+	LWLockAcquire(lock, LW_SHARED);
+	entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL);
+	if (entry && entry->has_multiple_listeners)
+	{
+		LWLockRelease(lock);
+		return;
+	}
+	LWLockRelease(lock);
+
+	/*
+	 * SLOW PATH: We need to write. Acquire exclusive lock.
+	 */
+	LWLockAcquire(lock, LW_EXCLUSIVE);
+
+	/*
+	 * Find or create the entry; "found" tells us which one happened.
+	 */
+	entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_ENTER_NULL, &found);
+
+	if (entry == NULL)
+	{
+		/* No free entry: release the lock first so we never log under it. */
+		LWLockRelease(lock);
+		ereport(DEBUG1, (errmsg("too many notification channels are already being tracked")));
+		return;
+	}
+
+	if (!found)
+	{
+		/* We are the first listener. */
+		entry->listener = procno;
+		entry->has_multiple_listeners = false;
+	}
+	else if (!entry->has_multiple_listeners)
+	{
+		/* We are the second listener. */
+		if (entry->listener != procno)
+		{
+			entry->has_multiple_listeners = true;
+			entry->listener = INVALID_PROC_NUMBER;
+		}
+	}
+	/* Otherwise the entry was already multi-listener: nothing to change. */
+	LWLockRelease(lock);
+}
+
+/*
+ * ChannelHashRemoveListener
+ *		Drop procno's single-listener entry for channel, if it has one.
+ *
+ * A shared-lock probe filters out the no-op cases; the exclusive lock is
+ * taken only when the entry names procno as sole listener.  Multi-listener
+ * entries are left untouched (the entry holds no listener count).
+ */
+static void
+ChannelHashRemoveListener(const char *channel, ProcNumber procno)
+{
+	ChannelEntry *entry;
+	ChannelHashKey key;
+	LWLock	   *lock = GetChannelHashLock(channel);
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/*
+	 * Fast path: under a shared lock, see whether a removal is needed at
+	 * all.  There is nothing to do if the entry is missing, is flagged
+	 * multi-listener, or names a different backend as its listener.
+	 */
+	LWLockAcquire(lock, LW_SHARED);
+	entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL);
+	if (!entry || entry->has_multiple_listeners || entry->listener != procno)
+	{
+		LWLockRelease(lock);
+		return;
+	}
+	LWLockRelease(lock);
+
+	/*
+	 * A removal is likely needed. Acquire an exclusive lock.
+	 */
+	LWLockAcquire(lock, LW_EXCLUSIVE);
+
+	/*
+	 * Re-check the state, as another backend might have changed it. The only
+	 * state change we care about is if it became a multi-listener channel, in
+	 * which case we should no longer remove it.
+	 */
+	entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL);
+	if (entry && !entry->has_multiple_listeners && entry->listener == procno)
+	{
+		/* Still a single-listener entry for us, so remove it. */
+		(void) hash_search(GetChannelHash(), &key, HASH_REMOVE, NULL);
+	}
+	LWLockRelease(lock);
+}
+
+/*
+ * ChannelHashLookup
+ *		Find the hash entry for channel in the current database (MyDatabaseId).
+ *
+ * Returns NULL if the channel is not being tracked (no listeners, or channel
+ * fell back to broadcast mode because we ran out of shared memory when trying
+ * to add entries to the hash table).
+ *
+ * Caller must hold the channel's partition lock (shared is sufficient); an
+ * assertion checks this.  Read the entry only while still holding the lock.
+ */
+static ChannelEntry *
+ChannelHashLookup(const char *channel)
+{
+	ChannelHashKey key;
+
+	Assert(LWLockHeldByMe(GetChannelHashLock(channel)));
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	return (ChannelEntry *) hash_search(GetChannelHash(),
+										&key,
+										HASH_FIND,
+										NULL);
+}
+
+/*
+ * GetPendingNotifyChannels
+ *		Distinct channels in pendingNotifies; strings are shared, not copied.
+ */
+static List *
+GetPendingNotifyChannels(void)
+{
+	List	   *channels = NIL;
+	ListCell   *p;
+	ListCell   *q;
+	bool		found;
+
+	if (!pendingNotifies)
+		return NIL;
+
+	/* Walk the events in order; the list cells are ours, the names are not */
+	foreach(p, pendingNotifies->events)
+	{
+		Notification *n = (Notification *) lfirst(p);
+		char	   *channel = n->data;
+
+		/* Linear scan for a duplicate, so this is quadratic in channels */
+		found = false;
+		foreach(q, channels)
+		{
+			char	   *existing = (char *) lfirst(q);
+
+			if (strcmp(existing, channel) == 0)
+			{
+				found = true;
+				break;
+			}
+		}
+
+		if (!found)
+			channels = lappend(channels, channel);
+	}
+
+	return channels;
+}
-- 
2.47.1

