On Thu, Oct 16, 2025, at 22:16, Tom Lane wrote:
> "Joel Jacobson" <[email protected]> writes:
>> On Thu, Oct 16, 2025, at 20:16, Joel Jacobson wrote:
>>> Building pendingNotifyChannels is O(N^2) yes, but how large N is
>>> realistic here?
>
>> I agree this looks like a real problem, since I guess it's not
>> completely unthinkable someone might have
>> some kind of trigger on a table, that could fire off NOTIFY
>> for each row, possibly causing hundreds of thousands of
>> notifies in the same db txn.
>
> We already de-duplicate identical NOTIFY operations for exactly that
> reason (cf. AsyncExistsPendingNotify).  However, non-identical NOTIFYs
> obviously can't be merged.
>
> I wonder whether we could adapt that de-duplication logic so that
> it produces a list of unique channel names in addition to a list
> of unique NOTIFY events.  One way could be a list/hashtable of
> channels used, and for each one a list/hashtable of unique payloads,
> rather than the existing single-level list/hashtable.
Thanks for the great idea! Yes, this was indeed possible.
0002-optimize_listen_notify-v20.patch:
* Added channelHashtab field, created and updated together with hashtab.
If we have channelHashtab, it's used within PreCommit_Notify to
quickly build pendingNotifyChannels.
In this email, I'm also responding to the feedback from Arseniy Mukhin,
and I've based the alt1, alt2, and alt3 .txt patches on top of v20.
On Sat, Oct 18, 2025, at 18:41, Arseniy Mukhin wrote:
> Thank you for the new version and all implementations!
Thanks for the review and the great ideas!
>> > I think we can perhaps salvage the idea if we invent a separate
>> > "advisory" queue position field, which tells its backend "hey,
>> > you could skip as far as here if you want", but is not used for
>> > purposes of SLRU truncation.
>>
>> Above idea is implemented in 0002-optimize_listen_notify-v19-alt1.txt
>
> pos = QUEUE_BACKEND_POS(i);
>
> /* Direct advancement for idle backends at the old head */
> if (pendingNotifies != NULL &&
> QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
> {
> QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
>
> If we have several notifying backends, it looks like only the first
> one will be able to do direct advancement here. Next notifying backend
> will fail on QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite) as we don't
> wake up the listener and pos will be the same as it was for the first
> notifying backend.
Right.
> It seems that to accumulate direct advancement from
> several notifying backends we need to compare queueHeadBeforeWrite
> with advisoryPos here.
*** 0002-optimize_listen_notify-v20-alt1.txt:
* Fixed; compare advisoryPos with queueHeadBeforeWrite instead of pos.
> And we also need to advance advisoryPos to the
> listener's position after reading if advisoryPos falls behind.
* Fixed; set advisoryPos to max(pos, advisoryPos) in the PG_FINALLY block.
* Also noticed that Exec_ListenPreCommit didn't set advisoryPos to max
  for the first LISTEN; now fixed.
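To illustrate why the advisoryPos-based check accumulates advancement where the old pos-based check could not, here is a tiny standalone model (positions simplified to plain integers instead of (page, offset) QueuePositions; try_direct_advance is a hypothetical name, not a function in the patch):

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified listener state: one integer offset instead of (page, offset) */
typedef struct
{
    int     pos;            /* listener's own read position */
    int     advisoryPos;    /* "you could safely skip to here" hint */
} ListenerState;

/*
 * One notifying backend's attempt at direct advancement.  Comparing
 * advisoryPos (not pos) with the pre-write queue head lets several
 * notifiers in a row accumulate advancement while the listener sleeps:
 * the first notifier moves advisoryPos to its post-write head, and the
 * next notifier's pre-write head matches that value.
 */
static bool
try_direct_advance(ListenerState *l, int headBeforeWrite, int headAfterWrite)
{
    if (l->advisoryPos == headBeforeWrite)
    {
        l->advisoryPos = headAfterWrite;
        return true;
    }
    return false;
}
```

With the original pos-based check, the second notifier would compare the unchanged pos against its own (later) pre-write head and fail to advance.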
> Minute of brainstorming
>
> I also thought about a workload that probably frequently can be met.
> Let's say we have sequence of notifications:
>
> F F F T F F F T F F F T
>
> Here F - notification from the channel we don't care about and T - the
> opposite.
> It seems that after the first 'T' notification it will be more
> difficult for notifying backends to do 'direct advancement' as there
> will be some lag before the listener reads the notification and
> advances its position. Not sure if it's a problem, probably it depends
> on the intensity of notifications.
Hmm, I realize both the advisoryPos and donePos ideas share a problem:
they both require listening backends to wake up eventually anyway,
just to advance their 'pos'.
The holy grail would be to avoid this context switching cost entirely,
and only need to wakeup listening backends when they are actually
interested in the queued notifications. I think the third idea,
alt3, is most promising in achieving this goal.
> But maybe we can use a bit more
> sophisticated data structure here? Something like a list of skip
> ranges. Every entry in the list is the range (pos1, pos2) that the
> listener can skip during the reading. So instead of advancing
> advisoryPos every notifying backend should add skip range to the list.
> Notifying backends can merge neighbour ranges (pos1, pos2) & (pos2,
> pos3) -> (pos1, pos3). We also can limit the number of entries to 5
> for example. Listeners on their side should clear the list before
> reading and skip all ranges from it. What do you think? Is it
> overkill?
Hmm, maybe, but I'm a bit wary of adding too much complication.
Hopefully there is a simpler solution that avoids the need for this,
but if we can't find one, I'm certainly open to trying the skip-ranges idea.
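For the record, a sketch of what such a skip-range list might look like (standalone and simplified: half-open integer ranges, only forward merges, hypothetical names; the real thing would live in shared memory under NotifyQueueLock):

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_SKIP_RANGES 5       /* cap suggested in the discussion */

/* Half-open range [start, end) of queue positions the listener may skip */
typedef struct
{
    int     start;
    int     end;
} SkipRange;

static SkipRange ranges[MAX_SKIP_RANGES];
static int  nranges = 0;

/*
 * Add a skip range, merging with a neighbour when the new range starts
 * exactly where an existing one ends: (a,b) + (b,c) -> (a,c).
 * Returns false when the list is full and no merge is possible; the
 * notifier would then fall back to signaling the listener.
 */
static bool
add_skip_range(int start, int end)
{
    for (int i = 0; i < nranges; i++)
    {
        if (ranges[i].end == start)
        {
            ranges[i].end = end;
            return true;
        }
    }
    if (nranges >= MAX_SKIP_RANGES)
        return false;
    ranges[nranges].start = start;
    ranges[nranges].end = end;
    nranges++;
    return true;
}
```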
>> > Alternatively, split the queue pos
>> > into "this is where to read next" and "this is as much as I'm
>> > definitively done with", where the second field gets advanced at
>> > the end of asyncQueueReadAllNotifications. Not sure which
>> > view would be less confusing (in the end I guess they're nearly
>> > the same thing, differently explained).
>>
>> Above idea is implemented in 0002-optimize_listen_notify-v19-alt2.txt
>>
>
> IMHO it's a little bit more confusing than the first option. Two
> points I noticed:
>
> 1) We have a fast path in asyncQueueReadAllNotifications()
>
> if (QUEUE_POS_EQUAL(pos, head))
> {
> /* Nothing to do, we have read all notifications already. */
> return;
> }
>
> Should we update donePos here? It looks like donePos may never be
> updated without it.
*** 0002-optimize_listen_notify-v20-alt2.txt:
* Fixed; update donePos here
> 2) In SignalBackends()
>
> /* Signal backends that have fallen too far behind */
> lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
> QUEUE_POS_PAGE(pos));
>
> if (lag >= QUEUE_CLEANUP_DELAY)
> {
> pid = QUEUE_BACKEND_PID(i);
> Assert(pid != InvalidPid);
>
> QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
> pids[count] = pid;
> procnos[count] = i;
> count++;
> }
>
> Should we use donePos here as it is responsible for queue truncation now?
* Fixed; use donePos here
>> > A different line of thought could be to get rid of
>> > asyncQueueReadAllNotifications's optimization of moving the
>> > queue pos only once, per
>> >
>> > * (We could alternatively retake NotifyQueueLock and move the
>> > position
>> > * before handling each individual message, but that seems like too
>> > much
>> > * lock traffic.)
>> >
>> > Since we only need shared lock to advance our own queue pos,
>> > maybe that wouldn't be too awful. Not sure.
>>
>> Above idea is implemented in 0002-optimize_listen_notify-v19-alt3.txt
>>
>
> Hmm, it seems we still have the race when in the beginning of
> asyncQueueReadAllNotifications we read pos into the local variable and
> release the lock. IIUC to avoid the race without introducing another
> field here, the listener needs to hold the lock until it updates its
> position so that the notifying backend cannot change it concurrently.
*** 0002-optimize_listen_notify-v20-alt3.txt:
* Fixed; the shared 'pos' is now only updated if the new position is ahead.
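The invariant alt3 relies on can be modeled in a few lines (a simplified standalone sketch; the real QUEUE_POS_PRECEDES uses asyncQueuePagePrecedes to handle page wraparound, which is ignored here):

```c
#include <assert.h>

/* Simplified (page, offset) queue position, as in async.c */
typedef struct
{
    int     page;
    int     offset;
} QueuePos;

/* true if x comes before y in queue order (ignoring page wraparound) */
static int
pos_precedes(QueuePos x, QueuePos y)
{
    return x.page < y.page || (x.page == y.page && x.offset < y.offset);
}

/*
 * Update the shared position only if the new one is ahead, so a reader
 * can never push the shared pointer backwards past a concurrent direct
 * advancement done by a notifying backend.
 */
static void
update_shared_pos(QueuePos *shared, QueuePos newpos)
{
    if (pos_precedes(*shared, newpos))
        *shared = newpos;
}
```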
To me, alt3 looks like the winner in terms of simplicity, and it also
wins in my ping-pong benchmark, thanks to avoiding context switches
more effectively than alt1 and alt2.
Eager to hear your thoughts!
/Joel
0001-optimize_listen_notify-v20.patch
Description: Binary data
From afff0f3f8b01cfde369c564025313e6acc9a610a Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 19 Oct 2025 08:08:05 +0200
Subject: [PATCH] Implements idea #1: advisoryPos

---
 src/backend/commands/async.c | 63 +++++++++++++++++++++++++++++++++---
 1 file changed, 58 insertions(+), 5 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4e6556fb8d1..6a02f5e3acc 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -264,6 +264,11 @@ typedef struct QueuePosition
 	 (x).page != (y).page ? (x) : \
 	 (x).offset > (y).offset ? (x) : (y))
 
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) || \
+	 ((x).page == (y).page && (x).offset < (y).offset))
+
 /*
  * Parameter determining how often we try to advance the tail pointer:
  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
@@ -286,6 +291,7 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	QueuePosition advisoryPos;	/* backend could skip queue to here */
 	bool		wakeupPending;	/* signal sent but not yet processed */
 } QueueBackendStatus;
 
@@ -347,6 +353,7 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i) (asyncQueueControl->backend[i].advisoryPos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
 
 /*
@@ -674,6 +681,7 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
 		}
 	}
@@ -1312,6 +1320,7 @@ Exec_ListenPreCommit(void)
 			prevListener = i;
 	}
 	QUEUE_BACKEND_POS(MyProcNumber) = max;
+	QUEUE_BACKEND_ADVISORY_POS(MyProcNumber) = max;
 	QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
 	/* Insert backend into list of listeners at correct position */
@@ -2031,9 +2040,13 @@ SignalBackends(void)
 	 * Even though we may take and release NotifyQueueLock multiple times
 	 * while writing, the heavyweight lock guarantees this region contains
 	 * only our messages.  Therefore, any backend still positioned at the
-	 * queue head from before our write can be safely advanced to the current
+	 * queue head from before our write can be advised to skip to the current
 	 * queue head without waking it.
 	 *
+	 * We use the advisoryPos field rather than directly modifying pos.
+	 * The backend controls its own pos field and will check advisoryPos
+	 * when it's safe to do so.
+	 *
 	 * False-positive possibility: if a backend was previously signaled but
 	 * hasn't yet awoken, we'll skip advancing it (because wakeupPending is
 	 * true). This is safe - the backend will advance its pointer when it
@@ -2048,6 +2061,7 @@ SignalBackends(void)
 		 i = QUEUE_NEXT_LISTENER(i))
 	{
 		QueuePosition pos;
+		QueuePosition advisoryPos;
 		int64		lag;
 		int32		pid;
 
@@ -2055,15 +2069,31 @@ SignalBackends(void)
 			continue;
 
 		pos = QUEUE_BACKEND_POS(i);
+		advisoryPos = QUEUE_BACKEND_ADVISORY_POS(i);
 
-		/* Direct advancement for idle backends at the old head */
+		/*
+		 * Direct advancement for idle backends at the old head.
+		 *
+		 * We check advisoryPos rather than pos to allow accumulating advances
+		 * from multiple consecutive notifying backends. If we checked pos,
+		 * only the first notifier could advance idle backends; subsequent
+		 * notifiers would find pos unchanged (since the backend hasn't woken
+		 * up yet) and fail to advance further.
+		 */
 		if (pendingNotifies != NULL &&
-			QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+			QUEUE_POS_EQUAL(advisoryPos, queueHeadBeforeWrite))
 		{
-			QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
-			pos = queueHeadAfterWrite;
+			QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
+			advisoryPos = queueHeadAfterWrite;
 		}
 
+		/*
+		 * For lag calculation, use whichever position is further ahead.
+		 * This ensures we don't spuriously wake a backend that has been
+		 * directly advanced.
+		 */
+		pos = QUEUE_POS_MAX(pos, advisoryPos);
+
 		/* Signal backends that have fallen too far behind */
 		lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
 								 QUEUE_POS_PAGE(pos));
@@ -2302,6 +2332,7 @@ static void
 asyncQueueReadAllNotifications(void)
 {
 	volatile QueuePosition pos;
+	QueuePosition advisoryPos;
 	QueuePosition head;
 	Snapshot	snapshot;
 
@@ -2319,6 +2350,21 @@ asyncQueueReadAllNotifications(void)
 	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
+
+	/*
+	 * Check if another backend has set an advisory position for us.
+	 * If so, and if we haven't yet read past that point, we can safely
+	 * adopt the advisory position and skip the intervening notifications.
+	 */
+	advisoryPos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+
+	if (!QUEUE_POS_EQUAL(advisoryPos, pos) &&
+		QUEUE_POS_PRECEDES(pos, advisoryPos))
+	{
+		pos = advisoryPos;
+		QUEUE_BACKEND_POS(MyProcNumber) = pos;
+	}
+
 	LWLockRelease(NotifyQueueLock);
 
 	if (QUEUE_POS_EQUAL(pos, head))
@@ -2440,6 +2486,13 @@ asyncQueueReadAllNotifications(void)
 		/* Update shared state */
 		LWLockAcquire(NotifyQueueLock, LW_SHARED);
 		QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		/*
+		 * Advance advisoryPos to our current position if it has fallen behind,
+		 * but preserve any newer advisory position that may have been set by
+		 * another backend while we were processing notifications.
+		 */
+		QUEUE_BACKEND_ADVISORY_POS(MyProcNumber) =
+			QUEUE_POS_MAX(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber));
 		LWLockRelease(NotifyQueueLock);
 	}
 	PG_END_TRY();
-- 
2.50.1
From c403098ae4e4d06f109eb6292a67c6577e123010 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 19 Oct 2025 08:35:44 +0200
Subject: [PATCH] Implement idea #3

---
 src/backend/commands/async.c | 150 ++++++++++++++++++++---------------
 1 file changed, 85 insertions(+), 65 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4e6556fb8d1..b34e4a2247b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -264,6 +264,11 @@ typedef struct QueuePosition
 	 (x).page != (y).page ? (x) : \
 	 (x).offset > (y).offset ? (x) : (y))
 
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) || \
+	 ((x).page == (y).page && (x).offset < (y).offset))
+
 /*
  * Parameter determining how often we try to advance the tail pointer:
  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
@@ -2304,6 +2309,7 @@ asyncQueueReadAllNotifications(void)
 	volatile QueuePosition pos;
 	QueuePosition head;
 	Snapshot	snapshot;
+	bool		reachedStop;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -2372,77 +2378,69 @@ asyncQueueReadAllNotifications(void)
 	 * It is possible that we fail while trying to send a message to our
 	 * frontend (for example, because of encoding conversion failure).  If
 	 * that happens it is critical that we not try to send the same message
-	 * over and over again.  Therefore, we place a PG_TRY block here that will
-	 * forcibly advance our queue position before we lose control to an error.
-	 * (We could alternatively retake NotifyQueueLock and move the position
-	 * before handling each individual message, but that seems like too much
-	 * lock traffic.)
+	 * over and over again.  Therefore, we must advance our queue position
+	 * regularly as we process messages.
+	 *
+	 * We must also be careful about concurrency: SignalBackends() can
+	 * directly advance our position while we're reading.  To preserve such
+	 * advancement, asyncQueueProcessPageEntries updates our position in
+	 * shared memory for each message, only writing if our position is ahead.
+	 * Shared lock is sufficient since we're only updating our own position.
 	 */
-	PG_TRY();
+	do
 	{
-		bool		reachedStop;
+		int64		curpage = QUEUE_POS_PAGE(pos);
+		int			curoffset = QUEUE_POS_OFFSET(pos);
+		int			slotno;
+		int			copysize;
 
-		do
+		/*
+		 * We copy the data from SLRU into a local buffer, so as to avoid
+		 * holding the SLRU lock while we are examining the entries and
+		 * possibly transmitting them to our frontend.  Copy only the part
+		 * of the page we will actually inspect.
+		 */
+		slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+											InvalidTransactionId);
+		if (curpage == QUEUE_POS_PAGE(head))
 		{
-			int64		curpage = QUEUE_POS_PAGE(pos);
-			int			curoffset = QUEUE_POS_OFFSET(pos);
-			int			slotno;
-			int			copysize;
+			/* we only want to read as far as head */
+			copysize = QUEUE_POS_OFFSET(head) - curoffset;
+			if (copysize < 0)
+				copysize = 0;	/* just for safety */
+		}
+		else
+		{
+			/* fetch all the rest of the page */
+			copysize = QUEUE_PAGESIZE - curoffset;
+		}
+		memcpy(page_buffer.buf + curoffset,
+			   NotifyCtl->shared->page_buffer[slotno] + curoffset,
+			   copysize);
+		/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+		LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
 
-			/*
-			 * We copy the data from SLRU into a local buffer, so as to avoid
-			 * holding the SLRU lock while we are examining the entries and
-			 * possibly transmitting them to our frontend.  Copy only the part
-			 * of the page we will actually inspect.
-			 */
-			slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-												InvalidTransactionId);
-			if (curpage == QUEUE_POS_PAGE(head))
-			{
-				/* we only want to read as far as head */
-				copysize = QUEUE_POS_OFFSET(head) - curoffset;
-				if (copysize < 0)
-					copysize = 0;	/* just for safety */
-			}
-			else
-			{
-				/* fetch all the rest of the page */
-				copysize = QUEUE_PAGESIZE - curoffset;
-			}
-			memcpy(page_buffer.buf + curoffset,
-				   NotifyCtl->shared->page_buffer[slotno] + curoffset,
-				   copysize);
-			/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
-			LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+		/*
+		 * Process messages up to the stop position, end of page, or an
+		 * uncommitted message.
+		 *
+		 * Our stop position is what we found to be the head's position
+		 * when we entered this function. It might have changed already.
+		 * But if it has, we will receive (or have already received and
+		 * queued) another signal and come here again.
+		 *
+		 * We are not holding NotifyQueueLock here! The queue can only
+		 * extend beyond the head pointer (see above).
+		 * asyncQueueProcessPageEntries will update our backend's position
+		 * for each message to ensure we don't reprocess messages if we fail
+		 * partway through, and to preserve any direct advancement that
+		 * SignalBackends() might perform concurrently.
+		 */
+		reachedStop = asyncQueueProcessPageEntries(&pos, head,
												   page_buffer.buf,
												   snapshot);
 
-			/*
-			 * Process messages up to the stop position, end of page, or an
-			 * uncommitted message.
-			 *
-			 * Our stop position is what we found to be the head's position
-			 * when we entered this function. It might have changed already.
-			 * But if it has, we will receive (or have already received and
-			 * queued) another signal and come here again.
-			 *
-			 * We are not holding NotifyQueueLock here! The queue can only
-			 * extend beyond the head pointer (see above) and we leave our
-			 * backend's pointer where it is so nobody will truncate or
-			 * rewrite pages under us.  Especially we don't want to hold a lock
-			 * while sending the notifications to the frontend.
-			 */
-			reachedStop = asyncQueueProcessPageEntries(&pos, head,
-													   page_buffer.buf,
-													   snapshot);
-		} while (!reachedStop);
-	}
-	PG_FINALLY();
-	{
-		/* Update shared state */
-		LWLockAcquire(NotifyQueueLock, LW_SHARED);
-		QUEUE_BACKEND_POS(MyProcNumber) = pos;
-		LWLockRelease(NotifyQueueLock);
-	}
-	PG_END_TRY();
+	} while (!reachedStop);
 
 	/* Done with snapshot */
 	UnregisterSnapshot(snapshot);
@@ -2490,6 +2488,24 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 		 */
 		reachedEndOfPage = asyncQueueAdvance(current, qe->length);
 
+		/*
+		 * Update our position in shared memory immediately after advancing,
+		 * before we attempt to process the message. This ensures we won't
+		 * reprocess this message if NotifyMyFrontEnd fails.
+		 *
+		 * Only write if our position is ahead of the shared position.
+		 * If the shared position is already ahead (due to direct advancement
+		 * by SignalBackends), preserve it by not overwriting.
+		 */
+		LWLockAcquire(NotifyQueueLock, LW_SHARED);
+		{
+			QueuePosition sharedPos = QUEUE_BACKEND_POS(MyProcNumber);
+
+			if (QUEUE_POS_PRECEDES(sharedPos, *current))
+				QUEUE_BACKEND_POS(MyProcNumber) = *current;
+		}
+		LWLockRelease(NotifyQueueLock);
+
 		/* Ignore messages destined for other databases */
 		if (qe->dboid == MyDatabaseId)
 		{
@@ -2515,6 +2531,10 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 				 * messages.
 				 */
 				*current = thisentry;
+				/* Update shared memory to reflect the backed-up position */
+				LWLockAcquire(NotifyQueueLock, LW_SHARED);
+				QUEUE_BACKEND_POS(MyProcNumber) = *current;
+				LWLockRelease(NotifyQueueLock);
 				reachedStop = true;
 				break;
 			}
-- 
2.50.1
From 928cc032706ac154153279adbdfba95f6af2fae4 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 19 Oct 2025 08:12:47 +0200
Subject: [PATCH] Implement idea #2: donePos

---
 src/backend/commands/async.c | 57 +++++++++++++++++++++++++++++++-----
 1 file changed, 49 insertions(+), 8 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4e6556fb8d1..c81807107d1 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -285,7 +285,8 @@ typedef struct QueueBackendStatus
 	int32		pid;			/* either a PID or InvalidPid */
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
-	QueuePosition pos;			/* backend has read queue up to here */
+	QueuePosition pos;			/* next position to read from */
+	QueuePosition donePos;		/* backend has definitively processed up to here */
 	bool		wakeupPending;	/* signal sent but not yet processed */
 } QueueBackendStatus;
 
@@ -347,6 +348,7 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_DONEPOS(i)	(asyncQueueControl->backend[i].donePos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
 
 /*
@@ -674,6 +676,7 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			SET_QUEUE_POS(QUEUE_BACKEND_DONEPOS(i), 0, 0);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
 		}
 	}
@@ -1312,6 +1315,7 @@ Exec_ListenPreCommit(void)
 			prevListener = i;
 	}
 	QUEUE_BACKEND_POS(MyProcNumber) = max;
+	QUEUE_BACKEND_DONEPOS(MyProcNumber) = max;
 	QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
 	/* Insert backend into list of listeners at correct position */
@@ -2048,6 +2052,7 @@ SignalBackends(void)
 		 i = QUEUE_NEXT_LISTENER(i))
 	{
 		QueuePosition pos;
+		QueuePosition donePos;
 		int64		lag;
 		int32		pid;
 
@@ -2055,6 +2060,7 @@ SignalBackends(void)
 			continue;
 
 		pos = QUEUE_BACKEND_POS(i);
+		donePos = QUEUE_BACKEND_DONEPOS(i);
 
 		/* Direct advancement for idle backends at the old head */
 		if (pendingNotifies != NULL &&
@@ -2064,9 +2070,17 @@ SignalBackends(void)
 			pos = queueHeadAfterWrite;
 		}
 
-		/* Signal backends that have fallen too far behind */
+		/*
+		 * Signal backends that have fallen too far behind.
+		 *
+		 * We use donePos rather than pos for the lag check because donePos
+		 * is what matters for queue truncation (see asyncQueueAdvanceTail).
+		 * A backend may have been directly advanced (pos is recent) while
+		 * donePos is still far behind, holding back the tail. We need to
+		 * wake such backends so they can advance their donePos.
+		 */
 		lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
-								 QUEUE_POS_PAGE(pos));
+								 QUEUE_POS_PAGE(donePos));
 
 		if (lag >= QUEUE_CLEANUP_DELAY)
 		{
@@ -2319,14 +2333,25 @@ asyncQueueReadAllNotifications(void)
 	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
-	LWLockRelease(NotifyQueueLock);
 
 	if (QUEUE_POS_EQUAL(pos, head))
 	{
-		/* Nothing to do, we have read all notifications already. */
+		/*
+		 * Nothing to do, we have read all notifications already.
+		 *
+		 * Update donePos to match pos before returning. This is important
+		 * when our position was advanced via direct advancement: we need to
+		 * update donePos so the queue tail can advance. Without this,
+		 * backends that have been directly advanced would hold back queue
+		 * truncation indefinitely.
+		 */
+		QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos;
+		LWLockRelease(NotifyQueueLock);
 		return;
 	}
 
+	LWLockRelease(NotifyQueueLock);
+
 	/*----------
 	 * Get snapshot we'll use to decide which xacts are still in progress.
 	 * This is trickier than it might seem, because of race conditions.
@@ -2437,9 +2462,19 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_FINALLY();
 	{
-		/* Update shared state */
+		/*
+		 * Update shared state.
+		 *
+		 * We update donePos to what we actually read (the local pos variable),
+		 * as this is used for truncation safety. For the read position (pos),
+		 * we use the maximum of our local position and the current shared
+		 * position, in case another backend used direct advancement to skip us
+		 * ahead while we were reading. This prevents us from going backwards
+		 * and potentially pointing to a truncated page.
+		 */
 		LWLockAcquire(NotifyQueueLock, LW_SHARED);
-		QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos;
+		QUEUE_BACKEND_POS(MyProcNumber) = QUEUE_POS_MAX(pos, QUEUE_BACKEND_POS(MyProcNumber));
 		LWLockRelease(NotifyQueueLock);
 	}
 	PG_END_TRY();
@@ -2589,7 +2624,13 @@ asyncQueueAdvanceTail(void)
 	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
 	{
 		Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+		/*
+		 * Use donePos rather than pos for truncation safety. The donePos
+		 * field represents what the backend has definitively processed, while
+		 * pos can be advanced by other backends via direct advancement. This
+		 * prevents truncating pages that a backend is still reading from.
+		 */
+		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_DONEPOS(i));
 	}
 	QUEUE_TAIL = min;
 	oldtailpage = QUEUE_STOP_PAGE;
-- 
2.50.1
