On Fri, Oct 17, 2025, at 15:51, Álvaro Herrera wrote:
> I have the impression that this thread has lost focus on the idea of
> producing a backpatchable bugfix. The last proposed patch has a lot of
> new mechanism that doesn't seem suitable for backpatch. I could be
> wrong of course.
I've tried to create a minimal isolated fix, hopefully suitable for
backpatching, with no new mechanisms, other than the added
GetOldestQueuedNotifyXid used by vac_update_datfrozenxid.
It's based on the approach discussed earlier in this thread: simply walk
the notification queue from QUEUE_TAIL to QUEUE_HEAD to find the oldest
XID that belongs to the current database.
Implementation:
* Factor the SLRU page-reading code out of asyncQueueReadAllNotifications
into a new helper function, asyncQueueReadPageToBuffer.
* Add GetOldestQueuedNotifyXid, which uses the new helper function
asyncQueueReadPageToBuffer.
It passes the 001_xid_freeze.pl test, not included in this patch.
/Joel
From 31ff0b7c35320afacf30685c006f17d6de179421 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 19 Oct 2025 18:55:25 +0200
Subject: [PATCH] Prevent VACUUM from truncating XIDs still present in
notification queue
VACUUM's computation of datfrozenxid did not account for transaction IDs
in the LISTEN/NOTIFY queue. This allowed VACUUM to truncate clog
entries for XIDs that were still referenced by queued notifications,
causing backends to fail in TransactionIdDidCommit when later processing
those notifications.
Fix by adding GetOldestQueuedNotifyXid to find the oldest XID in queued
notifications for the current database, and constraining datfrozenxid so
that it does not advance past that XID. The function scans from
QUEUE_TAIL, since notifications may have been written before any
listeners existed.
To avoid code duplication, refactor SLRU page-reading code into a new
helper function asyncQueueReadPageToBuffer.
---
src/backend/commands/async.c | 139 ++++++++++++++++++++++++++++------
src/backend/commands/vacuum.c | 14 ++++
src/include/commands/async.h | 3 +
3 files changed, 132 insertions(+), 24 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..7c9d7831c9f 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -1841,6 +1841,44 @@ ProcessNotifyInterrupt(bool flush)
ProcessIncomingNotify(flush);
}
+/*
+ * Read a page from the SLRU queue into a local buffer.
+ *
+ * Reads the page containing 'pos', copying the data from the current offset
+ * either to the end of the page or up to 'head' (whichever comes first)
+ * into page_buffer.
+ *
+ * pos:         queue position to start copying from; its page number selects
+ *              the SLRU page and its offset selects the first byte copied.
+ * head:        queue position that bounds the copy when it lies on the same
+ *              page as 'pos'.
+ * page_buffer: caller-supplied destination. The data is stored at the same
+ *              offset it occupies within the SLRU page (page_buffer +
+ *              offset of 'pos'), so the buffer must be able to hold a full
+ *              QUEUE_PAGESIZE page. Bytes before that offset are not
+ *              written.
+ *
+ * The SLRU bank lock obtained by SimpleLruReadPage_ReadOnly() is released
+ * before returning, so the caller examines only its private copy.
+ */
+static void
+asyncQueueReadPageToBuffer(QueuePosition pos, QueuePosition head,
+ char *page_buffer)
+{
+ int64 curpage = QUEUE_POS_PAGE(pos);
+ int curoffset = QUEUE_POS_OFFSET(pos);
+ int slotno;
+ int copysize;
+
+ slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+
InvalidTransactionId);
+
+ if (curpage == QUEUE_POS_PAGE(head))
+ {
+ /*
+  * 'head' is on this page: we only want to read as far as head, i.e.
+  * the bytes between the offset of 'pos' and the offset of 'head'.
+  */
+ copysize = QUEUE_POS_OFFSET(head) - curoffset;
+ if (copysize < 0)
+ copysize = 0; /* just for safety: head offset is behind pos, copy nothing */
+ }
+ else
+ {
+ /* 'head' is on another page: fetch all the rest of this page */
+ copysize = QUEUE_PAGESIZE - curoffset;
+ }
+
+ memcpy(page_buffer + curoffset,
+ NotifyCtl->shared->page_buffer[slotno] + curoffset,
+ copysize);
+ /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+ LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+}
/*
* Read all pending notifications from the queue, and deliver appropriate
@@ -1932,36 +1970,13 @@ asyncQueueReadAllNotifications(void)
do
{
- int64 curpage = QUEUE_POS_PAGE(pos);
- int curoffset =
QUEUE_POS_OFFSET(pos);
- int slotno;
- int copysize;
-
/*
* We copy the data from SLRU into a local buffer, so
as to avoid
* holding the SLRU lock while we are examining the
entries and
* possibly transmitting them to our frontend. Copy
only the part
* of the page we will actually inspect.
*/
- slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-
InvalidTransactionId);
- if (curpage == QUEUE_POS_PAGE(head))
- {
- /* we only want to read as far as head */
- copysize = QUEUE_POS_OFFSET(head) - curoffset;
- if (copysize < 0)
- copysize = 0; /* just for safety */
- }
- else
- {
- /* fetch all the rest of the page */
- copysize = QUEUE_PAGESIZE - curoffset;
- }
- memcpy(page_buffer.buf + curoffset,
- NotifyCtl->shared->page_buffer[slotno] +
curoffset,
- copysize);
- /* Release lock that we got from
SimpleLruReadPage_ReadOnly() */
- LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+ asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
/*
* Process messages up to the stop position, end of
page, or an
@@ -2097,6 +2112,82 @@ asyncQueueProcessPageEntries(volatile QueuePosition
*current,
return reachedStop;
}
+/*
+ * Get the oldest XID among the notification queue entries that belong to
+ * the current database.
+ *
+ * Every entry between QUEUE_TAIL and QUEUE_HEAD is examined. Entries whose
+ * dboid differs from MyDatabaseId, or whose xid is not valid, are ignored.
+ * "Oldest" is decided with TransactionIdPrecedes().
+ *
+ * Returns InvalidTransactionId if the queue is empty or contains no
+ * qualifying entry for this database.
+ *
+ * NotifyQueueLock is held in exclusive mode for the whole scan, including
+ * the SLRU page reads done by asyncQueueReadPageToBuffer().
+ */
+TransactionId
+GetOldestQueuedNotifyXid(void)
+{
+ QueuePosition pos;
+ QueuePosition head;
+ TransactionId oldestXid = InvalidTransactionId;
+
+ /*
+  * page_buffer must be adequately aligned, so use a union: entries are
+  * accessed below by casting an address inside buf to AsyncQueueEntry *.
+  */
+ union
+ {
+ char buf[QUEUE_PAGESIZE];
+ AsyncQueueEntry align;
+ } page_buffer;
+
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+ /*
+ * We must start at QUEUE_TAIL since notification data might have been
+ * written before there were any listening backends.
+ *
+ * NOTE(review): from here on this function only reads QUEUE_TAIL,
+ * QUEUE_HEAD and queue pages; confirm whether LW_SHARED would be
+ * sufficient, given that the lock is held until the scan completes.
+ */
+ pos = QUEUE_TAIL;
+ head = QUEUE_HEAD;
+
+ /*
+  * If the queue is empty, no XIDs need protection. (The loop below would
+  * also terminate immediately in this case; this just exits early.)
+  */
+ if (QUEUE_POS_EQUAL(pos, head))
+ {
+ LWLockRelease(NotifyQueueLock);
+ return InvalidTransactionId;
+ }
+
+ while (!QUEUE_POS_EQUAL(pos, head))
+ {
+ int curoffset;
+ AsyncQueueEntry *qe;
+
+ /*
+  * Read the current page from SLRU into our local buffer. Only the
+  * part from pos up to head (or end of page) is copied, at the same
+  * offset it has within the page.
+  */
+ asyncQueueReadPageToBuffer(pos, head, page_buffer.buf);
+
+ curoffset = QUEUE_POS_OFFSET(pos);
+
+ /*
+  * Process all entries on this page up to head. The offset test stops
+  * the loop when the space left on the page is smaller than
+  * QUEUEALIGN(AsyncQueueEntryEmptySize).
+  */
+ while (curoffset + QUEUEALIGN(AsyncQueueEntryEmptySize) <=
QUEUE_PAGESIZE &&
+ !QUEUE_POS_EQUAL(pos, head))
+ {
+ qe = (AsyncQueueEntry *) (page_buffer.buf + curoffset);
+
+ /*
+ * Check if this entry is for our database and has a valid XID.
+ * Only entries for our database matter for our datfrozenxid.
+ */
+ if (qe->dboid == MyDatabaseId &&
TransactionIdIsValid(qe->xid))
+ {
+ if (!TransactionIdIsValid(oldestXid) ||
+ TransactionIdPrecedes(qe->xid,
oldestXid))
+ oldestXid = qe->xid;
+ }
+
+ /*
+  * Advance to next entry, stepping pos forward by this entry's
+  * recorded length.
+  */
+ if (asyncQueueAdvance(&pos, qe->length))
+ break; /* advanced to next page; outer loop reads it */
+
+ curoffset = QUEUE_POS_OFFSET(pos);
+ }
+ }
+
+ LWLockRelease(NotifyQueueLock);
+
+ return oldestXid;
+}
+
/*
* Advance the shared queue tail variable to the minimum of all the
* per-backend tail pointers. Truncate pg_notify space if possible.
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index ed03e3bd50d..4f278c6b988 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -37,6 +37,7 @@
#include "catalog/namespace.h"
#include "catalog/pg_database.h"
#include "catalog/pg_inherits.h"
+#include "commands/async.h"
#include "commands/cluster.h"
#include "commands/defrem.h"
#include "commands/progress.h"
@@ -1733,6 +1734,19 @@ vac_update_datfrozenxid(void)
if (bogus)
return;
+ /*
+ * Also consider the oldest XID in the notification queue, since
+ * backends will need to call TransactionIdDidCommit() on those
+ * XIDs when processing the notifications.
+ */
+ {
+ TransactionId oldestNotifyXid = GetOldestQueuedNotifyXid();
+
+ if (TransactionIdIsValid(oldestNotifyXid) &&
+ TransactionIdPrecedes(oldestNotifyXid, newFrozenXid))
+ newFrozenXid = oldestNotifyXid;
+ }
+
Assert(TransactionIdIsNormal(newFrozenXid));
Assert(MultiXactIdIsValid(newMinMulti));
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..d707f516316 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -26,6 +26,9 @@ extern void NotifyMyFrontEnd(const char *channel,
const char *payload,
int32 srcPid);
+/* get oldest XID in the notification queue for vacuum */
+extern TransactionId GetOldestQueuedNotifyXid(void);
+
/* notify-related SQL statements */
extern void Async_Notify(const char *channel, const char *payload);
extern void Async_Listen(const char *channel);
--
2.50.1