From 151b803346562e0553dd7985d1cb42e6c309d0ab Mon Sep 17 00:00:00 2001
From: "Boudreau, Trey" <trey@treysoft.com>
Date: Tue, 24 Dec 2024 11:18:03 -0600
Subject: [PATCH v1 2/3] Improve LISTEN-NOTIFY-UNLISTEN performance.

Track LISTEN channels with a simplehash.h table instead of a List.
You won't notice a difference in performance unless you have more more
than a handful of channels.

Extend the test case to cover pg_listening_channels() and verify
that unlistening an already unlistened channel has no side effects.
---
 src/backend/commands/async.c        | 113 +++++++++++++++++-----------
 src/test/regress/expected/async.out |   9 +++
 src/test/regress/sql/async.sql      |   4 +
 3 files changed, 81 insertions(+), 45 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 8ed503e1c1..54f47f024f 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -312,12 +312,38 @@ static SlruCtlData NotifyCtlData;
 
 #define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */
 
+typedef struct
+{
+	/* needed for simplehash */
+	char	   *channel;
+	uint32		hash;
+	char		status;
+}			ListenHashEntry;
+
+#define SH_PREFIX		listen
+#define SH_ELEMENT_TYPE	ListenHashEntry
+#define SH_KEY			channel
+#define SH_KEY_TYPE		const char *
+#define SH_EQUAL(tb, a, b)	(strcmp(a, b) == 0)
+#define SH_STORE_HASH
+#define SH_MAKE_KEY(tb, k)	MemoryContextStrdup(tb->ctx, k);
+#define SH_UNMAKE_KEY(tb, k)	pfree(k)
+#define SH_HASH_KEY(tb, k)	hash_bytes((const unsigned char *)k, strlen(k))
+#define SH_GET_HASH(tb, a)	a->hash
+#define SH_SCOPE		static inline
+#define SH_DECLARE
+#define SH_DEFINE
+#include "lib/simplehash.h"
+
 /*
  * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on).  It is a simple list of channel names,
+ * (ie, have committed a LISTEN on).  It is a simplehash.h of channel names,
  * allocated in TopMemoryContext.
  */
-static List *listenChannels = NIL;	/* list of C strings */
+static listen_hash * listenChannels = NULL; /* hash table of ListenHashEntry */
+
+/* Initialize the hash table with this many elements when we start listening. */
+#define LISTEN_HASH_START_SIZE 16
 
 /*
  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
@@ -790,23 +816,30 @@ Datum
 pg_listening_channels(PG_FUNCTION_ARGS)
 {
 	FuncCallContext *funcctx;
+	ListenHashEntry *item;
 
 	/* stuff done only on the first call of the function */
 	if (SRF_IS_FIRSTCALL())
 	{
 		/* create a function context for cross-call persistence */
 		funcctx = SRF_FIRSTCALL_INIT();
+		if (listenChannels)
+		{
+			funcctx->user_fctx = MemoryContextAlloc(funcctx->multi_call_memory_ctx, sizeof(listen_iterator));
+			listen_start_iterate(listenChannels, (listen_iterator *) funcctx->user_fctx);
+		}
 	}
 
 	/* stuff done on every call of the function */
 	funcctx = SRF_PERCALL_SETUP();
 
-	if (funcctx->call_cntr < list_length(listenChannels))
+	if (listenChannels)
 	{
-		char	   *channel = (char *) list_nth(listenChannels,
-												funcctx->call_cntr);
-
-		SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+		/* next item from the hash table iterator */
+		item = listen_iterate(listenChannels, (listen_iterator *) funcctx->user_fctx);
+		/* iterate has skipped over all empty slots for us */
+		if (item)
+			SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(item->channel));
 	}
 
 	SRF_RETURN_DONE(funcctx);
@@ -1002,7 +1035,7 @@ AtCommit_Notify(void)
 	}
 
 	/* If no longer listening to anything, get out of listener array */
-	if (amRegisteredListener && listenChannels == NIL)
+	if (amRegisteredListener && (listenChannels->members == 0))
 		asyncQueueUnregister();
 
 	/*
@@ -1112,6 +1145,9 @@ Exec_ListenPreCommit(void)
 	}
 	LWLockRelease(NotifyQueueLock);
 
+	/* Create the structures to manage listens */
+	listenChannels = listen_create(TopMemoryContext, LISTEN_HASH_START_SIZE, NULL);
+
 	/* Now we are listed in the global array, so remember we're listening */
 	amRegisteredListener = true;
 
@@ -1135,11 +1171,9 @@ Exec_ListenPreCommit(void)
 static void
 Exec_ListenCommit(const char *channel)
 {
-	MemoryContext oldcontext;
+	bool		found;
 
-	/* Do nothing if we are already listening on this channel */
-	if (IsListeningOn(channel))
-		return;
+	Assert(listenChannels != NULL);
 
 	/*
 	 * Add the new channel name to listenChannels.
@@ -1148,10 +1182,10 @@ Exec_ListenCommit(const char *channel)
 	 * which would be bad because we already committed.  For the moment it
 	 * doesn't seem worth trying to guard against that, but maybe improve this
 	 * later.
+	 *
+	 * Does nothing if we are already listening on this channel.
 	 */
-	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
-	listenChannels = lappend(listenChannels, pstrdup(channel));
-	MemoryContextSwitchTo(oldcontext);
+	listen_insert(listenChannels, channel, &found);
 }
 
 /*
@@ -1162,22 +1196,14 @@ Exec_ListenCommit(const char *channel)
 static void
 Exec_UnlistenCommit(const char *channel)
 {
-	ListCell   *q;
-
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
 
-	foreach(q, listenChannels)
-	{
-		char	   *lchan = (char *) lfirst(q);
+	/* Ignore UNLISTEN when not listening */
+	if (!listenChannels)
+		return;
 
-		if (strcmp(lchan, channel) == 0)
-		{
-			listenChannels = foreach_delete_current(listenChannels, q);
-			pfree(lchan);
-			break;
-		}
-	}
+	listen_delete(listenChannels, channel);
 
 	/*
 	 * We do not complain about unlistening something not being listened;
@@ -1196,31 +1222,24 @@ Exec_UnlistenAllCommit(void)
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
 
-	list_free_deep(listenChannels);
-	listenChannels = NIL;
+	/* Ignore UNLISTEN * when not listening */
+	if (!listenChannels)
+		return;
+
+	listen_reset(listenChannels);
 }
 
 /*
  * Test whether we are actively listening on the given channel name.
  *
  * Note: this function is executed for every notification found in the queue.
- * Perhaps it is worth further optimization, eg convert the list to a sorted
- * array so we can binary-search it.  In practice the list is likely to be
- * fairly short, though.
  */
 static bool
 IsListeningOn(const char *channel)
 {
-	ListCell   *p;
-
-	foreach(p, listenChannels)
-	{
-		char	   *lchan = (char *) lfirst(p);
-
-		if (strcmp(lchan, channel) == 0)
-			return true;
-	}
-	return false;
+	if (listenChannels == NULL)
+		return false;
+	return listen_lookup(listenChannels, channel) != NULL;
 }
 
 /*
@@ -1230,11 +1249,15 @@ IsListeningOn(const char *channel)
 static void
 asyncQueueUnregister(void)
 {
-	Assert(listenChannels == NIL);	/* else caller error */
+	Assert((listenChannels == NULL) || (listenChannels->members == 0)); /* else caller error */
 
 	if (!amRegisteredListener)	/* nothing to do */
 		return;
 
+	/* Release our listen_hash data structures */
+	listen_destroy(listenChannels);
+	listenChannels = NULL;
+
 	/*
 	 * Need exclusive lock here to manipulate list links.
 	 */
@@ -1675,7 +1698,7 @@ AtAbort_Notify(void)
 	 * we have registered as a listener but have not made any entry in
 	 * listenChannels.  In that case, deregister again.
 	 */
-	if (amRegisteredListener && listenChannels == NIL)
+	if (amRegisteredListener && (listenChannels->members == 0))
 		asyncQueueUnregister();
 
 	/* And clean up */
@@ -2186,7 +2209,7 @@ ProcessIncomingNotify(bool flush)
 	notifyInterruptPending = false;
 
 	/* Do nothing else if we aren't actively listening */
-	if (listenChannels == NIL)
+	if (listenChannels == NULL)
 		return;
 
 	if (Trace_notify)
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index 19cbe38e63..e01c953ae9 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -30,7 +30,16 @@ ERROR:  channel name too long
 --Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
 NOTIFY notify_async2;
 LISTEN notify_async2;
+SELECT pg_listening_channels(); -- expect one entry
+ pg_listening_channels 
+-----------------------
+ notify_async2
+(1 row)
+
 UNLISTEN notify_async2;
+--Should work. Ignore unlistened channels
+UNLISTEN notify_async2; -- no-op
+--Should work. Ignore unlisten all with no channels
 UNLISTEN *;
 -- Should return zero while there are no pending notifications.
 -- src/test/isolation/specs/async-notify.spec tests for actual usage.
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e01538..382c80ac2a 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -15,7 +15,11 @@ SELECT pg_notify('notify_async_channel_name_too_long____________________________
 --Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
 NOTIFY notify_async2;
 LISTEN notify_async2;
+SELECT pg_listening_channels(); -- expect one entry
 UNLISTEN notify_async2;
+--Should work. Ignore unlistened channels
+UNLISTEN notify_async2; -- no-op
+--Should work. Ignore unlisten all with no channels
 UNLISTEN *;
 
 -- Should return zero while there are no pending notifications.
-- 
2.43.0

