Hi,
In the past I've wanted to listen on all notification channels in the
current database for debugging purposes. But recently I came across
another use case. Since having multiple postgres backends listening for
notifications is very inefficient (one Thursday I found out 30% of our
CPU time was spent spinning on s_locks around the notification code), it
makes sense to implement a notification server of sorts which then
passes on notifications from postgres to interested clients. A server
like this would be a lot simpler to implement if there were a way to
announce that the client wants to see all notifications, regardless of
the name of the channel.
Attached is a very crude proof-of-concept patch implementing this. Any
thoughts on the idea?
.m
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 294,299 **** static SlruCtlData AsyncCtlData;
--- 294,304 ----
static List *listenChannels = NIL; /* list of C strings */
/*
+ * If listenWildcard is set, we're listening on all channels.
+ */
+ static bool listenWildcard = false;
+
+ /*
* State for pending LISTEN/UNLISTEN actions consists of an ordered list of
* all actions requested in the current transaction. As explained above,
* we don't actually change listenChannels until we reach transaction commit.
***************
*** 306,311 **** static List *listenChannels = NIL; /* list of C
strings */
--- 311,317 ----
typedef enum
{
LISTEN_LISTEN,
+ LISTEN_LISTEN_ALL,
LISTEN_UNLISTEN,
LISTEN_UNLISTEN_ALL
} ListenActionKind;
***************
*** 373,378 **** static void queue_listen(ListenActionKind action, const char
*channel);
--- 379,385 ----
static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_ListenPreCommit(void);
static void Exec_ListenCommit(const char *channel);
+ static void Exec_ListenAllCommit(void);
static void Exec_UnlistenCommit(const char *channel);
static void Exec_UnlistenAllCommit(void);
static bool IsListeningOn(const char *channel);
***************
*** 598,604 **** Async_Notify(const char *channel, const char *payload)
/*
* queue_listen
! * Common code for listen, unlisten, unlisten all commands.
*
* Adds the request to the list of pending actions.
* Actual update of the listenChannels list happens during
transaction
--- 605,611 ----
/*
* queue_listen
! * Common code for listen, listen all, unlisten, unlisten all
commands.
*
* Adds the request to the list of pending actions.
* Actual update of the listenChannels list happens during
transaction
***************
*** 613,620 **** queue_listen(ListenActionKind action, const char *channel)
/*
* Unlike Async_Notify, we don't try to collapse out duplicates. It
would
* be too complicated to ensure we get the right interactions of
! * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that
there
! * would be any performance benefit anyway in sane applications.
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
--- 620,627 ----
/*
* Unlike Async_Notify, we don't try to collapse out duplicates. It
would
* be too complicated to ensure we get the right interactions of
! * conflicting LISTEN/LISTEN_ALL/UNLISTEN/UNLISTEN_ALL, and it's
unlikely
! * that there would be any performance benefit anyway in sane
applications.
*/
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
***************
*** 644,649 **** Async_Listen(const char *channel)
--- 651,671 ----
}
/*
+ * Async_ListenAll
+ *
+ * This is executed by the LISTEN * command.
+ */
+ void
+ Async_ListenAll(void)
+ {
+ if (Trace_notify)
+ elog(DEBUG1, "Async_ListenAll(%d)", MyProcPid);
+
+ queue_listen(LISTEN_LISTEN_ALL, "");
+ }
+
+
+ /*
* Async_Unlisten
*
* This is executed by the SQL unlisten command.
***************
*** 790,795 **** PreCommit_Notify(void)
--- 812,818 ----
switch (actrec->action)
{
case LISTEN_LISTEN:
+ case LISTEN_LISTEN_ALL:
Exec_ListenPreCommit();
break;
case LISTEN_UNLISTEN:
***************
*** 895,900 **** AtCommit_Notify(void)
--- 918,926 ----
case LISTEN_LISTEN:
Exec_ListenCommit(actrec->channel);
break;
+ case LISTEN_LISTEN_ALL:
+ Exec_ListenAllCommit();
+ break;
case LISTEN_UNLISTEN:
Exec_UnlistenCommit(actrec->channel);
break;
***************
*** 905,911 **** AtCommit_Notify(void)
}
/* If no longer listening to anything, get out of listener array */
! if (amRegisteredListener && listenChannels == NIL)
asyncQueueUnregister();
/* And clean up */
--- 931,937 ----
}
/* If no longer listening to anything, get out of listener array */
! if (amRegisteredListener && listenChannels == NIL && !listenWildcard)
asyncQueueUnregister();
/* And clean up */
***************
*** 1026,1031 **** Exec_ListenCommit(const char *channel)
--- 1052,1076 ----
}
/*
+ * Exec_ListenAllCommit --- subroutine for AtCommit_Notify
+ *
+ * Start listening on all notification channels.
+ */
+ static void
+ Exec_ListenAllCommit(void)
+ {
+ listenWildcard = true;
+
+ /*
+ * We can forget all notification channels right away. The only way to
+ * undo a "LISTEN *" is to "UNLISTEN *", and in that case we'd just
release
+ * the list of channels anyway.
+ */
+ list_free_deep(listenChannels);
+ listenChannels = NIL;
+ }
+
+ /*
* Exec_UnlistenCommit --- subroutine for AtCommit_Notify
*
* Remove the specified channel name from listenChannels.
***************
*** 1072,1077 **** Exec_UnlistenAllCommit(void)
--- 1117,1123 ----
list_free_deep(listenChannels);
listenChannels = NIL;
+ listenWildcard = false;
}
/*
***************
*** 1130,1136 **** ProcessCompletedNotifies(void)
/* Send signals to other backends */
signalled = SignalBackends();
! if (listenChannels != NIL)
{
/* Read the queue ourselves, and send relevant stuff to the
frontend */
asyncQueueReadAllNotifications();
--- 1176,1182 ----
/* Send signals to other backends */
signalled = SignalBackends();
! if (listenChannels != NIL || listenWildcard)
{
/* Read the queue ourselves, and send relevant stuff to the
frontend */
asyncQueueReadAllNotifications();
***************
*** 1956,1962 **** asyncQueueProcessPageEntries(volatile QueuePosition *current,
/* qe->data is the null-terminated channel name
*/
char *channel = qe->data;
! if (IsListeningOn(channel))
{
/* payload follows channel name */
char *payload = qe->data +
strlen(channel) + 1;
--- 2002,2008 ----
/* qe->data is the null-terminated channel name
*/
char *channel = qe->data;
! if (listenWildcard || IsListeningOn(channel))
{
/* payload follows channel name */
char *payload = qe->data +
strlen(channel) + 1;
***************
*** 2044,2050 **** ProcessIncomingNotify(void)
notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
! if (listenChannels == NIL)
return;
if (Trace_notify)
--- 2090,2096 ----
notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
! if (listenChannels == NIL && !listenWildcard)
return;
if (Trace_notify)
*** a/src/backend/parser/gram.y
--- b/src/backend/parser/gram.y
***************
*** 8543,8548 **** ListenStmt: LISTEN ColId
--- 8543,8554 ----
n->conditionname = $2;
$$ = (Node *)n;
}
+ | LISTEN '*'
+ {
+ ListenStmt *n = makeNode(ListenStmt);
+ n->conditionname = NULL;
+ $$ = (Node *)n;
+ }
;
UnlistenStmt:
*** a/src/backend/tcop/utility.c
--- b/src/backend/tcop/utility.c
***************
*** 609,615 **** standard_ProcessUtility(Node *parsetree,
PreventCommandDuringRecovery("LISTEN");
CheckRestrictedOperation("LISTEN");
! Async_Listen(stmt->conditionname);
}
break;
--- 609,618 ----
PreventCommandDuringRecovery("LISTEN");
CheckRestrictedOperation("LISTEN");
! if (stmt->conditionname)
! Async_Listen(stmt->conditionname);
! else
! Async_ListenAll();
}
break;
*** a/src/include/commands/async.h
--- b/src/include/commands/async.h
***************
*** 31,36 **** extern void AsyncShmemInit(void);
--- 31,37 ----
/* notify-related SQL statements */
extern void Async_Notify(const char *channel, const char *payload);
extern void Async_Listen(const char *channel);
+ extern void Async_ListenAll(void);
extern void Async_Unlisten(const char *channel);
extern void Async_UnlistenAll(void);
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers