Hello hackers,

I present a patch to add a new built-in function
pg_notify_queue_saturation().

The purpose of the function is to allow users to monitor the health of
their notification queue.  In certain cases, a client connection listening
for notifications might get stuck inside a transaction, and this would
cause the queue to keep filling up, until finally it reaches capacity and
further attempts to NOTIFY error out.

The current documentation under LISTEN explains this possible gotcha, but
doesn't really suggest a useful way to address it, except to mention that
warnings will show up in the log once you get to 50% saturation of the
queue.  Unless you happen to be eyeballing the logs when it happens, that's
not a huge help.  The choice of 50% as a threshold is also very much
arbitrary, and by the time you hit 50% the problem has likely been going on
for quite a while.  If you want your Nagios (or whatever) to, say, alert you
when the queue goes over 5% or 1%, your options are limited and awkward.

The patch has almost no new code.  It makes use of the existing logic for
the 50% warning.  I simply refactored that logic into a separate function
asyncQueueSaturation, and then added pg_notify_queue_saturation to make
that available in SQL.

I am not convinced that pg_notify_queue_saturation is the best possible
name for this function, and am very much open to other suggestions.

The patch includes documentation, a regression test and an isolation test.

Cheers,
BJ
*** a/doc/src/sgml/func.sgml
--- b/doc/src/sgml/func.sgml
***************
*** 14800,14805 **** SELECT * FROM pg_ls_dir('.') WITH ORDINALITY AS t(ls,n);
--- 14800,14811 ----
        </row>
  
        <row>
+        <entry><literal><function>pg_notify_queue_saturation()</function></literal></entry>
+        <entry><type>double</type></entry>
+        <entry>proportion of the asynchronous notification queue currently occupied</entry>
+       </row>
+ 
+       <row>
         <entry><literal><function>pg_my_temp_schema()</function></literal></entry>
         <entry><type>oid</type></entry>
         <entry>OID of session's temporary schema, or 0 if none</entry>
***************
*** 14939,14948 **** SET search_path TO <replaceable>schema</> <optional>, <replaceable>schema</>, ..
      <primary>pg_listening_channels</primary>
     </indexterm>
  
     <para>
      <function>pg_listening_channels</function> returns a set of names of
!     channels that the current session is listening to.  See <xref
!     linkend="sql-listen"> for more information.
     </para>
  
     <indexterm>
--- 14945,14962 ----
      <primary>pg_listening_channels</primary>
     </indexterm>
  
+    <indexterm>
+     <primary>pg_notify_queue_saturation</primary>
+    </indexterm>
+ 
     <para>
      <function>pg_listening_channels</function> returns a set of names of
!     asynchronous notification channels that the current session is listening
!     to.  <function>pg_notify_queue_saturation</function> returns the proportion
!     of the total available space for notifications currently occupied by
!     notifications that are waiting to be processed.  See
!     <xref linkend="sql-listen"> and <xref linkend="sql-notify">
!     for more information.
     </para>
  
     <indexterm>
*** a/doc/src/sgml/ref/notify.sgml
--- b/doc/src/sgml/ref/notify.sgml
***************
*** 166,171 **** NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla
--- 166,175 ----
     current transaction so that cleanup can proceed.
    </para>
    <para>
+    The function <function>pg_notify_queue_saturation</function> returns the
+    proportion of the queue that is currently occupied by pending notifications.
+   </para>
+   <para>
     A transaction that has executed <command>NOTIFY</command> cannot be
     prepared for two-phase commit.
    </para>
*** a/src/backend/commands/async.c
--- b/src/backend/commands/async.c
***************
*** 371,376 **** static bool asyncQueueIsFull(void);
--- 371,377 ----
  static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
  static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
  static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
+ static double asyncQueueSaturation(void);
  static void asyncQueueFillWarning(void);
  static bool SignalBackends(void);
  static void asyncQueueReadAllNotifications(void);
***************
*** 1362,1387 **** asyncQueueAddEntries(ListCell *nextNotify)
  }
  
  /*
!  * Check whether the queue is at least half full, and emit a warning if so.
!  *
!  * This is unlikely given the size of the queue, but possible.
!  * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
   *
!  * Caller must hold exclusive AsyncQueueLock.
   */
! static void
! asyncQueueFillWarning(void)
  {
! 	int			headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
! 	int			tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
! 	int			occupied;
! 	double		fillDegree;
! 	TimestampTz t;
  
  	occupied = headPage - tailPage;
  
  	if (occupied == 0)
! 		return;					/* fast exit for common case */
  
  	if (occupied < 0)
  	{
--- 1363,1399 ----
  }
  
  /*
!  * SQL function to return the proportion of the notification queue currently
!  * occupied.
!  */
! Datum
! pg_notify_queue_saturation(PG_FUNCTION_ARGS)
! {
! 	double saturation;
! 
! 	LWLockAcquire(AsyncQueueLock, LW_SHARED);
! 	saturation = asyncQueueSaturation();
! 	LWLockRelease(AsyncQueueLock);
! 
! 	PG_RETURN_FLOAT8(saturation);
! }
! 
! /*
!  * Return the proportion of the queue that is currently occupied.
   *
!  * The caller must hold (at least) shared AsyncQueueLock.
   */
! static double
! asyncQueueSaturation(void)
  {
! 	int		headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
! 	int		tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
! 	int		occupied;
  
  	occupied = headPage - tailPage;
  
  	if (occupied == 0)
! 		return (double) 0;		/* fast exit for common case */
  
  	if (occupied < 0)
  	{
***************
*** 1389,1396 **** asyncQueueFillWarning(void)
  		occupied += QUEUE_MAX_PAGE + 1;
  	}
  
! 	fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
  
  	if (fillDegree < 0.5)
  		return;
  
--- 1401,1424 ----
  		occupied += QUEUE_MAX_PAGE + 1;
  	}
  
! 	return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
! }
! 
! /*
!  * Check whether the queue is at least half full, and emit a warning if so.
!  *
!  * This is unlikely given the size of the queue, but possible.
!  * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
!  *
!  * Caller must hold exclusive AsyncQueueLock.
!  */
! static void
! asyncQueueFillWarning(void)
! {
! 	double		fillDegree;
! 	TimestampTz t;
  
+ 	fillDegree = asyncQueueSaturation();
  	if (fillDegree < 0.5)
  		return;
  
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
***************
*** 4036,4045 **** DATA(insert OID = 2856 (  pg_timezone_names		PGNSP PGUID 12 1 1000 0 0 f f f f t
  DESCR("get the available time zone names");
  DATA(insert OID = 2730 (  pg_get_triggerdef		PGNSP PGUID 12 1 0 0 0 f f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ ));
  DESCR("trigger description with pretty-print option");
! DATA(insert OID = 3035 (  pg_listening_channels PGNSP PGUID 12 1 10 0 0 f f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ ));
  DESCR("get the channels that the current backend listens to");
! DATA(insert OID = 3036 (  pg_notify				PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
  DESCR("send a notification event");
  
  /* non-persistent series generator */
  DATA(insert OID = 1066 (  generate_series PGNSP PGUID 12 1 1000 0 0 f f f f t t i 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ _null_ generate_series_step_int4 _null_ _null_ _null_ ));
--- 4036,4049 ----
  DESCR("get the available time zone names");
  DATA(insert OID = 2730 (  pg_get_triggerdef		PGNSP PGUID 12 1 0 0 0 f f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ ));
  DESCR("trigger description with pretty-print option");
! 
! /* asynchronous notifications */
! DATA(insert OID = 3035 (  pg_listening_channels			PGNSP PGUID 12 1 10 0 0 f f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ ));
  DESCR("get the channels that the current backend listens to");
! DATA(insert OID = 3036 (  pg_notify						PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
  DESCR("send a notification event");
+ DATA(insert OID = 3293 (  pg_notify_queue_saturation	PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 701 "" _null_ _null_ _null_ _null_ _null_ pg_notify_queue_saturation _null_ _null_ _null_ ));
+ DESCR("get the saturation of the asynchronous notification queue");
  
  /* non-persistent series generator */
  DATA(insert OID = 1066 (  generate_series PGNSP PGUID 12 1 1000 0 0 f f f f t t i 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ _null_ generate_series_step_int4 _null_ _null_ _null_ ));
*** a/src/include/commands/async.h
--- b/src/include/commands/async.h
***************
*** 37,42 **** extern void Async_UnlistenAll(void);
--- 37,43 ----
  /* notify-related SQL functions */
  extern Datum pg_listening_channels(PG_FUNCTION_ARGS);
  extern Datum pg_notify(PG_FUNCTION_ARGS);
+ extern Datum pg_notify_queue_saturation(PG_FUNCTION_ARGS);
  
  /* perform (or cancel) outbound notify processing at transaction commit */
  extern void PreCommit_Notify(void);
*** /dev/null
--- b/src/test/isolation/expected/async-notify.out
***************
*** 0 ****
--- 1,17 ----
+ Parsed test spec with 2 sessions
+ 
+ starting permutation: listen begin check notify check
+ step listen: LISTEN a;
+ step begin: BEGIN;
+ step check: SELECT pg_notify_queue_saturation() > 0 AS nonzero;
+ nonzero        
+ 
+ f              
+ step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s;
+ count          
+ 
+ 1000           
+ step check: SELECT pg_notify_queue_saturation() > 0 AS nonzero;
+ nonzero        
+ 
+ t              
*** /dev/null
--- b/src/test/isolation/specs/async-notify.spec
***************
*** 0 ****
--- 1,14 ----
+ # Verify that pg_notify_queue_saturation correctly reports a non-zero result,
+ # after submitting notifications while another connection is listening for
+ # those notifications and waiting inside an active transaction.
+ 
+ session "listener"
+ step "listen"	{ LISTEN a; }
+ step "begin"	{ BEGIN; }
+ teardown		{ ROLLBACK; }
+ 
+ session "notifier"
+ step "check"	{ SELECT pg_notify_queue_saturation() > 0 AS nonzero; }
+ step "notify"	{ SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; }
+ 
+ permutation "listen" "begin" "check" "notify" "check"
*** a/src/test/regress/expected/async.out
--- b/src/test/regress/expected/async.out
***************
*** 32,34 **** NOTIFY notify_async2;
--- 32,42 ----
  LISTEN notify_async2;
  UNLISTEN notify_async2;
  UNLISTEN *;
+ -- Should return zero while there are no pending notifications.
+ -- src/test/isolation/specs/async-notify.spec actually tests for saturation.
+ SELECT pg_notify_queue_saturation();
+  pg_notify_queue_saturation 
+ ----------------------------
+                           0
+ (1 row)
+ 
*** a/src/test/regress/sql/async.sql
--- b/src/test/regress/sql/async.sql
***************
*** 17,19 **** NOTIFY notify_async2;
--- 17,23 ----
  LISTEN notify_async2;
  UNLISTEN notify_async2;
  UNLISTEN *;
+ 
+ -- Should return zero while there are no pending notifications.
+ -- src/test/isolation/specs/async-notify.spec actually tests for saturation.
+ SELECT pg_notify_queue_saturation();
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to