On Fri Sep 19, 2025 at 12:13 PM -03, Arseniy Mukhin wrote:
> I think it's impossible: before pushing anything to the queue we
> acquire global lock in PreCommit_Notify():
>
> LockSharedObject(DatabaseRelationId, InvalidOid, 0, AccessExclusiveLock);
>
> While we are holding the lock, no writers can add anything to the
> queue. Then we save head position and add pending notifications to the
> queue. The moment we get in AtAbort_Notify(), we still hold the global
> lock (maybe it is worth adding Assert about it if we start relying on
> it), so we can be sure there are no notifications in the queue after
> the saved head position except ours. So it seems safe but maybe I
> missed something.
>
Thanks for the explanation! I'm just not sure I understand why we
need the LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE) in
PreCommit_Notify() if we already hold the
LockSharedObject(DatabaseRelationId, InvalidOid, 0, AccessExclusiveLock);

See the attached patch, which is based on your previous comment about
resetting QUEUE_HEAD in AtAbort_Notify().

--
Matheus Alcantara
From f31e096ed8eef6b6988d145ecaa0f513ec9f1042 Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <[email protected]>
Date: Sat, 6 Sep 2025 11:29:02 -0300
Subject: [PATCH v2] Reset LISTEN/NOTIFY QUEUE_HEAD for aborted transactions

Previously, asyncQueueProcessPageEntries() used TransactionIdDidCommit()
to check whether the transaction that a notification belongs to had
committed. Although this works in almost all scenarios, if a
notification stays in the queue for too long and VACUUM FREEZE runs in
the meantime, the clog segments needed to check the status of its
transaction may be removed, causing errors in listener backends when
they read the async queue.

This commit fixes the issue by saving the QUEUE_HEAD position before a
transaction's notifications are added to the shared queue. If the
transaction fails between PreCommit_Notify() and AtCommit_Notify(),
AtAbort_Notify() resets QUEUE_HEAD to that saved position, removing the
aborted transaction's entries from the shared queue. As a result,
listeners no longer need to call TransactionIdDidCommit() on queue
entries.

This commit also adds TAP tests that exercise the VACUUM FREEZE issue
and the scenario of an error occurring between the PreCommit_Notify()
and AtCommit_Notify() calls.

Co-authored-by: Arseniy Mukhin <[email protected]>
---
 src/backend/commands/async.c                  | 62 ++++++++++++++---
 src/backend/storage/lmgr/lmgr.c               | 19 ++++++
 src/include/storage/lmgr.h                    |  3 +
 src/test/modules/Makefile                     |  1 +
 src/test/modules/meson.build                  |  1 +
 src/test/modules/test_listen_notify/Makefile  | 17 +++++
 .../modules/test_listen_notify/meson.build    | 14 ++++
 .../test_listen_notify/t/001_xid_freeze.pl    | 66 +++++++++++++++++++
 .../t/002_aborted_tx_notifies.pl              | 66 +++++++++++++++++++
 src/tools/pgindent/typedefs.list              |  1 +
 10 files changed, 242 insertions(+), 8 deletions(-)
 create mode 100644 src/test/modules/test_listen_notify/Makefile
 create mode 100644 src/test/modules/test_listen_notify/meson.build
 create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl
 create mode 100644 src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..7276a1da8f0 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -401,8 +401,17 @@ struct NotificationHash
        Notification *event;            /* => the actual Notification struct */
 };
 
+/* Information needed by AtAbort_Notify() to remove entries from the queue for aborted transactions. */
+typedef struct AtAbortNotifyInfo
+{
+       QueuePosition previousHead;
+
+} AtAbortNotifyInfo;
+
 static NotificationList *pendingNotifies = NULL;
 
+static AtAbortNotifyInfo *atAbortInfo = NULL;
+
 /*
  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
  * called from inside a signal handler. That just sets the
@@ -1465,6 +1474,19 @@ asyncQueueAddEntries(ListCell *nextNotify)
                }
        }
 
+       /*
+        * If 'atAbortInfo' is NULL, this is the first time we are adding
+        * notifications for the current transaction to the shared global queue.
+        * Save the initial QUEUE_HEAD position so that, if the transaction later
+        * aborts, AtAbort_Notify() can reset QUEUE_HEAD to it, discarding the
+        * uncommitted notifications from the queue.
+        */
+       if (atAbortInfo == NULL)
+       {
+               atAbortInfo = palloc(sizeof(AtAbortNotifyInfo));
+               atAbortInfo->previousHead = QUEUE_HEAD;
+       }
+
        /* Success, so update the global QUEUE_HEAD */
        QUEUE_HEAD = queue_head;
 
@@ -1678,6 +1700,25 @@ AtAbort_Notify(void)
        if (amRegisteredListener && listenChannels == NIL)
                asyncQueueUnregister();
 
+       /*
+        * atAbortInfo is set when asyncQueueAddEntries() adds this transaction's
+        * entries to the global shared queue during PreCommit_Notify(). If the
+        * transaction fails between PreCommit_Notify() and AtCommit_Notify(),
+        * we use it to remove those entries from the queue by resetting
+        * QUEUE_HEAD to the position it had before they were added.
+        *
+        * This is safe because we still hold the heavyweight lock acquired in
+        * PreCommit_Notify(), so no other backend has added entries since.
+        */
+       if (atAbortInfo != NULL)
+       {
+               Assert(CheckSharedObjectLockedByMe(DatabaseRelationId, AccessExclusiveLock, true));
+
+               LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+               QUEUE_HEAD = atAbortInfo->previousHead;
+               LWLockRelease(NotifyQueueLock);
+       }
+
        /* And clean up */
        ClearPendingActionsAndNotifies();
 }
@@ -2066,8 +2107,19 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                reachedStop = true;
                                break;
                        }
-                       else if (TransactionIdDidCommit(qe->xid))
+                       else
                        {
+                               /*
+                                * Entries from transactions that abort after adding them
+                                * to the queue are removed by AtAbort_Notify(), so an
+                                * entry that is no longer in progress belongs to a
+                                * committed transaction and is safe for listeners to
+                                * process.
+                                *
+                                * Transactions that abort before PreCommit_Notify() never
+                                * add their notifications to the shared queue at all.
+                                */
+
                                /* qe->data is the null-terminated channel name */
                                char       *channel = qe->data;
 
@@ -2079,13 +2131,6 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                        NotifyMyFrontEnd(channel, payload, qe->srcPid);
                                }
                        }
-                       else
-                       {
-                               /*
-                                * The source transaction aborted or crashed, so we just
-                                * ignore its notifications.
-                                */
-                       }
                }
 
                /* Loop back if we're not at end of page */
@@ -2385,6 +2430,7 @@ ClearPendingActionsAndNotifies(void)
         */
        pendingActions = NULL;
        pendingNotifies = NULL;
+       atAbortInfo = NULL;
 }
 
 /*
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 4798eb79003..12a21c51452 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -357,6 +357,25 @@ CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode, bool orstronger)
        return LockHeldByMe(&tag, lockmode, orstronger);
 }
 
+/*
+ *  CheckSharedObjectLockedByMe
+ *
+ *  Like CheckRelationLockedByMe, but it checks for shared objects.
+ */
+bool
+CheckSharedObjectLockedByMe(Oid classid, LOCKMODE lockmode, bool orstronger)
+{
+       LOCKTAG         tag;
+
+       SET_LOCKTAG_OBJECT(tag,
+                                          InvalidOid,
+                                          classid,
+                                          InvalidOid,
+                                          0);
+
+       return LockHeldByMe(&tag, lockmode, orstronger);
+}
+
 /*
  *             LockHasWaitersRelation
  *
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index b7abd18397d..c119c8f4ded 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -50,6 +50,9 @@ extern bool CheckRelationLockedByMe(Relation relation, LOCKMODE lockmode,
                                                                        bool orstronger);
 extern bool CheckRelationOidLockedByMe(Oid relid, LOCKMODE lockmode,
                                                                           bool orstronger);
+extern bool CheckSharedObjectLockedByMe(Oid classid, LOCKMODE lockmode,
+                                                                          bool orstronger);
+
 extern bool LockHasWaitersRelation(Relation relation, LOCKMODE lockmode);
 
 extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 8a3cd2afab7..4be824d7a31 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -28,6 +28,7 @@ SUBDIRS = \
                  test_int128 \
                  test_integerset \
                  test_json_parser \
                  test_lfind \
+                 test_listen_notify \
                  test_lwlock_tranches \
                  test_misc \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 717e85066ba..c0b0b327922 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -27,6 +27,7 @@ subdir('test_ginpostinglist')
 subdir('test_int128')
 subdir('test_integerset')
 subdir('test_json_parser')
 subdir('test_lfind')
+subdir('test_listen_notify')
 subdir('test_lwlock_tranches')
 subdir('test_misc')
diff --git a/src/test/modules/test_listen_notify/Makefile b/src/test/modules/test_listen_notify/Makefile
new file mode 100644
index 00000000000..da1bf5bb1b7
--- /dev/null
+++ b/src/test/modules/test_listen_notify/Makefile
@@ -0,0 +1,17 @@
+# src/test/modules/test_listen_notify/Makefile
+
+MODULE = test_listen_notify
+PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY support"
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_listen_notify
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build
new file mode 100644
index 00000000000..a68052cd353
--- /dev/null
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -0,0 +1,14 @@
+# Copyright (c) 2022-2025, PostgreSQL Global Development Group
+
+tests += {
+  'name': 'test_listen_notify',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_xid_freeze.pl',
+      't/002_aborted_tx_notifies.pl'
+    ],
+  },
+}
+
diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
new file mode 100644
index 00000000000..0a5130a042e
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
@@ -0,0 +1,66 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use File::Path qw(mkpath);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Setup
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+       'CREATE TABLE t AS SELECT g AS a, g+2 AS b from generate_series(1,100000) g;'
+);
+$node->safe_psql('postgres',
+       'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# --- Start Session 1 and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# --- Session 2, multiple notify's, and commit ---
+for my $i (1 .. 10)
+{
+       $node->safe_psql(
+               'postgres', "
+               BEGIN;
+               NOTIFY s, '$i';
+               COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation
+$node->safe_psql('postgres', 'select consume_xids(10000000);');
+
+# Execute an update so that the frozen XID of table "t" advances to an XID
+# newer than those consumed by consume_xids()
+$node->safe_psql('postgres', 'UPDATE t SET a = a+b;');
+
+# Remember the current datfrozenxid before VACUUM FREEZE to check later that it advances.
+my $datfrozenxid = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+       "vacuumdb --all --freeze");
+
+# Get datfrozenxid after VACUUM FREEZE and check that it advanced, while
+# session 1 still has unread notifications in the queue
+my $datfrozenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+ok($datfrozenxid_freeze > $datfrozenxid, 'datfrozenxid is advanced');
+
+# In session 1, commit and check that all notifications are received
+my $res = $psql_session1->query_safe('commit;', "commit listen s;");
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+       $notifications_count++;
+       like($i, qr/Asynchronous notification "s" with payload "$notifications_count" received/);
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+done_testing();
diff --git a/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl
new file mode 100644
index 00000000000..17fcb4b786e
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/002_aborted_tx_notifies.pl
@@ -0,0 +1,66 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use File::Path qw(mkpath);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Check that listeners do not receive notifications from an aborted
+# transaction, even if they were already added to the listen/notify queue.
+# To reproduce this, we rely on serialization conflicts being checked after
+# the transaction adds its notifications to the queue.
+
+# Setup
+$node->safe_psql('postgres', 'CREATE TABLE t1 (a bigserial);');
+
+# Listener
+my $psql_listener = $node->background_psql('postgres');
+$psql_listener->query_safe('LISTEN ch;');
+
+# Session1. Start SERIALIZABLE tx and add a notification.
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe("
+       BEGIN ISOLATION LEVEL SERIALIZABLE;
+       SELECT * FROM t1;
+       INSERT INTO t1 DEFAULT VALUES;
+       NOTIFY ch,'committed';
+");
+
+# Session2. Start SERIALIZABLE tx, add a notification and introduce a conflict
+# with session1.
+my $psql_session2 = $node->background_psql('postgres', on_error_stop => 0);
+$psql_session2->query_safe("
+       BEGIN ISOLATION LEVEL SERIALIZABLE;
+       SELECT * FROM t1;
+       INSERT INTO t1 DEFAULT VALUES;
+       NOTIFY ch,'aborted';
+");
+
+# Session1 should be committed successfully. Listeners must receive session1
+# notifications.
+$psql_session1->query_safe("COMMIT;");
+
+# Session2 should be aborted due to the conflict with session1. Transaction
+# is aborted after adding notifications to the listen/notify queue, but
+# listeners should not receive session2 notifications.
+$psql_session2->query("COMMIT;");
+
+# Send another notification after the aborted transaction
+$node->safe_psql('postgres', "NOTIFY ch, 'next_committed';");
+
+# fetch notifications
+my $res = $psql_listener->query_safe('begin; commit;');
+
+# check received notifications
+my @lines = split('\n', $res);
+is(@lines, 2, 'received all committed notifications');
+like($lines[0], qr/Asynchronous notification "ch" with payload "committed" received/);
+like($lines[1], qr/Asynchronous notification "ch" with payload "next_committed" received/);
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e90af5b2ad3..e1c2384aa3d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -159,6 +159,7 @@ ArrayType
 AsyncQueueControl
 AsyncQueueEntry
 AsyncRequest
+AtAbortNotifyInfo
 AttInMetadata
 AttStatsSlot
 AttoptCacheEntry
-- 
2.39.5 (Apple Git-154)
