Hi,

In the past, I've wanted to listen on all notification channels in the current database for debugging purposes, but recently I came across another use case. Having multiple postgres backends listening for notifications is very inefficient (one Thursday I found out that 30% of our CPU time was spent spinning on s_locks around the notification code), so it makes sense to implement a notification server of sorts which passes notifications on from postgres to interested clients. A server like this would be much simpler to implement if there were a way for the client to announce that it wants to see all notifications, regardless of the channel name.

Attached is a very crude proof-of-concept patch implementing this. Any thoughts on the idea?


.m
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 294,299 **** static SlruCtlData AsyncCtlData;
--- 294,304 ----
  static List *listenChannels = NIL;            /* list of C strings */
  
  /*
+  * If listenWildcard is set, we're listening on all channels.
+  */
+ static bool listenWildcard = false;
+ 
+ /*
   * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
   * all actions requested in the current transaction.  As explained above,
   * we don't actually change listenChannels until we reach transaction commit.
***************
*** 306,311 **** static List *listenChannels = NIL;             /* list of C 
strings */
--- 311,317 ----
  typedef enum
  {
        LISTEN_LISTEN,
+       LISTEN_LISTEN_ALL,
        LISTEN_UNLISTEN,
        LISTEN_UNLISTEN_ALL
  } ListenActionKind;
***************
*** 373,378 **** static void queue_listen(ListenActionKind action, const char 
*channel);
--- 379,385 ----
  static void Async_UnlistenOnExit(int code, Datum arg);
  static void Exec_ListenPreCommit(void);
  static void Exec_ListenCommit(const char *channel);
+ static void Exec_ListenAllCommit(void);
  static void Exec_UnlistenCommit(const char *channel);
  static void Exec_UnlistenAllCommit(void);
  static bool IsListeningOn(const char *channel);
***************
*** 598,604 **** Async_Notify(const char *channel, const char *payload)
  
  /*
   * queue_listen
!  *            Common code for listen, unlisten, unlisten all commands.
   *
   *            Adds the request to the list of pending actions.
   *            Actual update of the listenChannels list happens during 
transaction
--- 605,611 ----
  
  /*
   * queue_listen
!  *            Common code for listen, listen all, unlisten, unlisten all 
commands.
   *
   *            Adds the request to the list of pending actions.
   *            Actual update of the listenChannels list happens during 
transaction
***************
*** 613,620 **** queue_listen(ListenActionKind action, const char *channel)
        /*
         * Unlike Async_Notify, we don't try to collapse out duplicates. It 
would
         * be too complicated to ensure we get the right interactions of
!        * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that 
there
!        * would be any performance benefit anyway in sane applications.
         */
        oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
--- 620,627 ----
        /*
         * Unlike Async_Notify, we don't try to collapse out duplicates. It 
would
         * be too complicated to ensure we get the right interactions of
!        * conflicting LISTEN/LISTEN_ALL/UNLISTEN/UNLISTEN_ALL, and it's 
unlikely
!        * that there would be any performance benefit anyway in sane 
applications.
         */
        oldcontext = MemoryContextSwitchTo(CurTransactionContext);
  
***************
*** 644,649 **** Async_Listen(const char *channel)
--- 651,671 ----
  }
  
  /*
+  * Async_ListenAll
+  *
+  *            This is executed by the LISTEN * command.
+  */
+ void
+ Async_ListenAll(void)
+ {
+       if (Trace_notify)
+               elog(DEBUG1, "Async_ListenAll(%d)", MyProcPid);
+ 
+       queue_listen(LISTEN_LISTEN_ALL, "");
+ }
+ 
+ 
+ /*
   * Async_Unlisten
   *
   *            This is executed by the SQL unlisten command.
***************
*** 790,795 **** PreCommit_Notify(void)
--- 812,818 ----
                switch (actrec->action)
                {
                        case LISTEN_LISTEN:
+                       case LISTEN_LISTEN_ALL:
                                Exec_ListenPreCommit();
                                break;
                        case LISTEN_UNLISTEN:
***************
*** 895,900 **** AtCommit_Notify(void)
--- 918,926 ----
                        case LISTEN_LISTEN:
                                Exec_ListenCommit(actrec->channel);
                                break;
+                       case LISTEN_LISTEN_ALL:
+                               Exec_ListenAllCommit();
+                               break;
                        case LISTEN_UNLISTEN:
                                Exec_UnlistenCommit(actrec->channel);
                                break;
***************
*** 905,911 **** AtCommit_Notify(void)
        }
  
        /* If no longer listening to anything, get out of listener array */
!       if (amRegisteredListener && listenChannels == NIL)
                asyncQueueUnregister();
  
        /* And clean up */
--- 931,937 ----
        }
  
        /* If no longer listening to anything, get out of listener array */
!       if (amRegisteredListener && listenChannels == NIL && !listenWildcard)
                asyncQueueUnregister();
  
        /* And clean up */
***************
*** 1026,1031 **** Exec_ListenCommit(const char *channel)
--- 1052,1076 ----
  }
  
  /*
+  * Exec_ListenAllCommit --- subroutine for AtCommit_Notify
+  *
+  * Start listening on all notification channels.
+  */
+ static void
+ Exec_ListenAllCommit(void)
+ {
+       listenWildcard = true;
+ 
+       /*
+        * We can forget all notification channels right away.  The only way to
+        * undo a "LISTEN *" is to "UNLISTEN *", and in that case we'd just 
release
+        * the list of channels anyway.
+        */
+       list_free_deep(listenChannels);
+       listenChannels = NIL;
+ }
+ 
+ /*
   * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
   *
   * Remove the specified channel name from listenChannels.
***************
*** 1072,1077 **** Exec_UnlistenAllCommit(void)
--- 1117,1123 ----
  
        list_free_deep(listenChannels);
        listenChannels = NIL;
+       listenWildcard = false;
  }
  
  /*
***************
*** 1130,1136 **** ProcessCompletedNotifies(void)
        /* Send signals to other backends */
        signalled = SignalBackends();
  
!       if (listenChannels != NIL)
        {
                /* Read the queue ourselves, and send relevant stuff to the 
frontend */
                asyncQueueReadAllNotifications();
--- 1176,1182 ----
        /* Send signals to other backends */
        signalled = SignalBackends();
  
!       if (listenChannels != NIL || listenWildcard)
        {
                /* Read the queue ourselves, and send relevant stuff to the 
frontend */
                asyncQueueReadAllNotifications();
***************
*** 1956,1962 **** asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                /* qe->data is the null-terminated channel name 
*/
                                char       *channel = qe->data;
  
!                               if (IsListeningOn(channel))
                                {
                                        /* payload follows channel name */
                                        char       *payload = qe->data + 
strlen(channel) + 1;
--- 2002,2008 ----
                                /* qe->data is the null-terminated channel name 
*/
                                char       *channel = qe->data;
  
!                               if (listenWildcard || IsListeningOn(channel))
                                {
                                        /* payload follows channel name */
                                        char       *payload = qe->data + 
strlen(channel) + 1;
***************
*** 2044,2050 **** ProcessIncomingNotify(void)
        notifyInterruptPending = false;
  
        /* Do nothing else if we aren't actively listening */
!       if (listenChannels == NIL)
                return;
  
        if (Trace_notify)
--- 2090,2096 ----
        notifyInterruptPending = false;
  
        /* Do nothing else if we aren't actively listening */
!       if (listenChannels == NIL && !listenWildcard)
                return;
  
        if (Trace_notify)
*** a/src/backend/parser/gram.y
--- b/src/backend/parser/gram.y
***************
*** 8543,8548 **** ListenStmt: LISTEN ColId
--- 8543,8554 ----
                                        n->conditionname = $2;
                                        $$ = (Node *)n;
                                }
+                       | LISTEN '*'
+                               {
+                                       ListenStmt *n = makeNode(ListenStmt);
+                                       n->conditionname = NULL;
+                                       $$ = (Node *)n;
+                               }
                ;
  
  UnlistenStmt:
*** a/src/backend/tcop/utility.c
--- b/src/backend/tcop/utility.c
***************
*** 609,615 **** standard_ProcessUtility(Node *parsetree,
  
                                PreventCommandDuringRecovery("LISTEN");
                                CheckRestrictedOperation("LISTEN");
!                               Async_Listen(stmt->conditionname);
                        }
                        break;
  
--- 609,618 ----
  
                                PreventCommandDuringRecovery("LISTEN");
                                CheckRestrictedOperation("LISTEN");
!                               if (stmt->conditionname)
!                                       Async_Listen(stmt->conditionname);
!                               else
!                                       Async_ListenAll();
                        }
                        break;
  
*** a/src/include/commands/async.h
--- b/src/include/commands/async.h
***************
*** 31,36 **** extern void AsyncShmemInit(void);
--- 31,37 ----
  /* notify-related SQL statements */
  extern void Async_Notify(const char *channel, const char *payload);
  extern void Async_Listen(const char *channel);
+ extern void Async_ListenAll(void);
  extern void Async_Unlisten(const char *channel);
  extern void Async_UnlistenAll(void);
  
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to