On 10/10/2018 19:42, Catalin Iacob wrote:
On Tue, Oct 9, 2018 at 2:17 PM <jul...@jdemoor.com> wrote:
I just caught an error in my patch, it's fixed in the attachment. The
'never' and 'maybe' collapse modes were mixed up in one location.

Here's a partial review of this version, did not read the doc part
very carefully.

First of all, I agree that this is a desirable feature as, for a large
number of notiifications, the O(n^2) overhead quickly becomes very
noticeable.

I would expect the collapse mode to be an enum which is created from
the string early on during parsing and used for the rest of the code.
Instead the string is used all the way leading to string comparisons
in the notification dispatcher and to the need of hardcoding special
strings in various places, including the contrib module.

This comment in the beginning of async.c should also be updated:
*   Duplicate notifications from the same transaction are sent out as one
*   notification only. This is done to save work when for example a trigger

pg_notify_3args duplicates pg_notify, I would expect a helper function
to be extracted and called from both.

There are braces placed on the same line as the if, for example if
(strlen(collapse_mode) != 0) { which seems to not be the project's
style.

Thank you for the review. I've addressed all your points in the attached patch. The patch was made against release 11.1.

I couldn't find a way to make a good helper function for pg_notify_3args and pg_notify, I hope my proposed solution is acceptable.
diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c
index 0c274322bd..1494a35a5a 100644
--- a/contrib/tcn/tcn.c
+++ b/contrib/tcn/tcn.c
@@ -161,7 +161,7 @@ triggered_change_notification(PG_FUNCTION_ARGS)
                                        strcpy_quoted(payload, 
SPI_getvalue(trigtuple, tupdesc, colno), '\'');
                                }
 
-                               Async_Notify(channel, payload->data);
+                               Async_Notify(channel, payload->data, 
NOTIFY_COLLAPSE_MODE_MAYBE);
                        }
                        ReleaseSysCache(indexTuple);
                        break;
diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml
index e0e125a2a2..96e0d7a990 100644
--- a/doc/src/sgml/ref/notify.sgml
+++ b/doc/src/sgml/ref/notify.sgml
@@ -21,7 +21,8 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable 
class="parameter">payload</replaceable> ]
+NOTIFY <replaceable class="parameter">channel</replaceable> [ , <replaceable 
class="parameter">payload</replaceable> [ , <replaceable 
class="parameter">collapse_mode</replaceable> ] ]
+
 </synopsis>
  </refsynopsisdiv>
 
@@ -93,20 +94,6 @@ NOTIFY <replaceable class="parameter">channel</replaceable> 
[ , <replaceable cla
    should try to keep their transactions short.
   </para>
 
-  <para>
-   If the same channel name is signaled multiple times from the same
-   transaction with identical payload strings, the
-   database server can decide to deliver a single notification only.
-   On the other hand, notifications with distinct payload strings will
-   always be delivered as distinct notifications. Similarly, notifications from
-   different transactions will never get folded into one notification.
-   Except for dropping later instances of duplicate notifications,
-   <command>NOTIFY</command> guarantees that notifications from the same
-   transaction get delivered in the order they were sent.  It is also
-   guaranteed that messages from different transactions are delivered in
-   the order in which the transactions committed.
-  </para>
-
   <para>
    It is common for a client that executes <command>NOTIFY</command>
    to be listening on the same notification channel itself.  In that case
@@ -121,6 +108,41 @@ NOTIFY <replaceable 
class="parameter">channel</replaceable> [ , <replaceable cla
    are the same, the notification event is one's own work bouncing
    back, and can be ignored.
   </para>
+
+</refsect1>
+
+<refsect1>
+
+  <title>Ordering and collapsing of notifications</title>
+
+  <para>
+   If the same channel name is signaled multiple times from the same
+   transaction with identical payload strings, the
+   database server can decide to deliver a single notification only,
+   when the value of the <literal>collapse_mode</literal> parameter is 
+   <literal>'maybe'</literal> or <literal>''</literal> (the empty string).
+
+   If the <literal>'never'</literal> collapse mode is specified, the server 
will 
+   deliver all notifications, including duplicates. Turning off deduplication 
+   in this way can considerably speed up transactions that emit large numbers 
+   of notifications.
+   
+   Removal of duplicate notifications takes place within transaction block,
+   finished with <literal>COMMIT</literal>, <literal>END</literal> or 
<literal>SAVEPOINT</literal>.
+  </para>
+
+  <para>
+   Notifications with distinct payload strings will
+   always be delivered as distinct notifications. Similarly, notifications from
+   different transactions will never get folded into one notification.
+   Except for dropping later instances of duplicate notifications,
+   <command>NOTIFY</command> guarantees that notifications from the same
+   transaction get delivered in the order they were sent.  It is also
+   guaranteed that messages from different transactions are delivered in
+   the order in which the transactions committed.
+  </para>
+
+
  </refsect1>
 
  <refsect1>
@@ -147,6 +169,16 @@ NOTIFY <replaceable 
class="parameter">channel</replaceable> [ , <replaceable cla
      </para>
     </listitem>
    </varlistentry>
+   <varlistentry>
+    <term><replaceable class="parameter">collapse_mode</replaceable></term>
+    <listitem>
+     <para>
+      The collapse mode to apply when identical notifications are issued 
within 
+      a transaction. The acceptable values are <literal>'maybe'</literal> (the 
+      default) and <literal>'never'</literal>.
+     </para>
+    </listitem>
+   </varlistentry>
   </variablelist>
  </refsect1>
 
@@ -190,6 +222,11 @@ NOTIFY <replaceable 
class="parameter">channel</replaceable> [ , <replaceable cla
     to use than the <command>NOTIFY</command> command if you need to work with
     non-constant channel names and payloads.
    </para>
+   <para>
+    There is a three-argument version, <literal>pg_notify(<type>text</type>,
+    <type>text</type>, <type>text</type>)</literal> where the third argument 
takes 
+    the value of the <literal>collapse_mode</literal> parameter. 
+   </para>
   </refsect2>
  </refsect1>
 
@@ -210,6 +247,21 @@ Asynchronous notification "virtual" with payload "This is 
the payload" received
 LISTEN foo;
 SELECT pg_notify('fo' || 'o', 'pay' || 'load');
 Asynchronous notification "foo" with payload "payload" received from server 
process with PID 14728.
+
+/* Identical messages from same (sub-) transaction can be eliminated - unless 
you use the 'never' collapse mode */
+LISTEN bar;
+BEGIN;
+NOTIFY bar, 'Coffee please';
+NOTIFY bar, 'Coffee please';
+NOTIFY bar, 'Milk please';
+NOTIFY bar, 'Milk please', 'never';
+SAVEPOINT s;
+NOTIFY bar, 'Coffee please';
+COMMIT;
+Asynchronous notification "bar" with payload "Coffee please" received from 
server process with PID 31517.
+Asynchronous notification "bar" with payload "Milk please" received from 
server process with PID 31517.
+Asynchronous notification "bar" with payload "Milk please" received from 
server process with PID 31517.
+Asynchronous notification "bar" with payload "Coffee please" received from 
server process with PID 31517.
 </programlisting></para>
  </refsect1>
 
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index ee7c6d41b4..5bcde40cfd 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -15,99 +15,98 @@
 /*-------------------------------------------------------------------------
  * Async Notification Model as of 9.0:
  *
- * 1. Multiple backends on same machine. Multiple backends listening on
- *       several channels. (Channels are also called "conditions" in other
- *       parts of the code.)
+ * 1. Multiple backends on same machine. Multiple backends listening on several
+ *    channels. (Channels are also called "conditions" in other parts of the
+ *    code.)
  *
  * 2. There is one central queue in disk-based storage (directory pg_notify/),
- *       with actively-used pages mapped into shared memory by the slru.c 
module.
- *       All notification messages are placed in the queue and later read out
- *       by listening backends.
+ *    with actively-used pages mapped into shared memory by the slru.c module.
+ *    All notification messages are placed in the queue and later read out by
+ *    listening backends.
  *
- *       There is no central knowledge of which backend listens on which 
channel;
- *       every backend has its own list of interesting channels.
+ *    There is no central knowledge of which backend listens on which channel;
+ *    every backend has its own list of interesting channels.
  *
- *       Although there is only one queue, notifications are treated as being
- *       database-local; this is done by including the sender's database OID
- *       in each notification message.  Listening backends ignore messages
- *       that don't match their database OID.  This is important because it
- *       ensures senders and receivers have the same database encoding and 
won't
- *       misinterpret non-ASCII text in the channel name or payload string.
+ *    Although there is only one queue, notifications are treated as being
+ *    database-local; this is done by including the sender's database OID in
+ *    each notification message.  Listening backends ignore messages that don't
+ *    match their database OID.  This is important because it ensures senders
+ *    and receivers have the same database encoding and won't misinterpret
+ *    non-ASCII text in the channel name or payload string.
  *
- *       Since notifications are not expected to survive database crashes,
- *       we can simply clean out the pg_notify data at any reboot, and there
- *       is no need for WAL support or fsync'ing.
+ *    Since notifications are not expected to survive database crashes, we can
+ *    simply clean out the pg_notify data at any reboot, and there is no need
+ *    for WAL support or fsync'ing.
  *
  * 3. Every backend that is listening on at least one channel registers by
- *       entering its PID into the array in AsyncQueueControl. It then scans 
all
- *       incoming notifications in the central queue and first compares the
- *       database OID of the notification with its own database OID and then
- *       compares the notified channel with the list of channels that it 
listens
- *       to. In case there is a match it delivers the notification event to its
- *       frontend.  Non-matching events are simply skipped.
- *
- * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
- *       a backend-local list which will not be processed until transaction 
end.
- *
- *       Duplicate notifications from the same transaction are sent out as one
- *       notification only. This is done to save work when for example a 
trigger
- *       on a 2 million row table fires a notification for each row that has 
been
- *       changed. If the application needs to receive every single notification
- *       that has been sent, it can easily add some unique string into the 
extra
- *       payload parameter.
- *
- *       When the transaction is ready to commit, PreCommit_Notify() adds the
- *       pending notifications to the head of the queue. The head pointer of 
the
- *       queue always points to the next free position and a position is just a
- *       page number and the offset in that page. This is done before marking 
the
- *       transaction as committed in clog. If we run into problems writing the
- *       notifications, we can still call elog(ERROR, ...) and the transaction
- *       will roll back.
- *
- *       Once we have put all of the notifications into the queue, we return to
- *       CommitTransaction() which will then do the actual transaction commit.
- *
- *       After commit we are called another time (AtCommit_Notify()). Here we
- *       make the actual updates to the effective listen state 
(listenChannels).
- *
- *       Finally, after we are out of the transaction altogether, we check if
- *       we need to signal listening backends.  In SignalBackends() we scan the
- *       list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
- *       to every listening backend (we don't know which backend is listening 
on
- *       which channel so we must signal them all). We can exclude backends 
that
- *       are already up to date, though.  We don't bother with a self-signal
- *       either, but just process the queue directly.
- *
- * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
- *       sets the process's latch, which triggers the event to be processed
- *       immediately if this backend is idle (i.e., it is waiting for a 
frontend
- *       command and is not within a transaction block. C.f.
- *       ProcessClientReadInterrupt()).  Otherwise the handler may only set a
- *       flag, which will cause the processing to occur just before we next go
- *       idle.
- *
- *       Inbound-notify processing consists of reading all of the notifications
- *       that have arrived since scanning last time. We read every notification
- *       until we reach either a notification from an uncommitted transaction 
or
- *       the head pointer's position. Then we check if we were the laziest
- *       backend: if our pointer is set to the same position as the global tail
- *       pointer is set, then we move the global tail pointer ahead to where 
the
- *       second-laziest backend is (in general, we take the MIN of the current
- *       head position and all active backends' new tail pointers). Whenever we
- *       move the global tail pointer we also truncate now-unused pages (i.e.,
- *       delete files in pg_notify/ that are no longer used).
- *
- * An application that listens on the same channel it notifies will get
- * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
- * by comparing be_pid in the NOTIFY message to the application's own backend's
+ *    entering its PID into the array in AsyncQueueControl. It then scans all
+ *    incoming notifications in the central queue and first compares the
+ *    database OID of the notification with its own database OID and then
+ *    compares the notified channel with the list of channels that it listens
+ *    to. In case there is a match it delivers the notification event to its
+ *    frontend.  Non-matching events are simply skipped.
+ *
+ * 4. The NOTIFY statement (routine Async_Notify) stores the notification in a
+ *    backend-local list which will not be processed until transaction end.
+ *
+ *    Duplicate notifications from the same transaction are, by default, sent
+ *    out as one notification only. This is intended to save work when for
+ *    example a trigger on a 2 million row table fires a notification for each
+ *    row that has been changed. However, since the check for duplicates can be
+ *    expensive, with O(n2) complexity, the collapse_mode argument allows
+ *    turning this feature off.
+ *
+ *    When the transaction is ready to commit, PreCommit_Notify() adds the
+ *    pending notifications to the head of the queue. The head pointer of the
+ *    queue always points to the next free position and a position is just a
+ *    page number and the offset in that page. This is done before marking the
+ *    transaction as committed in clog. If we run into problems writing the
+ *    notifications, we can still call elog(ERROR, ...) and the transaction 
will
+ *    roll back.
+ *
+ *    Once we have put all of the notifications into the queue, we return to
+ *    CommitTransaction() which will then do the actual transaction commit.
+ *
+ *    After commit we are called another time (AtCommit_Notify()). Here we make
+ *    the actual updates to the effective listen state (listenChannels).
+ *
+ *    Finally, after we are out of the transaction altogether, we check if we
+ *    need to signal listening backends.  In SignalBackends() we scan the list
+ *    of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal to every
+ *    listening backend (we don't know which backend is listening on which
+ *    channel so we must signal them all). We can exclude backends that are
+ *    already up to date, though.  We don't bother with a self-signal either,
+ *    but just process the queue directly.
+ *
+ * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler 
sets
+ *    the process's latch, which triggers the event to be processed immediately
+ *    if this backend is idle (i.e., it is waiting for a frontend command and 
is
+ *    not within a transaction block. C.f. ProcessClientReadInterrupt()).
+ *    Otherwise the handler may only set a flag, which will cause the 
processing
+ *    to occur just before we next go idle.
+ *
+ *    Inbound-notify processing consists of reading all of the notifications
+ *    that have arrived since scanning last time. We read every notification
+ *    until we reach either a notification from an uncommitted transaction or
+ *    the head pointer's position. Then we check if we were the laziest 
backend:
+ *    if our pointer is set to the same position as the global tail pointer is
+ *    set, then we move the global tail pointer ahead to where the
+ *    second-laziest backend is (in general, we take the MIN of the current 
head
+ *    position and all active backends' new tail pointers). Whenever we move 
the
+ *    global tail pointer we also truncate now-unused pages (i.e., delete files
+ *    in pg_notify/ that are no longer used).
+ *
+ * An application that listens on the same channel it notifies will get NOTIFY
+ * messages for its own NOTIFYs.  These can be ignored, if not useful, by
+ * comparing be_pid in the NOTIFY message to the application's own backend's
  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
  * frontend during startup.)  The above design guarantees that notifies from
  * other backends will never be missed by ignoring self-notifies.
  *
  * The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS)
- * can be varied without affecting anything but performance.  The maximum
- * amount of notification data that can be queued at one time is determined
- * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
+ * can be varied without affecting anything but performance.  The maximum 
amount
+ * of notification data that can be queued at one time is determined by 
slru.c's
+ * wraparound limit; see QUEUE_MAX_PAGE below.
  *-------------------------------------------------------------------------
  */
 
@@ -507,9 +506,21 @@ AsyncShmemInit(void)
  */
 Datum
 pg_notify(PG_FUNCTION_ARGS)
+{
+       return pg_notify_3args(fcinfo);
+}
+
+
+/*
+ * pg_notify_3args
+ *    SQL function to send a notification event, 3-argument version
+ */
+Datum
+pg_notify_3args(PG_FUNCTION_ARGS)
 {
        const char *channel;
        const char *payload;
+       NotifyCollapseMode collapse_mode;
 
        if (PG_ARGISNULL(0))
                channel = "";
@@ -521,15 +532,61 @@ pg_notify(PG_FUNCTION_ARGS)
        else
                payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
 
+       if (PG_NARGS() < 3 || PG_ARGISNULL(2))
+       {
+               collapse_mode = NOTIFY_COLLAPSE_MODE_MAYBE;
+       }
+       else
+       {
+               collapse_mode = 
str2collapse_mode(text_to_cstring(PG_GETARG_TEXT_PP(2)));
+       }
+
        /* For NOTIFY as a statement, this is checked in ProcessUtility */
        PreventCommandDuringRecovery("NOTIFY");
 
-       Async_Notify(channel, payload);
+       Async_Notify(channel, payload, collapse_mode);
 
        PG_RETURN_VOID();
 }
 
 
+NotifyCollapseMode str2collapse_mode(const char *mode_str)
+{
+       NotifyCollapseMode collapse_mode;
+       if (!mode_str)
+       {
+               collapse_mode = NOTIFY_COLLAPSE_MODE_MAYBE;
+       }
+       else
+       {
+               if (strlen(mode_str) == 0) {
+                       collapse_mode = NOTIFY_COLLAPSE_MODE_MAYBE;
+               }
+               else if (strcmp(mode_str, "always") == 0) 
+               {
+                       collapse_mode = NOTIFY_COLLAPSE_MODE_ALWAYS;
+               }
+               else if (strcmp(mode_str, "never") == 0) 
+               {
+                       collapse_mode = NOTIFY_COLLAPSE_MODE_NEVER;
+               }
+               else if (strcmp(mode_str, "maybe") == 0) 
+               {
+                       collapse_mode = NOTIFY_COLLAPSE_MODE_MAYBE;
+               }
+               else
+               {
+                       ereport(ERROR,
+                                       
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                               errmsg("invalid collapse_mode 
value '%s'", mode_str)));
+               }
+       }
+
+       return collapse_mode;
+
+}
+
+
 /*
  * Async_Notify
  *
@@ -540,10 +597,11 @@ pg_notify(PG_FUNCTION_ARGS)
  *             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  */
 void
-Async_Notify(const char *channel, const char *payload)
+Async_Notify(const char *channel, const char *payload, NotifyCollapseMode 
collapse_mode)
 {
        Notification *n;
        MemoryContext oldcontext;
+       bool removeDuplicates = false;
 
        if (IsParallelWorker())
                elog(ERROR, "cannot send notifications from a parallel worker");
@@ -570,9 +628,17 @@ Async_Notify(const char *channel, const char *payload)
                                         errmsg("payload string too long")));
        }
 
-       /* no point in making duplicate entries in the list ... */
-       if (AsyncExistsPendingNotify(channel, payload))
-               return;
+       if (collapse_mode == NOTIFY_COLLAPSE_MODE_ALWAYS || collapse_mode == 
NOTIFY_COLLAPSE_MODE_MAYBE)
+       {
+               removeDuplicates = true;
+       }
+
+       if (removeDuplicates)
+       {
+               /* remove duplicate entries in the list */
+               if (AsyncExistsPendingNotify(channel, payload))
+                       return;
+       }
 
        /*
         * The notification list needs to live until end of transaction, so 
store
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 02b500e5a0..3cda7773b5 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -521,7 +521,7 @@ static Node *makeRecursiveViewSelect(char *relname, List 
*aliases, Node *query);
 %type <boolean> opt_varying opt_timezone opt_no_inherit
 
 %type <ival>   Iconst SignedIconst
-%type <str>            Sconst comment_text notify_payload
+%type <str>            Sconst comment_text notify_payload notify_collapse_mode
 %type <str>            RoleId opt_boolean_or_string
 %type <list>   var_list
 %type <str>            ColId ColLabel var_name type_function_name param_name
@@ -9809,18 +9809,32 @@ opt_instead:
  *
  *****************************************************************************/
 
-NotifyStmt: NOTIFY ColId notify_payload
+NotifyStmt: 
+                       NOTIFY ColId
+                               {
+                                       NotifyStmt *n = makeNode(NotifyStmt);
+                                       n->conditionname = $2;
+                                       n->payload = NULL;
+                                       n->collapse_mode = "";
+                                       $$ = (Node *)n;
+                               }
+                       | NOTIFY ColId notify_payload notify_collapse_mode
                                {
                                        NotifyStmt *n = makeNode(NotifyStmt);
                                        n->conditionname = $2;
                                        n->payload = $3;
+                                       n->collapse_mode = $4;
                                        $$ = (Node *)n;
                                }
                ;
 
 notify_payload:
                        ',' Sconst                                              
        { $$ = $2; }
-                       | /*EMPTY*/                                             
        { $$ = NULL; }
+               ;
+
+notify_collapse_mode:
+                       ',' Sconst                                              
        { $$ = $2; }
+                       | /*EMPTY*/                                             
        { $$ = ""; }
                ;
 
 ListenStmt: LISTEN ColId
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index ad3a68a79b..2927b7de50 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -611,7 +611,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                                NotifyStmt *stmt = (NotifyStmt *) parsetree;
 
                                PreventCommandDuringRecovery("NOTIFY");
-                               Async_Notify(stmt->conditionname, 
stmt->payload);
+                               Async_Notify(stmt->conditionname, 
stmt->payload, str2collapse_mode(stmt->collapse_mode));
                        }
                        break;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index c4fc50dceb..e0fa28c08a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7670,6 +7670,11 @@
   proname => 'pg_notify', proisstrict => 'f', provolatile => 'v',
   proparallel => 'r', prorettype => 'void', proargtypes => 'text text',
   prosrc => 'pg_notify' },
+{ oid => '3423', descr => 'send a notification event',
+  proname => 'pg_notify', proisstrict => 'f', provolatile => 'v',
+  proparallel => 'r', prorettype => 'void', proargtypes => 'text text text',
+  prosrc => 'pg_notify_3args' },
+
 { oid => '3296',
   descr => 'get the fraction of the asynchronous notification queue currently 
in use',
   proname => 'pg_notification_queue_usage', provolatile => 'v',
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index d5868c42a0..5f3f40af7c 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -32,8 +32,16 @@ extern void NotifyMyFrontEnd(const char *channel,
                                 const char *payload,
                                 int32 srcPid);
 
+/* collapse mode argument to NOTIFY and pg_notify() */
+typedef enum NotifyCollapseMode {
+       NOTIFY_COLLAPSE_MODE_MAYBE,
+       NOTIFY_COLLAPSE_MODE_NEVER,
+       NOTIFY_COLLAPSE_MODE_ALWAYS
+} NotifyCollapseMode;
+extern NotifyCollapseMode str2collapse_mode(const char *mode_str);
+
 /* notify-related SQL statements */
-extern void Async_Notify(const char *channel, const char *payload);
+extern void Async_Notify(const char *channel, const char *payload, 
NotifyCollapseMode collapse_mode);
 extern void Async_Listen(const char *channel);
 extern void Async_Unlisten(const char *channel);
 extern void Async_UnlistenAll(void);
@@ -54,4 +62,5 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(void);
 
+
 #endif                                                 /* ASYNC_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index a49b0131cf..75840e67a5 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2926,11 +2926,13 @@ typedef struct RuleStmt
  *             Notify Statement
  * ----------------------
  */
+
 typedef struct NotifyStmt
 {
        NodeTag         type;
        char       *conditionname;      /* condition name to notify */
        char       *payload;            /* the payload string, or NULL if none 
*/
+       char       *collapse_mode;      /* the collapse mode (empty string by 
default, which is equivalent to 'maybe') */
 } NotifyStmt;
 
 /* ----------------------
diff --git a/src/test/regress/expected/async.out 
b/src/test/regress/expected/async.out
index 19cbe38e63..beff62b895 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -8,6 +8,18 @@ SELECT pg_notify('notify_async1','sample message1');
  
 (1 row)
 
+SELECT pg_notify('notify_async1','sample message1','maybe');
+ pg_notify 
+-----------
+ 
+(1 row)
+
+SELECT pg_notify('notify_async1','sample_message1','never');
+ pg_notify 
+-----------
+ 
+(1 row)
+
 SELECT pg_notify('notify_async1','');
  pg_notify 
 -----------
@@ -29,9 +41,14 @@ SELECT 
pg_notify('notify_async_channel_name_too_long____________________________
 ERROR:  channel name too long
 --Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
 NOTIFY notify_async2;
+NOTIFY notify_async2, '', 'maybe';
+NOTIFY notify_async2, '', 'never';
 LISTEN notify_async2;
 UNLISTEN notify_async2;
 UNLISTEN *;
+--Should fail. Invalid collapse mode
+NOTIFY notify_async2, '', 'foobar';
+ERROR:  invalid collapse_mode value 'foobar'
 -- Should return zero while there are no pending notifications.
 -- src/test/isolation/specs/async-notify.spec tests for actual usage.
 SELECT pg_notification_queue_usage();
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e01538..f95292e3e4 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -4,6 +4,8 @@
 
 --Should work. Send a valid message via a valid channel name
 SELECT pg_notify('notify_async1','sample message1');
+SELECT pg_notify('notify_async1','sample message1','maybe');
+SELECT pg_notify('notify_async1','sample_message1','never');
 SELECT pg_notify('notify_async1','');
 SELECT pg_notify('notify_async1',NULL);
 
@@ -14,10 +16,15 @@ SELECT 
pg_notify('notify_async_channel_name_too_long____________________________
 
 --Should work. Valid NOTIFY/LISTEN/UNLISTEN commands
 NOTIFY notify_async2;
+NOTIFY notify_async2, '', 'maybe';
+NOTIFY notify_async2, '', 'never';
 LISTEN notify_async2;
 UNLISTEN notify_async2;
 UNLISTEN *;
 
+--Should fail. Invalid collapse mode
+NOTIFY notify_async2, '', 'foobar';
+
 -- Should return zero while there are no pending notifications.
 -- src/test/isolation/specs/async-notify.spec tests for actual usage.
 SELECT pg_notification_queue_usage();

Reply via email to