From 72df2b4e2f76294692ce57553349b61ff141e00c Mon Sep 17 00:00:00 2001
From: Matheus Alcantara <mths.dev@pm.me>
Date: Sat, 6 Sep 2025 11:29:02 -0300
Subject: [PATCH v1] Make AsyncQueueEntry self-contained

Previously, asyncQueueProcessPageEntries() used TransactionIdDidCommit()
to check whether the transaction that a notification belongs to had
committed. Although this works in almost all scenarios, if a
notification is kept in the queue for too long and VACUUM FREEZE is
executed in the meantime, the clog files needed to check the transaction
status of these notifications may be removed, which causes errors in
listener backends when they read the async queue.

This commit fixes the issue by making AsyncQueueEntry self-contained: it
adds a "committed" boolean field that asyncQueueProcessPageEntries() can
use to check whether the transaction of the notification committed.

We set committed to true when adding the entry to the SLRU page buffer
cache in PreCommit_Notify(). If an error occurs before
AtCommit_Notify(), AtAbort_Notify() is called and marks the committed
field as false.

A new global List, pendingAsyncQueueEntries, keeps track of pending
notification entries that are in the shared async queue page buffer but
are not fully committed yet, so that we can mark these entries as not
committed if the transaction aborts.

This commit also includes TAP tests that exercise the VACUUM FREEZE
issue and, using the injection points extension, the scenario of an
error occurring between the PreCommit_Notify() and AtCommit_Notify()
calls.
---
 src/backend/access/transam/xact.c             |  3 +
 src/backend/commands/async.c                  | 92 ++++++++++++++++++-
 src/test/modules/Makefile                     |  1 +
 src/test/modules/meson.build                  |  1 +
 src/test/modules/test_listen_notify/Makefile  | 17 ++++
 .../modules/test_listen_notify/meson.build    | 14 +++
 .../test_listen_notify/t/001_xid_freeze.pl    | 66 +++++++++++++
 .../test_listen_notify/t/002_transaction.pl   | 57 ++++++++++++
 src/tools/pgindent/typedefs.list              |  1 +
 9 files changed, 251 insertions(+), 1 deletion(-)
 create mode 100644 src/test/modules/test_listen_notify/Makefile
 create mode 100644 src/test/modules/test_listen_notify/meson.build
 create mode 100644 src/test/modules/test_listen_notify/t/001_xid_freeze.pl
 create mode 100644 src/test/modules/test_listen_notify/t/002_transaction.pl

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b46e7e9c2a6..34b46ab4d4a 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -64,6 +64,7 @@
 #include "utils/builtins.h"
 #include "utils/combocid.h"
 #include "utils/guc.h"
+#include "utils/injection_point.h"
 #include "utils/inval.h"
 #include "utils/memutils.h"
 #include "utils/relmapper.h"
@@ -2340,6 +2341,8 @@ CommitTransaction(void)
 	 */
 	PreCommit_Notify();
 
+	INJECTION_POINT("commit-transaction-pos-pre-commit-notify", NULL);
+
 	/*
 	 * Mark serializable transaction as complete for predicate locking
 	 * purposes.  This should be done as late as we can put it and still allow
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..a7c211fc09a 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -180,6 +180,8 @@ typedef struct AsyncQueueEntry
 	Oid			dboid;			/* sender's database OID */
 	TransactionId xid;			/* sender's XID */
 	int32		srcPid;			/* sender's PID */
+	bool		committed;		/* Is the transaction that the entry
+								 * belongs to committed? */
 	char		data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
 } AsyncQueueEntry;
 
@@ -401,8 +403,30 @@ struct NotificationHash
 	Notification *event;		/* => the actual Notification struct */
 };
 
+/*
+ * Location of an AsyncQueueEntry that asyncQueueAddEntries() has added to the
+ * shared async queue for a transaction that has not yet committed, so that
+ * AtAbort_Notify() can find the entry again and mark it as not committed.
+ */
+typedef struct PendingAsyncQueueEntry
+{
+	/* Queue page number that the entry was added to */
+	int64		page;
+
+	/*
+	 * Buffer slot holding the page when the entry was added.  NOTE(review):
+	 * confirm the page cannot leave this slot before AtAbort_Notify() runs.
+	 */
+	int			slotno;
+
+	/* Byte offset of the entry within the page buffer */
+	int			offset;
+} PendingAsyncQueueEntry;
+
 static NotificationList *pendingNotifies = NULL;
 
+static List *pendingAsyncQueueEntries = NIL;
+
 /*
  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
  * called from inside a signal handler. That just sets the
@@ -1398,12 +1422,30 @@ asyncQueueAddEntries(ListCell *nextNotify)
 	while (nextNotify != NULL)
 	{
 		Notification *n = (Notification *) lfirst(nextNotify);
+		PendingAsyncQueueEntry *pendingEntry = palloc(sizeof(PendingAsyncQueueEntry));
 
 		/* Construct a valid queue entry in local variable qe */
 		asyncQueueNotificationToEntry(n, &qe);
 
+		/*
+		 * Mark the entry as committed.  If the transaction this notification
+		 * belongs to fails to commit, AtAbort_Notify() will mark the entry as
+		 * not committed.
+		 */
+		qe.committed = true;
+
 		offset = QUEUE_POS_OFFSET(queue_head);
 
+		/*
+		 * Remember where the entry is stored so that AtAbort_Notify() can
+		 * look it up in the shared page buffer if the transaction fails to
+		 * commit.
+		 */
+		pendingEntry->page = pageno;
+		pendingEntry->slotno = slotno;
+		pendingEntry->offset = offset;
+		pendingAsyncQueueEntries = lappend(pendingAsyncQueueEntries, pendingEntry);
+
 		/* Check whether the entry really fits on the current page */
 		if (offset + qe.length <= QUEUE_PAGESIZE)
 		{
@@ -1670,6 +1712,8 @@ SignalBackends(void)
 void
 AtAbort_Notify(void)
 {
+	ListCell   *lc;
+
 	/*
 	 * If we LISTEN but then roll back the transaction after PreCommit_Notify,
 	 * we have registered as a listener but have not made any entry in
@@ -1678,6 +1722,51 @@ AtAbort_Notify(void)
 	if (amRegisteredListener && listenChannels == NIL)
 		asyncQueueUnregister();
 
+	/*
+	 * At this stage, if pendingAsyncQueueEntries is not empty we have
+	 * already added these notifications to the shared async queue, so we
+	 * need to look them up and mark them as not committed.  That way a
+	 * backend processing the queue can skip them without checking the
+	 * transaction status in clog files that may already have been truncated
+	 * by VACUUM FREEZE.
+	 */
+	if (pendingAsyncQueueEntries != NIL)
+	{
+		/*
+		 * We can not have pending async queue entries without pending
+		 * notifications
+		 */
+		Assert(pendingNotifies != NULL);
+
+		foreach(lc, pendingAsyncQueueEntries)
+		{
+			PendingAsyncQueueEntry *pendingEntry = (PendingAsyncQueueEntry *) lfirst(lc);
+			int			slotno = pendingEntry->slotno;
+			int			offset = pendingEntry->offset;
+			int64		page = pendingEntry->page;
+			LWLock	   *banklock = SimpleLruGetBankLock(NotifyCtl, page);
+			AsyncQueueEntry *qe;
+
+			LWLockAcquire(banklock, LW_EXCLUSIVE);
+
+			qe = (AsyncQueueEntry *) (NotifyCtl->shared->page_buffer[slotno] + offset);
+
+			/*
+			 * The entry was marked as committed when it was added to the
+			 * shared async queue.
+			 */
+			Assert(qe->committed);
+
+			/*
+			 * Mark notification entry as not committed so it's not visible to
+			 * other backends
+			 */
+			qe->committed = false;
+
+			LWLockRelease(banklock);
+		}
+	}
+
 	/* And clean up */
 	ClearPendingActionsAndNotifies();
 }
@@ -2066,7 +2155,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 				reachedStop = true;
 				break;
 			}
-			else if (TransactionIdDidCommit(qe->xid))
+			else if (qe->committed)
 			{
 				/* qe->data is the null-terminated channel name */
 				char	   *channel = qe->data;
@@ -2385,6 +2474,7 @@ ClearPendingActionsAndNotifies(void)
 	 */
 	pendingActions = NULL;
 	pendingNotifies = NULL;
+	pendingAsyncQueueEntries = NIL;
 }
 
 /*
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 903a8ac151a..4c0160df341 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -28,6 +28,7 @@ SUBDIRS = \
 		  test_int128 \
 		  test_integerset \
 		  test_json_parser \
+		  test_listen_notify \
 		  test_lfind \
 		  test_misc \
 		  test_oat_hooks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 93be0f57289..144379b619b 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -27,6 +27,7 @@ subdir('test_ginpostinglist')
 subdir('test_int128')
 subdir('test_integerset')
 subdir('test_json_parser')
+subdir('test_listen_notify')
 subdir('test_lfind')
 subdir('test_misc')
 subdir('test_oat_hooks')
diff --git a/src/test/modules/test_listen_notify/Makefile b/src/test/modules/test_listen_notify/Makefile
new file mode 100644
index 00000000000..da1bf5bb1b7
--- /dev/null
+++ b/src/test/modules/test_listen_notify/Makefile
@@ -0,0 +1,17 @@
+# src/test/modules/test_listen_notify/Makefile
+
+# The TAP tests run CREATE EXTENSION for both of these test modules.
+EXTRA_INSTALL = src/test/modules/injection_points src/test/modules/xid_wraparound
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_listen_notify
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build
new file mode 100644
index 00000000000..f0a2b5058e4
--- /dev/null
+++ b/src/test/modules/test_listen_notify/meson.build
@@ -0,0 +1,14 @@
+# Copyright (c) 2022-2025, PostgreSQL Global Development Group
+
+tests += {
+  'name': 'test_listen_notify',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_xid_freeze.pl',
+      't/002_transaction.pl',
+    ],
+  },
+}
+
diff --git a/src/test/modules/test_listen_notify/t/001_xid_freeze.pl b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
new file mode 100644
index 00000000000..0a5130a042e
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/001_xid_freeze.pl
@@ -0,0 +1,66 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use File::Path qw(mkpath);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+# Setup.  NOTE(review): confirm xid_wraparound is installed wherever this runs.
+$node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
+$node->safe_psql('postgres',
+	'CREATE TABLE t AS SELECT g AS a, g+2 AS b from generate_series(1,100000) g;'
+);
+$node->safe_psql('postgres',
+	'ALTER DATABASE template0 WITH ALLOW_CONNECTIONS true');
+
+# --- Start Session 1, make it listen on 's', and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen s;', "Session 1 listens to 's'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# --- Session 2: send ten notifications, each in its own committed transaction
+for my $i (1 .. 10)
+{
+	$node->safe_psql(
+		'postgres', "
+		BEGIN;
+		NOTIFY s, '$i';
+		COMMIT;");
+}
+
+# Consume enough XIDs to trigger truncation
+$node->safe_psql('postgres', 'select consume_xids(10000000);');
+
+# Execute an update so that the frozen xid of table "t" is advanced to an xid
+# greater than the consume_xids() result
+$node->safe_psql('postgres', 'UPDATE t SET a = a+b;');
+
+# Remember the current datfrozenxid so we can verify that vacuum freeze advances it.
+my $datafronzenxid = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+
+# Execute vacuum freeze on all databases
+$node->command_ok([ 'vacuumdb', '--all', '--freeze', '--port', $node->port ],
+	"vacuumdb --all --freeze");
+
+# Get the new datfrozenxid after vacuum freeze to verify that it has advanced
+# while the queued notifications can still be delivered
+my $datafronzenxid_freeze = $node->safe_psql('postgres', "select datfrozenxid from pg_database where datname = 'postgres'");
+ok($datafronzenxid_freeze > $datafronzenxid, 'datfrozenxid is advanced');
+
+# On Session 1, commit and ensure that all the notifications are received
+my $res = $psql_session1->query_safe('commit;', "commit listen s;");
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+	$notifications_count++;
+	like($i, qr/Asynchronous notification "s" with payload "$notifications_count" received/);
+}
+is($notifications_count, 10, 'received all committed notifications');
+
+done_testing();
diff --git a/src/test/modules/test_listen_notify/t/002_transaction.pl b/src/test/modules/test_listen_notify/t/002_transaction.pl
new file mode 100644
index 00000000000..825ae9cb1a9
--- /dev/null
+++ b/src/test/modules/test_listen_notify/t/002_transaction.pl
@@ -0,0 +1,57 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use File::Path qw(mkpath);
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->start;
+
+if (!$node->check_extension('injection_points'))
+{
+	plan skip_all => 'Extension injection_points not installed';
+}
+
+# Use the injection_points extension to test that if an error occurs between
+# PreCommit_Notify() and AtCommit_Notify(), the notification is not processed
+# by a listening backend.
+$node->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
+
+# Start session 1, create a listener and leave it idle in transaction
+my $psql_session1 = $node->background_psql('postgres');
+$psql_session1->query_safe('listen c1;', "Session 1 listens to 'c1'");
+$psql_session1->query_safe('begin;', "Session 1 starts a transaction");
+
+# Start session 2 to send a notification inside a transaction block
+my $psql_session2 = $node->background_psql('postgres', on_error_stop => 0);
+$psql_session2->query_safe('BEGIN;');
+$psql_session2->query_safe('NOTIFY c1;');
+
+# Attach an injection point that makes the commit of the NOTIFY c1 transaction fail.
+$psql_session2->query_safe("SELECT injection_points_attach('commit-transaction-pos-pre-commit-notify', 'error');");
+
+# Commit the NOTIFY transaction, which will raise an error due to the injection point
+$psql_session2->query('COMMIT;');
+
+# Detach the injection point (from session 1) so that subsequent commits succeed.
+$psql_session1->query_safe("SELECT injection_points_detach('commit-transaction-pos-pre-commit-notify');");
+
+# Send another NOTIFY after the injection point is detached to signal the psql_session1 backend.
+$node->safe_psql('postgres', 'NOTIFY c1') ;
+
+# Commit the listener transaction - it should not see the notification that failed to commit.
+my $res = $psql_session1->query_safe('commit;', "commit listen s2;");
+
+my $notifications_count = 0;
+foreach my $i (split('\n', $res))
+{
+	$notifications_count++;
+	like($i, qr/Asynchronous notification "c1" received/);
+}
+is($notifications_count, 1, 'received only committed notifications');
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e90af5b2ad3..6228faa56a5 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2148,6 +2148,7 @@ PatternInfo
 PatternInfoArray
 Pattern_Prefix_Status
 Pattern_Type
+PendingAsyncQueueEntry
 PendingFsyncEntry
 PendingRelDelete
 PendingRelSync
-- 
2.39.5 (Apple Git-154)

