Attached v12 fixes a variety of buglets I found throughout the patch set. I've also done quite a bit of refactoring, ranging from inlining some helpers to introducing new input argument structs.
0001 is independently valuable: it optimizes StrategyRejectBuffer() a bit and makes GetVictimBuffer() cleaner.

0002-0007 were largely present in older versions of the patch set.

0008 is new -- it is an early version of batching for normal backends flushing a buffer to obtain a clean one. Right now, it checks whether the two blocks succeeding the target block are in shared buffers and dirty, and, if so, writes them out together with the target buffer. I haven't started testing or benchmarking it because I first need to convert bgwriter to use write combining to be able to benchmark it effectively, but I thought I would get the code out there sooner rather than later. With my current code structure, it's a lot harder to also include the target block's predecessor when it is dirty and ready to write out. I wonder how important that is compared to just handling the two succeeding blocks.

- Melanie
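For anyone who wants the shape of the 0008 heuristic without reading the diff, here is a rough standalone sketch (toy pool, toy types, and a printf in place of the real write -- none of this is the bufmgr code): starting from the victim's block number, look up the next two blocks, extend the run only while the successors are resident and dirty, and issue the whole contiguous run as one IO.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define POOL_SIZE 4

typedef struct ToyBuffer
{
    uint32_t    blockno;
    bool        dirty;
} ToyBuffer;

/* Toy buffer pool standing in for shared buffers */
static ToyBuffer pool[POOL_SIZE] = {
    {.blockno = 10, .dirty = true},     /* victim */
    {.blockno = 11, .dirty = true},     /* dirty successor: joins the batch */
    {.blockno = 12, .dirty = false},    /* clean: ends the batch */
    {.blockno = 13, .dirty = true},
};

/* Return the pool entry holding blockno, or NULL if it isn't resident. */
static ToyBuffer *
toy_lookup(uint32_t blockno)
{
    for (int i = 0; i < POOL_SIZE; i++)
        if (pool[i].blockno == blockno)
            return &pool[i];
    return NULL;
}

int
main(void)
{
    ToyBuffer  *victim = &pool[0];
    uint32_t    nblocks = 1;

    /* Extend the run with up to two dirty, resident successors */
    for (uint32_t i = 1; i <= 2; i++)
    {
        ToyBuffer  *next = toy_lookup(victim->blockno + i);

        if (next == NULL || !next->dirty)
            break;              /* not resident or clean: the run ends here */
        nblocks++;
    }

    printf("write blocks %u..%u as one IO\n",
           victim->blockno, victim->blockno + nblocks - 1);
    return 0;
}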
From 07af817d3a9ebe4cf01a4652a5300821181d504b Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Wed, 7 Jan 2026 13:32:18 -0500 Subject: [PATCH v12 1/8] Streamline buffer rejection for bulkreads of unlogged tables Bulk-read buffer access strategies reject reusing a buffer from the buffer access strategy ring if reusing it would require flushing WAL. Unlogged relations never require WAL flushes, so this check can be skipped. This avoids taking the buffer header lock unnecessarily. Refactor this into StrategyRejectBuffer() itself, which also avoids LSN checking for non-bulkread buffer access strategies. --- src/backend/storage/buffer/bufmgr.c | 69 ++++++++++++++++++++------- src/backend/storage/buffer/freelist.c | 13 ++++- src/include/storage/buf_internals.h | 2 + 3 files changed, 66 insertions(+), 18 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index a036c2aa275..29c11d2d357 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -2482,26 +2482,15 @@ again: /* * If using a nondefault strategy, and writing the buffer would * require a WAL flush, let the strategy decide whether to go ahead - * and write/reuse the buffer or to choose another victim. We need a - * lock to inspect the page LSN, so this can't be done inside + * and write/reuse the buffer or to choose another victim. We need the + * content lock to inspect the page LSN, so this can't be done inside * StrategyGetBuffer. */ - if (strategy != NULL) + if (StrategyRejectBuffer(strategy, buf_hdr, from_ring)) { - XLogRecPtr lsn; - - /* Read the LSN while holding buffer header lock */ - buf_state = LockBufHdr(buf_hdr); - lsn = BufferGetLSN(buf_hdr); - UnlockBufHdr(buf_hdr); - - if (XLogNeedsFlush(lsn) - && StrategyRejectBuffer(strategy, buf_hdr, from_ring)) - { - LWLockRelease(content_lock); - UnpinBuffer(buf_hdr); - goto again; - } + LWLockRelease(content_lock); + UnpinBuffer(buf_hdr); + goto again; } /* OK, do the I/O */ @@ -3416,6 +3405,52 @@ TrackNewBufferPin(Buffer buf) BLCKSZ); } +/* + * Returns true if the buffer needs WAL flushed before it can be written out. + * *lsn is set to the current page LSN. + * + * If the result is required to be correct, the caller must hold a buffer + * content lock. If they only hold a shared content lock, we'll need to + * acquire the buffer header spinlock, so they must not already hold it. + * + * If the buffer is unlogged, *lsn shouldn't be used by the caller and is set + * to InvalidXLogRecPtr. + */ +bool +BufferNeedsWALFlush(BufferDesc *bufdesc, bool exclusive, XLogRecPtr *lsn) +{ + uint32 buf_state = pg_atomic_read_u32(&bufdesc->state); + char *page; + Buffer buffer; + + /* + * Unlogged buffers can't need WAL flush. See FlushBuffer() for more + * details on unlogged relations with LSNs. 
+ */ + if (!(buf_state & BM_PERMANENT)) + { + *lsn = InvalidXLogRecPtr; + return false; + } + + buffer = BufferDescriptorGetBuffer(bufdesc); + page = BufferGetPage(buffer); + + Assert(BufferIsValid(buffer)); + + if (!XLogHintBitIsNeeded() || BufferIsLocal(buffer) || exclusive) + *lsn = PageGetLSN(page); + else + { + LockBufHdr(bufdesc); + *lsn = PageGetLSN(page); + UnlockBufHdr(bufdesc); + } + + return XLogNeedsFlush(*lsn); +} + + #define ST_SORT sort_checkpoint_bufferids #define ST_ELEMENT_TYPE CkptSortItem #define ST_COMPARE(a, b) ckpt_buforder_comparator(a, b) diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 9a93fb335fc..6a4452e2da0 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -780,12 +780,20 @@ IOContextForStrategy(BufferAccessStrategy strategy) * be written out and doing so would require flushing WAL too. This gives us * a chance to choose a different victim. * + * The buffer must be pinned and content locked and the buffer header spinlock + * must not be held. + * * Returns true if buffer manager should ask for a new victim, and false * if this buffer should be written and re-used. */ bool StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_ring) { + XLogRecPtr lsn; + + if (!strategy) + return false; + /* We only do this in bulkread mode */ if (strategy->btype != BAS_BULKREAD) return false; @@ -795,8 +803,11 @@ StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_r strategy->buffers[strategy->current] != BufferDescriptorGetBuffer(buf)) return false; + if (!BufferNeedsWALFlush(buf, false, &lsn)) + return false; + /* - * Remove the dirty buffer from the ring; necessary to prevent infinite + * Remove the dirty buffer from the ring; necessary to prevent an infinite * loop if all ring members are dirty. */ strategy->buffers[strategy->current] = InvalidBuffer; diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index fa43cf4458d..3c774d7a1d2 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -15,6 +15,7 @@ #ifndef BUFMGR_INTERNALS_H #define BUFMGR_INTERNALS_H +#include "access/xlogdefs.h" #include "pgstat.h" #include "port/atomics.h" #include "storage/aio_types.h" @@ -493,6 +494,7 @@ extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context, BufferTag *tag); extern void TrackNewBufferPin(Buffer buf); +extern bool BufferNeedsWALFlush(BufferDesc *bufdesc, bool exclusive, XLogRecPtr *lsn); /* solely to make it easier to write tests */ extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); -- 2.43.0
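A minimal way to picture the check 0001 factors out, as a standalone sketch (the function name and arguments here are made up; the real BufferNeedsWALFlush() works on a BufferDesc): unlogged pages short-circuit to "no WAL flush needed" before any LSN is read, which is exactly why the header lock can be skipped for them.

#include <stdbool.h>
#include <stdint.h>

typedef uint64_t ToyLSN;

/*
 * Toy version of the question 0001 centralizes: does writing this page
 * require flushing WAL first?  "permanent" stands in for BM_PERMANENT,
 * page_lsn for the page's LSN, flushed_lsn for how far WAL is on disk.
 */
static bool
toy_needs_wal_flush(bool permanent, ToyLSN page_lsn, ToyLSN flushed_lsn)
{
    /* Unlogged pages never need a WAL flush; skip the LSN check entirely */
    if (!permanent)
        return false;

    /* Otherwise the WAL-before-data rule applies up to the page's LSN */
    return page_lsn > flushed_lsn;
}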
From 4795ec99e25e347ff4b3437423d276d9769123ff Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Wed, 15 Oct 2025 10:54:19 -0400 Subject: [PATCH v12 2/8] Split FlushBuffer() into two parts Before adding write combining to write a batch of blocks when flushing dirty buffers, refactor FlushBuffer() into the preparatory step and actual buffer flushing step. This separation procides symmetry with future code for batch flushing which necessarily separates these steps, as it must prepare multiple buffers before flushing them together. These steps are moved into a new FlushBuffer() helper function, CleanVictimBuffer() which will contain both the batch flushing and single flush code in future commits. Author: Melanie Plageman <[email protected]> Reviewed-by: Chao Li <[email protected]> Reviewed-by: Nazir Bilal Yavuz <[email protected]> Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 133 ++++++++++++++++++---------- 1 file changed, 86 insertions(+), 47 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 29c11d2d357..2c706682eb3 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -631,6 +631,10 @@ static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); +static bool PrepareFlushBuffer(BufferDesc *bufdesc, XLogRecPtr *lsn); +static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, + IOObject io_object, IOContext io_context, + XLogRecPtr buffer_lsn); static void FindAndDropRelationBuffers(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber nForkBlock, @@ -2450,6 +2454,7 @@ again: if (buf_state & BM_DIRTY) { LWLock *content_lock; + XLogRecPtr max_lsn = InvalidXLogRecPtr; Assert(buf_state & BM_TAG_VALID); Assert(buf_state & BM_VALID); @@ -2493,12 +2498,18 @@ again: goto again; } - /* OK, do the I/O */ - FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context); - LWLockRelease(content_lock); - - ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, - &buf_hdr->tag); + if (!PrepareFlushBuffer(buf_hdr, &max_lsn)) + { + /* May be nothing to do if buffer was cleaned */ + LWLockRelease(BufferDescriptorGetContentLock(buf_hdr)); + } + else + { + DoFlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context, max_lsn); + LWLockRelease(BufferDescriptorGetContentLock(buf_hdr)); + ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, + &buf_hdr->tag); + } } @@ -3424,8 +3435,8 @@ BufferNeedsWALFlush(BufferDesc *bufdesc, bool exclusive, XLogRecPtr *lsn) Buffer buffer; /* - * Unlogged buffers can't need WAL flush. See FlushBuffer() for more - * details on unlogged relations with LSNs. + * Unlogged buffers can't need WAL flush. See PrepareFlushBuffer() for + * more details on unlogged relations with LSNs. 
*/ if (!(buf_state & BM_PERMANENT)) { @@ -4413,11 +4424,20 @@ static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context) { - XLogRecPtr recptr; - ErrorContextCallback errcallback; - instr_time io_start; - Block bufBlock; - char *bufToWrite; + XLogRecPtr lsn; + + if (PrepareFlushBuffer(buf, &lsn)) + DoFlushBuffer(buf, reln, io_object, io_context, lsn); +} + +/* + * Prepare the buffer with bufdesc for writing. Returns true if the buffer + * actually needs writing and false otherwise. lsn returns the buffer's LSN if + * the table is logged. + */ +static bool +PrepareFlushBuffer(BufferDesc *bufdesc, XLogRecPtr *lsn) +{ uint32 buf_state; /* @@ -4425,42 +4445,16 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * someone else flushed the buffer before we could, so we need not do * anything. */ - if (!StartBufferIO(buf, false, false)) - return; - - /* Setup error traceback support for ereport() */ - errcallback.callback = shared_buffer_write_error_callback; - errcallback.arg = buf; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - /* Find smgr relation for buffer */ - if (reln == NULL) - reln = smgropen(BufTagGetRelFileLocator(&buf->tag), INVALID_PROC_NUMBER); - - TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&buf->tag), - buf->tag.blockNum, - reln->smgr_rlocator.locator.spcOid, - reln->smgr_rlocator.locator.dbOid, - reln->smgr_rlocator.locator.relNumber); - - buf_state = LockBufHdr(buf); - - /* - * Run PageGetLSN while holding header lock, since we don't have the - * buffer locked exclusively in all cases. - */ - recptr = BufferGetLSN(buf); + if (!StartBufferIO(bufdesc, false, false)) + return false; - /* To check if block content changes while flushing. - vadim 01/17/97 */ - UnlockBufHdrExt(buf, buf_state, - 0, BM_JUST_DIRTIED, - 0); + *lsn = InvalidXLogRecPtr; + buf_state = LockBufHdr(bufdesc); /* - * Force XLOG flush up to buffer's LSN. This implements the basic WAL - * rule that log updates must hit disk before any of the data-file changes - * they describe do. + * Record the buffer's LSN. We will force XLOG flush up to buffer's LSN. + * This implements the basic WAL rule that log updates must hit disk + * before any of the data-file changes they describe do. * * However, this rule does not apply to unlogged relations, which will be * lost after a crash anyway. Most unlogged relation pages do not bear @@ -4473,9 +4467,54 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * happen, attempting to flush WAL through that location would fail, with * disastrous system-wide consequences. To make sure that can't happen, * skip the flush if the buffer isn't permanent. + * + * We must hold the buffer header lock when examining the page LSN since + * don't have buffer exclusively locked in all cases. */ if (buf_state & BM_PERMANENT) - XLogFlush(recptr); + *lsn = BufferGetLSN(bufdesc); + + /* To check if block content changes while flushing. - vadim 01/17/97 */ + UnlockBufHdrExt(bufdesc, buf_state, + 0, BM_JUST_DIRTIED, + 0); + return true; +} + +/* + * Actually do the write I/O to clean a buffer. buf and reln may be modified. 
+ */ +static void +DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, + IOContext io_context, XLogRecPtr buffer_lsn) +{ + ErrorContextCallback errcallback; + instr_time io_start; + Block bufBlock; + char *bufToWrite; + + /* Setup error traceback support for ereport() */ + errcallback.callback = shared_buffer_write_error_callback; + errcallback.arg = buf; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* Find smgr relation for buffer */ + if (reln == NULL) + reln = smgropen(BufTagGetRelFileLocator(&buf->tag), INVALID_PROC_NUMBER); + + TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&buf->tag), + buf->tag.blockNum, + reln->smgr_rlocator.locator.spcOid, + reln->smgr_rlocator.locator.dbOid, + reln->smgr_rlocator.locator.relNumber); + + /* Force XLOG flush up to buffer's LSN */ + if (XLogRecPtrIsValid(buffer_lsn)) + { + Assert(pg_atomic_read_u32(&buf->state) & BM_PERMANENT); + XLogFlush(buffer_lsn); + } /* * Now it's safe to write the buffer to disk. Note that no one else should -- 2.43.0
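The shape of the split, sketched with toy types (toy_prepare_flush/toy_do_flush and the two extern helpers are hypothetical stand-ins, not the real PrepareFlushBuffer()/DoFlushBuffer() signatures): the prepare step can fail benignly when someone else already cleaned the buffer and is the step that captures the LSN, while the do step enforces WAL-before-data and performs the write. That separation is what the later batching patches rely on.

#include <stdbool.h>
#include <stdint.h>

typedef uint64_t ToyLSN;

typedef struct ToyBuffer
{
    bool        io_already_done;    /* someone else cleaned it first */
    bool        permanent;
    ToyLSN      page_lsn;
    /* ... page contents ... */
} ToyBuffer;

/* Hypothetical helpers standing in for XLogFlush() and the smgr write. */
extern void toy_flush_wal_upto(ToyLSN lsn);
extern void toy_write_page(ToyBuffer *buf);

/* Prepare step: claim the IO and capture the LSN the write depends on. */
static bool
toy_prepare_flush(ToyBuffer *buf, ToyLSN *lsn)
{
    if (buf->io_already_done)
        return false;           /* nothing to do */
    *lsn = buf->permanent ? buf->page_lsn : 0;
    return true;
}

/* Do step: honor WAL-before-data, then write. */
static void
toy_do_flush(ToyBuffer *buf, ToyLSN lsn)
{
    if (lsn != 0)
        toy_flush_wal_upto(lsn);
    toy_write_page(buf);
}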
From bc956edc684384ed1815dbfeb72bca43268fd88b Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Wed, 7 Jan 2026 14:56:49 -0500 Subject: [PATCH v12 3/8] Eagerly flush bulkwrite strategy ring Operations using BAS_BULKWRITE (COPY FROM and createdb) will inevitably need to flush buffers in the strategy ring in order to reuse them. By eagerly flushing the buffers in a larger run, we encourage larger writes at the kernel level and less interleaving of WAL flushes and data file writes. The effect is mainly noticeable with multiple parallel COPY FROMs. In this case, client backends achieve higher write throughput and end up spending less time waiting on acquiring the lock to flush WAL. Larger flush operations also mean less time waiting for flush operations at the kernel level. The heuristic for eager eviction is to only flush buffers in the strategy ring which do not require a WAL flush. This patch also is a step toward AIO writes, as it lines up multiple buffers that can be issued asynchronously once the infrastructure exists. Author: Melanie Plageman <[email protected]> Reviewed-by: Chao Li <[email protected]> Reviewed-by: Nazir Bilal Yavuz <[email protected]> Earlier version Reviewed-by: Kirill Reshke <[email protected]> Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 160 ++++++++++++++++++++++++++ src/backend/storage/buffer/freelist.c | 48 ++++++++ src/include/storage/buf_internals.h | 4 + 3 files changed, 212 insertions(+) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 2c706682eb3..d31b6243354 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -631,6 +631,10 @@ static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); + +static BufferDesc *PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, + Buffer bufnum, + XLogRecPtr *max_lsn); static bool PrepareFlushBuffer(BufferDesc *bufdesc, XLogRecPtr *lsn); static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context, @@ -2503,6 +2507,59 @@ again: /* May be nothing to do if buffer was cleaned */ LWLockRelease(BufferDescriptorGetContentLock(buf_hdr)); } + else if (from_ring && StrategySupportsEagerFlush(strategy)) + { + Buffer sweep_end = buf; + int cursor = StrategyGetCurrentIndex(strategy); + bool first_buffer = true; + BufferDesc *next_bufdesc = buf_hdr; + + /* + * Flush the victim buffer and then loop around strategy ring one + * time eagerly flushing all of the eligible buffers. 
+ */ + for (;;) + { + Buffer next_buf; + + if (next_bufdesc) + { + DoFlushBuffer(next_bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn); + LWLockRelease(BufferDescriptorGetContentLock(next_bufdesc)); + ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, + &next_bufdesc->tag); + /* We leave the first buffer pinned for the caller */ + if (!first_buffer) + UnpinBuffer(next_bufdesc); + first_buffer = false; + } + + next_buf = StrategyNextBuffer(strategy, &cursor); + + /* Completed one sweep of the ring buffer */ + if (next_buf == sweep_end) + break; + + /* + * For strategies currently supporting eager flush + * (BAS_BULKWRITE, eventually BAS_VACUUM), once you hit an + * InvalidBuffer, the remaining buffers in the ring will be + * invalid. If BAS_BULKREAD is someday supported, this logic + * will have to change. + */ + if (!BufferIsValid(next_buf)) + break; + + /* + * Check buffer eager flush eligibility. If the buffer is + * ineligible, we'll keep looking until we complete one full + * sweep around the ring. + */ + next_bufdesc = PrepareOrRejectEagerFlushBuffer(strategy, + next_buf, + &max_lsn); + } + } else { DoFlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context, max_lsn); @@ -4430,6 +4487,109 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, DoFlushBuffer(buf, reln, io_object, io_context, lsn); } +/* + * Prepare bufdesc for eager flushing. + * + * Given bufnum, return the buffer descriptor of the buffer to eagerly flush, + * pinned and locked and with BM_IO_IN_PROGRESS set, or NULL if this buffer + * does not contain a block that should be flushed. + * + * max_lsn may be updated if the provided buffer LSN exceeds the current max + * LSN. + */ +static BufferDesc * +PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, + Buffer bufnum, + XLogRecPtr *max_lsn) +{ + BufferDesc *bufdesc; + uint32 buf_state; + XLogRecPtr lsn; + LWLock *content_lock; + + if (!BufferIsValid(bufnum)) + goto reject_buffer; + + Assert(!BufferIsLocal(bufnum)); + + bufdesc = GetBufferDescriptor(bufnum - 1); + buf_state = pg_atomic_read_u32(&bufdesc->state); + + /* + * Quick racy check to see if the buffer is clean, in which case we don't + * need to flush it. We'll recheck if it is dirty again later before + * actually setting BM_IO_IN_PROGRESS. + */ + if (!(buf_state & BM_DIRTY)) + goto reject_buffer; + + /* + * Quick check to see if the buffer is pinned, in which case it is more + * likely to be dirtied again soon, and we don't want to eagerly flush it. + * We don't care if it has a non-zero usage count because we don't need to + * reuse it right away and a non-zero usage count doesn't necessarily mean + * it will be dirtied again soon. + */ + if (BUF_STATE_GET_REFCOUNT(buf_state) > 0) + goto reject_buffer; + + /* + * Don't eagerly flush buffers requiring WAL flush. We must check this + * again later while holding the buffer content lock for correctness. + */ + if (BufferNeedsWALFlush(bufdesc, false, &lsn)) + goto reject_buffer; + + /* + * Ensure that there's a free refcount entry and resource owner slot for + * the pin before pinning the buffer. While this may leak a refcount and + * slot if we return without a buffer, that slot will be reused. 
+ */ + ResourceOwnerEnlarge(CurrentResourceOwner); + ReservePrivateRefCountEntry(); + + /* There is no need to flush the buffer if it is not BM_VALID */ + if (!PinBuffer(bufdesc, strategy, /* skip_if_not_valid */ true)) + goto reject_buffer; + + CheckBufferIsPinnedOnce(bufnum); + + content_lock = BufferDescriptorGetContentLock(bufdesc); + if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) + goto reject_buffer_unpin; + + if (BufferNeedsWALFlush(bufdesc, false, &lsn)) + goto reject_buffer_unlock; + + /* Try to start an I/O operation */ + if (!StartBufferIO(bufdesc, false, true)) + goto reject_buffer_unlock; + + /* + * Because we don't eagerly flush buffers that need WAL flushed first, + * this buffer's LSN should only be greater than the victim buffer LSN if + * the victim doesn't need WAL flushing either -- in which case, we don't + * really need to update max_lsn. But, it seems better to keep the max_lsn + * honest -- especially since doing so is cheap. + */ + if (lsn > *max_lsn) + *max_lsn = lsn; + + buf_state = LockBufHdr(bufdesc); + UnlockBufHdrExt(bufdesc, buf_state, 0, BM_JUST_DIRTIED, 0); + + return bufdesc; + +reject_buffer_unlock: + LWLockRelease(content_lock); + +reject_buffer_unpin: + UnpinBuffer(bufdesc); + +reject_buffer: + return NULL; +} + /* * Prepare the buffer with bufdesc for writing. Returns true if the buffer * actually needs writing and false otherwise. lsn returns the buffer's LSN if diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 6a4452e2da0..dfa6b27a4af 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -155,6 +155,31 @@ ClockSweepTick(void) return victim; } +/* + * Some BufferAccessStrategies support eager flushing -- which is flushing + * buffers in the ring before they are needed. This can lead to better I/O + * patterns than lazily flushing buffers immediately before reusing them. + */ +bool +StrategySupportsEagerFlush(BufferAccessStrategy strategy) +{ + Assert(strategy); + + switch (strategy->btype) + { + case BAS_BULKWRITE: + return true; + case BAS_VACUUM: + case BAS_NORMAL: + case BAS_BULKREAD: + return false; + default: + elog(ERROR, "unrecognized buffer access strategy: %d", + (int) strategy->btype); + return false; + } +} + /* * StrategyGetBuffer * @@ -306,6 +331,29 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r } } +/* + * Returns the next buffer in the ring after the one at cursor and increments + * cursor. + */ +Buffer +StrategyNextBuffer(BufferAccessStrategy strategy, int *cursor) +{ + if (++(*cursor) >= strategy->nbuffers) + *cursor = 0; + + return strategy->buffers[*cursor]; +} + +/* + * Return the current slot in the strategy ring. 
+ */ +int +StrategyGetCurrentIndex(BufferAccessStrategy strategy) +{ + return strategy->current; +} + + /* * StrategySyncStart -- tell BgBufferSync where to start syncing * diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 3c774d7a1d2..09521af4bdc 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -503,6 +503,10 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag /* freelist.c */ +extern bool StrategySupportsEagerFlush(BufferAccessStrategy strategy); +extern Buffer StrategyNextBuffer(BufferAccessStrategy strategy, + int *cursor); +extern int StrategyGetCurrentIndex(BufferAccessStrategy strategy); extern IOContext IOContextForStrategy(BufferAccessStrategy strategy); extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring); -- 2.43.0
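A standalone sketch of the sweep this adds (toy ring, toy eligibility flags, and a hypothetical toy_flush_slot(); not the GetVictimBuffer() code): after the victim is cleaned, walk the ring once from the current slot and flush everything that is dirty, unpinned, and would not force a WAL flush, stopping at the first invalid slot since the rest of a bulkwrite ring is unused.

#include <stdbool.h>

#define RING_SIZE 16

typedef struct ToySlot
{
    bool        valid;          /* InvalidBuffer marker in the real ring */
    bool        dirty;
    bool        pinned;
    bool        needs_wal_flush;
} ToySlot;

/* Hypothetical stand-in for flushing one eligible ring member. */
extern void toy_flush_slot(ToySlot *slot);

/*
 * One sweep of the strategy ring starting after 'current', flushing the
 * eligible members eagerly.  Returns how many buffers were written.
 */
static int
toy_eager_sweep(ToySlot ring[RING_SIZE], int current)
{
    int         written = 0;

    for (int step = 1; step < RING_SIZE; step++)
    {
        ToySlot    *slot = &ring[(current + step) % RING_SIZE];

        if (!slot->valid)
            break;              /* rest of a bulkwrite ring is unused */
        if (!slot->dirty || slot->pinned || slot->needs_wal_flush)
            continue;           /* ineligible; keep sweeping */

        toy_flush_slot(slot);
        slot->dirty = false;
        written++;
    }
    return written;
}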
From 8887bd812fe245169d8e890e0ed2f6709238a594 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Wed, 15 Oct 2025 13:42:47 -0400 Subject: [PATCH v12 4/8] Write combining for BAS_BULKWRITE Implement write combining for users of the bulkwrite buffer access strategy (e.g. COPY FROM). When the buffer access strategy needs to clean a buffer for reuse, it already opportunistically flushes some other buffers. Now, combine any contiguous blocks from the same relation into larger writes and issue them with smgrwritev(). The performance benefit for COPY FROM is mostly noticeable for multiple concurrent COPY FROMs because a single COPY FROM is either CPU bound or bound by WAL writes. The infrastructure for flushing larger batches of IOs will be reused by checkpointer and other processes doing writes of dirty data. XXX: Because this sets in-place checksums for batches, it is not committable until additional infrastructure goes in place. Author: Melanie Plageman <[email protected]> Reviewed-by: Chao Li <[email protected]> Discussion: https://postgr.es/m/flat/CAAKRu_bcWRvRwZUop_d9vzF9nHAiT%2B-uPzkJ%3DS3ShZ1GqeAYOw%40mail.gmail.com --- src/backend/storage/buffer/bufmgr.c | 325 ++++++++++++++++++++++++++-- src/backend/storage/page/bufpage.c | 20 ++ src/backend/utils/probes.d | 2 + src/include/storage/buf_internals.h | 31 +++ src/include/storage/bufpage.h | 2 + src/tools/pgindent/typedefs.list | 2 + 6 files changed, 367 insertions(+), 15 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index d31b6243354..04026cbff5e 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -162,6 +162,19 @@ typedef struct SMgrSortArray SMgrRelation srel; } SMgrSortArray; +/* + * All blocks in a given BufferWriteBatch have to be a continuous range in the + * same fork in the same relation. This convenience struct specifies the + * requirements for the next block in the batch, used when deciding whether or + * not to include or reject a given block. 
+ */ +typedef struct BatchBlockRequirements +{ + RelFileLocator *rlocator; + ForkNumber forkno; + BlockNumber block; +} BatchBlockRequirements; + /* GUC variables */ bool zero_damaged_pages = false; int bgwriter_lru_maxpages = 100; @@ -626,15 +639,23 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr, bool *foundPtr, IOContext io_context); static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress); static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete); +static uint32 MaxWriteBatchSize(BufferAccessStrategy strategy); static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context); static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); +static bool BufferHasRequiredBlock(BatchBlockRequirements *require, BufferDesc *bufdesc); static BufferDesc *PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, Buffer bufnum, + BatchBlockRequirements *require, XLogRecPtr *max_lsn); +static void FindStrategyFlushAdjacents(BufferAccessStrategy strategy, Buffer sweep_end, + BufferDesc *batch_start, + uint32 max_batch_size, + BufferWriteBatch *batch, + int *sweep_cursor); static bool PrepareFlushBuffer(BufferDesc *bufdesc, XLogRecPtr *lsn); static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context, @@ -2418,6 +2439,34 @@ InvalidateVictimBuffer(BufferDesc *buf_hdr) return true; } +/* + * Determine the largest IO we can assemble given strategy-specific and global + * constraints on the number of pinned buffers and max IO size. Currently only + * a single write is inflight at a time, so the batch can consume all the + * pinned buffers this backend is allowed. Only for batches of shared + * (non-local) relations. + */ +static uint32 +MaxWriteBatchSize(BufferAccessStrategy strategy) +{ + uint32 result = io_combine_limit; + uint32 strategy_pin_limit; + uint32 max_pin_limit = GetPinLimit(); + + /* Apply pin limits */ + result = Min(result, max_pin_limit); + if (strategy) + { + strategy_pin_limit = GetAccessStrategyPinLimit(strategy); + result = Min(result, strategy_pin_limit); + } + + /* Ensure forward progress */ + result = Max(result, 1); + + return result; +} + static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context) { @@ -2511,12 +2560,19 @@ again: { Buffer sweep_end = buf; int cursor = StrategyGetCurrentIndex(strategy); - bool first_buffer = true; + uint32 max_batch_size = MaxWriteBatchSize(strategy); BufferDesc *next_bufdesc = buf_hdr; + /* Pin victim again so it stays ours even after batch released */ + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + IncrBufferRefCount(BufferDescriptorGetBuffer(buf_hdr)); + /* * Flush the victim buffer and then loop around strategy ring one - * time eagerly flushing all of the eligible buffers. + * time eagerly flushing all of the eligible buffers. IO + * concurrency only needs to be taken into account if AIO writes + * are added in the future. 
*/ for (;;) { @@ -2524,14 +2580,22 @@ again: if (next_bufdesc) { - DoFlushBuffer(next_bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn); - LWLockRelease(BufferDescriptorGetContentLock(next_bufdesc)); - ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, - &next_bufdesc->tag); - /* We leave the first buffer pinned for the caller */ - if (!first_buffer) - UnpinBuffer(next_bufdesc); - first_buffer = false; + BufferWriteBatch batch; + + /* + * After finding one eligible buffer, identify any of the + * buffers following it which are also eligibile and + * combine them into a batch. The cursor will be advanced + * through the ring such that the next write batch will + * start at the next eligible buffer after the current + * batch ends. + */ + FindStrategyFlushAdjacents(strategy, sweep_end, + next_bufdesc, max_batch_size, &batch, + &cursor); + FlushBufferBatch(&batch, io_context); + /* Pins and locks released inside CompleteWriteBatchIO */ + CompleteWriteBatchIO(&batch, io_context, &BackendWritebackContext); } next_buf = StrategyNextBuffer(strategy, &cursor); @@ -2557,6 +2621,7 @@ again: */ next_bufdesc = PrepareOrRejectEagerFlushBuffer(strategy, next_buf, + NULL, &max_lsn); } } @@ -4487,6 +4552,29 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, DoFlushBuffer(buf, reln, io_object, io_context, lsn); } +/* + * Quick check to see if the buffer contains the required block from the right + * fork of the right relation. If you don't hold the buffer header spinlock, + * you can't guarantee that these won't change out from under you. + */ +static bool +BufferHasRequiredBlock(BatchBlockRequirements *require, BufferDesc *bufdesc) +{ + Assert(BlockNumberIsValid(require->block)); + if (bufdesc->tag.blockNum != require->block) + return false; + + Assert(require->rlocator); + if (!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufdesc->tag), *require->rlocator)) + return false; + + Assert(require->forkno != InvalidForkNumber); + if (BufTagGetForkNum(&bufdesc->tag) != require->forkno) + return false; + + return true; +} + /* * Prepare bufdesc for eager flushing. * @@ -4494,12 +4582,17 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * pinned and locked and with BM_IO_IN_PROGRESS set, or NULL if this buffer * does not contain a block that should be flushed. * + * If the caller requires a particular block to be in the buffer in order to + * accept it, they will provide the required block number and its + * RelFileLocator and fork. + * * max_lsn may be updated if the provided buffer LSN exceeds the current max * LSN. */ static BufferDesc * PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, Buffer bufnum, + BatchBlockRequirements *require, XLogRecPtr *max_lsn) { BufferDesc *bufdesc; @@ -4513,11 +4606,19 @@ PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, Assert(!BufferIsLocal(bufnum)); bufdesc = GetBufferDescriptor(bufnum - 1); + + /* + * Quick, unsafe checks to see if buffer even possibly contains a block + * meeting our requirements. We'll recheck it all again after getting a + * pin. + */ + if (require && !BufferHasRequiredBlock(require, bufdesc)) + goto reject_buffer; + buf_state = pg_atomic_read_u32(&bufdesc->state); /* - * Quick racy check to see if the buffer is clean, in which case we don't - * need to flush it. We'll recheck if it is dirty again later before + * We'll recheck if it is dirty later, when we have a pin and lock, before * actually setting BM_IO_IN_PROGRESS. 
*/ if (!(buf_state & BM_DIRTY)) @@ -4554,10 +4655,15 @@ PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, CheckBufferIsPinnedOnce(bufnum); + /* Now that we have the buffer pinned, recheck it's got the right block */ + if (require && !BufferHasRequiredBlock(require, bufdesc)) + goto reject_buffer_unpin; + content_lock = BufferDescriptorGetContentLock(bufdesc); if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) goto reject_buffer_unpin; + /* Now that we have the lock, recheck if it needs WAL flush */ if (BufferNeedsWALFlush(bufdesc, false, &lsn)) goto reject_buffer_unlock; @@ -4566,9 +4672,9 @@ PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, goto reject_buffer_unlock; /* - * Because we don't eagerly flush buffers that need WAL flushed first, - * this buffer's LSN should only be greater than the victim buffer LSN if - * the victim doesn't need WAL flushing either -- in which case, we don't + * Because we don't eagerly flush buffers that need WAL flushed, this + * buffer's LSN should only be greater than the victim buffer LSN if the + * victim doesn't need WAL flushing either -- in which case, we don't * really need to update max_lsn. But, it seems better to keep the max_lsn * honest -- especially since doing so is cheap. */ @@ -4590,6 +4696,154 @@ reject_buffer: return NULL; } +/* + * Given a starting buffer descriptor from a strategy ring that supports eager + * flushing, find additional buffers from the ring that can be combined into a + * single write batch with the starting buffer. + * + * max_batch_size is the maximum number of blocks that can be combined into a + * single write in general. This function, based on the block number of start, + * will determine the maximum IO size for this particular write given how much + * of the file remains. max_batch_size is provided by the caller so it doesn't + * have to be recalculated for each write. + * + * batch is an output parameter that this function will fill with the needed + * information to issue this IO. + * + * This function will pin and content lock all of the buffers that it + * assembles for the IO batch. The caller is responsible for issuing the IO. + */ +static void +FindStrategyFlushAdjacents(BufferAccessStrategy strategy, + Buffer sweep_end, + BufferDesc *batch_start, + uint32 max_batch_size, + BufferWriteBatch *batch, + int *sweep_cursor) +{ + BlockNumber limit; + BatchBlockRequirements require; + + Assert(batch_start); + batch->bufdescs[0] = batch_start; + + LockBufHdr(batch_start); + batch->max_lsn = BufferGetLSN(batch_start); + UnlockBufHdr(batch_start); + + batch->start = batch->bufdescs[0]->tag.blockNum; + Assert(BlockNumberIsValid(batch->start)); + batch->n = 1; + batch->forkno = BufTagGetForkNum(&batch->bufdescs[0]->tag); + batch->rlocator = BufTagGetRelFileLocator(&batch->bufdescs[0]->tag); + batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); + + limit = smgrmaxcombine(batch->reln, batch->forkno, batch->start); + limit = Min(max_batch_size, limit); + limit = Min(GetAdditionalPinLimit(), limit); + + /* + * It's possible we're not allowed any more pins or there aren't more + * blocks in the target relation. In this case, just return. Our batch + * will have only one buffer. + */ + if (limit <= 1) + return; + + require.rlocator = &batch->rlocator; + require.forkno = batch->forkno; + + /* Now assemble a run of blocks to write out. 
*/ + for (; batch->n < limit; batch->n++) + { + Buffer bufnum; + + if ((bufnum = + StrategyNextBuffer(strategy, sweep_cursor)) == sweep_end) + break; + + /* + * For BAS_BULKWRITE, once you hit an InvalidBuffer, the remaining + * buffers in the ring will be invalid. + */ + if (!BufferIsValid(bufnum)) + break; + + require.block = batch->start + batch->n; + + /* Stop when we encounter a buffer that will break the run */ + if ((batch->bufdescs[batch->n] = + PrepareOrRejectEagerFlushBuffer(strategy, bufnum, + &require, + &batch->max_lsn)) == NULL) + break; + } +} + +/* + * Given a prepared batch of buffers write them out as a vector. + */ +void +FlushBufferBatch(BufferWriteBatch *batch, + IOContext io_context) +{ + BlockNumber blknums[MAX_IO_COMBINE_LIMIT]; + Block blocks[MAX_IO_COMBINE_LIMIT]; + instr_time io_start; + ErrorContextCallback errcallback = + { + .callback = shared_buffer_write_error_callback, + .previous = error_context_stack, + }; + + error_context_stack = &errcallback; + + if (XLogRecPtrIsValid(batch->max_lsn)) + XLogFlush(batch->max_lsn); + + if (batch->reln == NULL) + batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); + +#ifdef USE_ASSERT_CHECKING + for (uint32 i = 0; i < batch->n; i++) + { + XLogRecPtr lsn; + + Assert(!BufferNeedsWALFlush(batch->bufdescs[i], false, &lsn)); + } +#endif + + TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_START(batch->forkno, + batch->reln->smgr_rlocator.locator.spcOid, + batch->reln->smgr_rlocator.locator.dbOid, + batch->reln->smgr_rlocator.locator.relNumber, + batch->reln->smgr_rlocator.backend, + batch->n); + + /* + * XXX: All blocks should be copied and then checksummed but doing so + * takes a lot of extra memory and a future patch will eliminate this + * requirement. + */ + for (BlockNumber i = 0; i < batch->n; i++) + { + blknums[i] = batch->start + i; + blocks[i] = BufHdrGetBlock(batch->bufdescs[i]); + } + + PageSetBatchChecksumInplace((Page *) blocks, blknums, batch->n); + + io_start = pgstat_prepare_io_time(track_io_timing); + + smgrwritev(batch->reln, batch->forkno, + batch->start, (const void **) blocks, batch->n, false); + + pgstat_count_io_op_time(IOOBJECT_RELATION, io_context, IOOP_WRITE, + io_start, batch->n, BLCKSZ); + + error_context_stack = errcallback.previous; +} + /* * Prepare the buffer with bufdesc for writing. Returns true if the buffer * actually needs writing and false otherwise. lsn returns the buffer's LSN if @@ -4754,6 +5008,47 @@ FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, LWLockRelease(BufferDescriptorGetContentLock(buf)); } +/* + * Given a previously initialized batch with buffers that have already been + * flushed, terminate the IO on each buffer and then unlock and unpin them. + * This assumes all the buffers were locked and pinned. wb_context will be + * modified. + */ +void +CompleteWriteBatchIO(BufferWriteBatch *batch, IOContext io_context, + WritebackContext *wb_context) +{ + ErrorContextCallback errcallback = + { + .callback = shared_buffer_write_error_callback, + .previous = error_context_stack, + }; + + error_context_stack = &errcallback; + pgBufferUsage.shared_blks_written += batch->n; + + for (uint32 i = 0; i < batch->n; i++) + { + Buffer buffer = BufferDescriptorGetBuffer(batch->bufdescs[i]); + + errcallback.arg = batch->bufdescs[i]; + + /* Mark the buffer as clean and end the BM_IO_IN_PROGRESS state. 
*/ + TerminateBufferIO(batch->bufdescs[i], true, 0, true, false); + UnlockReleaseBuffer(buffer); + ScheduleBufferTagForWriteback(wb_context, io_context, + &batch->bufdescs[i]->tag); + } + + TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_DONE(batch->forkno, + batch->reln->smgr_rlocator.locator.spcOid, + batch->reln->smgr_rlocator.locator.dbOid, + batch->reln->smgr_rlocator.locator.relNumber, + batch->reln->smgr_rlocator.backend, + batch->n, batch->start); + error_context_stack = errcallback.previous; +} + /* * RelationGetNumberOfBlocksInFork * Determines the current number of pages in the specified relation fork. diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c index de85911e3ac..4d0f1883a26 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -1546,3 +1546,23 @@ PageSetChecksumInplace(Page page, BlockNumber blkno) ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blkno); } + +/* + * A helper to set multiple blocks' checksums + */ +void +PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos, uint32 length) +{ + /* If we don't need a checksum, just return */ + if (!DataChecksumsEnabled()) + return; + + for (uint32 i = 0; i < length; i++) + { + Page page = pages[i]; + + if (PageIsNew(page)) + continue; + ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blknos[i]); + } +} diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d index 1929521c6a5..e0f48c6d2d9 100644 --- a/src/backend/utils/probes.d +++ b/src/backend/utils/probes.d @@ -61,6 +61,8 @@ provider postgresql { probe buffer__flush__done(ForkNumber, BlockNumber, Oid, Oid, Oid); probe buffer__extend__start(ForkNumber, Oid, Oid, Oid, int, unsigned int); probe buffer__extend__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber); + probe buffer__batch__flush__start(ForkNumber, Oid, Oid, Oid, int, unsigned int); + probe buffer__batch__flush__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber); probe buffer__checkpoint__start(int); probe buffer__checkpoint__sync__start(); diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 09521af4bdc..29d7e1d7184 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -484,6 +484,34 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer) ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_io_resowner_desc); } +/* + * Used to write out multiple blocks at a time in a combined IO. bufdescs + * contains buffer descriptors for buffers containing adjacent blocks of the + * same fork of the same relation. + */ +typedef struct BufferWriteBatch +{ + RelFileLocator rlocator; + ForkNumber forkno; + SMgrRelation reln; + + /* + * The BlockNumber of the first block in the run of contiguous blocks to + * be written out as a single IO. + */ + BlockNumber start; + + /* + * While assembling the buffers, we keep track of the maximum LSN so that + * we can flush WAL through this LSN before flushing the buffers. 
+ */ + XLogRecPtr max_lsn; + + /* The number of valid buffers in bufdescs */ + uint32 n; + BufferDesc *bufdescs[MAX_IO_COMBINE_LIMIT]; +} BufferWriteBatch; + /* * Internal buffer management routines */ @@ -492,6 +520,7 @@ extern void WritebackContextInit(WritebackContext *context, int *max_pending); extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context); extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context, BufferTag *tag); +extern void FlushBufferBatch(BufferWriteBatch *batch, IOContext io_context); extern void TrackNewBufferPin(Buffer buf); extern bool BufferNeedsWALFlush(BufferDesc *bufdesc, bool exclusive, XLogRecPtr *lsn); @@ -507,6 +536,8 @@ extern bool StrategySupportsEagerFlush(BufferAccessStrategy strategy); extern Buffer StrategyNextBuffer(BufferAccessStrategy strategy, int *cursor); extern int StrategyGetCurrentIndex(BufferAccessStrategy strategy); +extern void CompleteWriteBatchIO(BufferWriteBatch *batch, IOContext io_context, + WritebackContext *wb_context); extern IOContext IOContextForStrategy(BufferAccessStrategy strategy); extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring); diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index ae3725b3b81..baadfc6c313 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -506,5 +506,7 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum, const void *newtup, Size newsize); extern char *PageSetChecksumCopy(Page page, BlockNumber blkno); extern void PageSetChecksumInplace(Page page, BlockNumber blkno); +extern void PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos, + uint32 length); #endif /* BUFPAGE_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 14dec2d49c1..f1606d50532 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -255,6 +255,7 @@ Barrier BaseBackupCmd BaseBackupTargetHandle BaseBackupTargetType +BatchBlockRequirements BatchMVCCState BeginDirectModify_function BeginForeignInsert_function @@ -359,6 +360,7 @@ BufferManagerRelation BufferStrategyControl BufferTag BufferUsage +BufferWriteBatch BuildAccumulator BuiltinScript BulkInsertState -- 2.43.0
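The kernel-facing effect the batch is after, as a standalone sketch on top of pwritev() as found on Linux/BSD (block size, fd handling, and the 16-entry iovec cap are made up for the example; this is not the smgr code): N contiguous 8kB pages go out as one vectored write at the first block's offset instead of N separate writes.

#include <sys/types.h>
#include <sys/uio.h>
#include <stdint.h>

#define TOY_BLCKSZ 8192

/*
 * Write nblocks contiguous pages, starting at block 'start', as a single
 * vectored write.  'pages' must be in block order with no gaps, mirroring
 * the contiguity rule a BufferWriteBatch enforces.
 */
static ssize_t
toy_write_batch(int fd, uint32_t start, void *pages[], int nblocks)
{
    struct iovec iov[16];       /* caller keeps nblocks <= 16 */

    for (int i = 0; i < nblocks; i++)
    {
        iov[i].iov_base = pages[i];
        iov[i].iov_len = TOY_BLCKSZ;
    }

    /* One syscall covering the whole run */
    return pwritev(fd, iov, nblocks, (off_t) start * TOY_BLCKSZ);
}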
From ac13cf966330b7f018479ae8828544e3fe522158 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Tue, 2 Sep 2025 15:22:11 -0400 Subject: [PATCH v12 5/8] Add database Oid to CkptSortItem This is an oversight that currently isn't causing harm. However, it is required for checkpointer write combining -- which will be added in a future commit. Author: Melanie Plageman <[email protected]> Reviewed-by: Chao Li <[email protected]> Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com --- src/backend/storage/buffer/bufmgr.c | 8 ++++++++ src/include/storage/buf_internals.h | 1 + 2 files changed, 9 insertions(+) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 04026cbff5e..e1f3d45b522 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -3663,6 +3663,7 @@ BufferSync(int flags) item = &CkptBufferIds[num_to_scan++]; item->buf_id = buf_id; item->tsId = bufHdr->tag.spcOid; + item->dbId = bufHdr->tag.dbOid; item->relNumber = BufTagGetRelNumber(&bufHdr->tag); item->forkNum = BufTagGetForkNum(&bufHdr->tag); item->blockNum = bufHdr->tag.blockNum; @@ -6998,6 +6999,13 @@ ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b) return -1; else if (a->tsId > b->tsId) return 1; + + /* compare database */ + if (a->dbId < b->dbId) + return -1; + else if (a->dbId > b->dbId) + return 1; + /* compare relation */ if (a->relNumber < b->relNumber) return -1; diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 29d7e1d7184..5717b44f2f0 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -450,6 +450,7 @@ extern uint32 WaitBufHdrUnlocked(BufferDesc *buf); typedef struct CkptSortItem { Oid tsId; + Oid dbId; RelFileNumber relNumber; ForkNumber forkNum; BlockNumber blockNum; -- 2.43.0
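What the extra field buys in the sort order, as a runnable standalone sketch (toy struct and qsort, not CkptSortItem/ckpt_buforder_comparator themselves): comparing the database OID between tablespace and relfilenumber keeps all blocks of one relation in one database adjacent, which the checkpointer batching in the next patch depends on.

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

typedef struct ToyItem
{
    uint32_t    tsId;
    uint32_t    dbId;
    uint32_t    relNumber;
    uint32_t    forkNum;
    uint32_t    blockNum;
} ToyItem;

#define CMP(a, b) do { if ((a) < (b)) return -1; if ((a) > (b)) return 1; } while (0)

static int
toy_comparator(const void *pa, const void *pb)
{
    const ToyItem *a = pa;
    const ToyItem *b = pb;

    CMP(a->tsId, b->tsId);
    CMP(a->dbId, b->dbId);      /* the field 0005 adds */
    CMP(a->relNumber, b->relNumber);
    CMP(a->forkNum, b->forkNum);
    CMP(a->blockNum, b->blockNum);
    return 0;
}

int
main(void)
{
    /* Same relfilenumber in two databases: dbId keeps them apart */
    ToyItem     items[] = {
        {1, 20, 500, 0, 3},
        {1, 10, 500, 0, 7},
        {1, 10, 500, 0, 6},
    };

    qsort(items, 3, sizeof(ToyItem), toy_comparator);
    for (int i = 0; i < 3; i++)
        printf("db %u rel %u block %u\n",
               (unsigned) items[i].dbId, (unsigned) items[i].relNumber,
               (unsigned) items[i].blockNum);
    return 0;
}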
From 735bb47365c181c78586092fada1756c5d13dd93 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Wed, 15 Oct 2025 15:23:16 -0400 Subject: [PATCH v12 6/8] Implement checkpointer data write combining When the checkpointer writes out dirty buffers, writing multiple contiguous blocks as a single IO is a substantial performance improvement. The checkpointer is usually bottlenecked on IO, so issuing larger IOs leads to increased write throughput and faster checkpoints. Author: Melanie Plageman <[email protected]> Reviewed-by: Chao Li <[email protected]> Reviewed-by: Soumya <[email protected]> Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com --- src/backend/storage/buffer/bufmgr.c | 225 ++++++++++++++++++++++++---- src/backend/utils/probes.d | 2 +- 2 files changed, 199 insertions(+), 28 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index e1f3d45b522..83cc4018d28 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -3604,7 +3604,6 @@ BufferNeedsWALFlush(BufferDesc *bufdesc, bool exclusive, XLogRecPtr *lsn) static void BufferSync(int flags) { - uint32 buf_state; int buf_id; int num_to_scan; int num_spaces; @@ -3616,6 +3615,8 @@ BufferSync(int flags) int i; uint32 mask = BM_DIRTY; WritebackContext wb_context; + uint32 max_batch_size; + BufferWriteBatch batch; /* * Unless this is a shutdown checkpoint or we have been explicitly told, @@ -3647,6 +3648,7 @@ BufferSync(int flags) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); uint32 set_bits = 0; + uint32 buf_state; /* * Header spinlock is enough to examine BM_DIRTY, see comment in @@ -3789,48 +3791,217 @@ BufferSync(int flags) */ num_processed = 0; num_written = 0; + max_batch_size = MaxWriteBatchSize(NULL); while (!binaryheap_empty(ts_heap)) { + BlockNumber limit = max_batch_size; BufferDesc *bufHdr = NULL; CkptTsStatus *ts_stat = (CkptTsStatus *) DatumGetPointer(binaryheap_first(ts_heap)); + int ts_end = ts_stat->index - ts_stat->num_scanned + ts_stat->num_to_scan; + int processed = 0; - buf_id = CkptBufferIds[ts_stat->index].buf_id; - Assert(buf_id != -1); + batch.start = InvalidBlockNumber; + batch.max_lsn = InvalidXLogRecPtr; + batch.n = 0; - bufHdr = GetBufferDescriptor(buf_id); + while (batch.n < limit) + { + uint32 buf_state; + XLogRecPtr lsn = InvalidXLogRecPtr; + LWLock *content_lock; + CkptSortItem item; + Buffer buffer; - num_processed++; + if (ProcSignalBarrierPending) + ProcessProcSignalBarrier(); - /* - * We don't need to acquire the lock here, because we're only looking - * at a single bit. It's possible that someone else writes the buffer - * and clears the flag right after we check, but that doesn't matter - * since SyncOneBuffer will then do nothing. However, there is a - * further race condition: it's conceivable that between the time we - * examine the bit here and the time SyncOneBuffer acquires the lock, - * someone else not only wrote the buffer but replaced it with another - * page and dirtied it. In that improbable case, SyncOneBuffer will - * write the buffer though we didn't need to. It doesn't seem worth - * guarding against this, though. 
- */ - if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) - { - if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) + /* Check if we are done with this tablespace */ + if (ts_stat->index + processed >= ts_end) + break; + + item = CkptBufferIds[ts_stat->index + processed]; + + buf_id = item.buf_id; + Assert(buf_id != -1); + + bufHdr = GetBufferDescriptor(buf_id); + buffer = BufferDescriptorGetBuffer(bufHdr); + + /* + * If this is the first block of the batch, then check if we need + * to open a new relation. Open the relation now because we have + * to determine the maximum IO size based on how many blocks + * remain in the file. + */ + if (!BlockNumberIsValid(batch.start)) { - TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); - PendingCheckpointerStats.buffers_written++; - num_written++; + Assert(batch.max_lsn == InvalidXLogRecPtr && batch.n == 0); + batch.rlocator.spcOid = item.tsId; + batch.rlocator.dbOid = item.dbId; + batch.rlocator.relNumber = item.relNumber; + batch.forkno = item.forkNum; + batch.start = item.blockNum; + batch.reln = smgropen(batch.rlocator, INVALID_PROC_NUMBER); + limit = smgrmaxcombine(batch.reln, batch.forkno, batch.start); + limit = Min(max_batch_size, limit); + limit = Min(GetAdditionalPinLimit(), limit); + /* Guarantee progress */ + limit = Max(limit, 1); } + + /* + * Once we hit blocks from the next relation or fork of the + * relation, break out of the loop and issue the IO we've built up + * so far. It is important that we don't increment processed + * because we want to start the next IO with this item. + */ + if (item.dbId != batch.rlocator.dbOid || + item.relNumber != batch.rlocator.relNumber || + item.forkNum != batch.forkno) + break; + + Assert(item.tsId == batch.rlocator.spcOid); + + /* + * If the next block is not contiguous, we can't include it in the + * IO we will issue. Break out of the loop and issue what we have + * so far. Do not count this item as processed -- otherwise we + * will end up skipping it. + */ + if (item.blockNum != batch.start + batch.n) + break; + + /* + * We don't need to acquire the lock here, because we're only + * looking at a few bits. It's possible that someone else writes + * the buffer and clears the flag right after we check, but that + * doesn't matter since StartBufferIO will then return false. + * + * If the buffer doesn't need checkpointing, don't include it in + * the batch we are building. And if the buffer doesn't need + * flushing, we're done with the item, so count it as processed + * and break out of the loop to issue the IO so far. + */ + buf_state = pg_atomic_read_u32(&bufHdr->state); + if ((buf_state & (BM_CHECKPOINT_NEEDED | BM_VALID | BM_DIRTY)) != + (BM_CHECKPOINT_NEEDED | BM_VALID | BM_DIRTY)) + { + processed++; + break; + } + + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + + /* If the buffer is not BM_VALID, nothing to do on this buffer */ + if (!PinBuffer(bufHdr, NULL, true)) + { + processed++; + break; + } + + /* + * Now that we have a pin, we must recheck that the buffer + * contains the specified block. Someone may have replaced the + * block in the buffer with a different block. In that case, count + * it as processed and issue the IO so far. 
+ */ + if (!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufHdr->tag), + batch.rlocator) || + BufTagGetForkNum(&bufHdr->tag) != batch.forkno || + item.blockNum != BufferGetBlockNumber(buffer)) + { + UnpinBuffer(bufHdr); + processed++; + break; + } + + /* + * It's conceivable that between the time we examine the buffer + * header for BM_CHECKPOINT_NEEDED above and when we are now + * acquiring the lock that someone else wrote the buffer out. In + * that improbable case, we will write the buffer though we didn't + * need to. It doesn't seem worth guarding against this, though. + */ + content_lock = BufferDescriptorGetContentLock(bufHdr); + + /* + * We are willing to wait for the content lock on the first IO in + * the batch. However, for subsequent IOs, waiting could lead to + * deadlock. We have to eventually flush all eligible buffers, + * though. So, if we fail to acquire the lock on a subsequent + * buffer, we break out and issue the IO we've built up so far. + * Then we come back and start a new IO with that buffer as the + * starting buffer. As such, we must not count the item as + * processed if we end up failing to acquire the content lock. + */ + if (batch.n == 0) + LWLockAcquire(content_lock, LW_SHARED); + else if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) + { + UnpinBuffer(bufHdr); + break; + } + + /* + * If the buffer doesn't need IO, count the item as processed, + * release the buffer, and break out of the loop to issue the IO + * we have built up so far. + */ + if (!StartBufferIO(bufHdr, false, true)) + { + LWLockRelease(content_lock); + UnpinBuffer(bufHdr); + processed++; + break; + } + + /* + * Lock buffer header lock before examining LSN because we only + * have a shared lock on the buffer. + */ + buf_state = LockBufHdr(bufHdr); + lsn = BufferGetLSN(bufHdr); + UnlockBufHdrExt(bufHdr, buf_state, 0, BM_JUST_DIRTIED, 0); + + /* + * Keep track of the max LSN so that we can be sure to flush + * enough WAL before flushing data from the buffers. See comment + * in DoFlushBuffer() for more on why we don't consider the LSNs + * of unlogged relations. + */ + if (buf_state & BM_PERMANENT && lsn > batch.max_lsn) + batch.max_lsn = lsn; + + batch.bufdescs[batch.n++] = bufHdr; + processed++; } /* * Measure progress independent of actually having to flush the buffer - * - otherwise writing become unbalanced. + * - otherwise writing becomes unbalanced. */ - ts_stat->progress += ts_stat->progress_slice; - ts_stat->num_scanned++; - ts_stat->index++; + num_processed += processed; + ts_stat->progress += ts_stat->progress_slice * processed; + ts_stat->num_scanned += processed; + ts_stat->index += processed; + + /* + * If we built up an IO, issue it. There's a chance we didn't find any + * items referencing buffers that needed flushing this time, but we + * still want to check if we should update the heap if we examined and + * processed the items. + */ + if (batch.n > 0) + { + FlushBufferBatch(&batch, IOCONTEXT_NORMAL); + CompleteWriteBatchIO(&batch, IOCONTEXT_NORMAL, &wb_context); + + TRACE_POSTGRESQL_BUFFER_BATCH_SYNC_WRITTEN(batch.n); + PendingCheckpointerStats.buffers_written += batch.n; + num_written += batch.n; + } /* Have all the buffers from the tablespace been processed? 
*/ if (ts_stat->num_scanned == ts_stat->num_to_scan) diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d index e0f48c6d2d9..90169c92c26 100644 --- a/src/backend/utils/probes.d +++ b/src/backend/utils/probes.d @@ -68,7 +68,7 @@ provider postgresql { probe buffer__checkpoint__sync__start(); probe buffer__checkpoint__done(); probe buffer__sync__start(int, int); - probe buffer__sync__written(int); + probe buffer__batch__sync__written(BlockNumber); probe buffer__sync__done(int, int, int); probe deadlock__found(); -- 2.43.0
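The lock discipline in the new checkpointer loop, reduced to a standalone sketch with a pthread mutex standing in for a buffer content lock (names and types are made up): wait for the first member of a batch, but only trylock later members; a trylock failure means "flush what you have and restart the batch at this buffer", so nothing is skipped and we never sleep on a lock while already holding others.

#include <pthread.h>
#include <stdbool.h>

/*
 * Try to add the n-th member to a batch of buffers whose content locks are
 * modeled by mutexes.  Returns true if the lock was obtained and the buffer
 * can join the batch; false means "stop here, issue the batch so far, and
 * retry this buffer as the start of the next batch".
 */
static bool
toy_lock_for_batch(pthread_mutex_t *content_lock, int n)
{
    if (n == 0)
    {
        /* First member: waiting is fine, we hold no other locks yet */
        pthread_mutex_lock(content_lock);
        return true;
    }

    /*
     * Later members: we already hold earlier locks, so waiting here could
     * deadlock against someone locking in the opposite order.  Only take
     * the lock if it is free right now.
     */
    return pthread_mutex_trylock(content_lock) == 0;
}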
From 3b9727f5f4d7cddbb1dcc09b669ef34f2371a341 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Wed, 15 Oct 2025 16:16:58 -0400 Subject: [PATCH v12 7/8] Refactor SyncOneBuffer for bgwriter use only Since xxx, only bgwriter uses SyncOneBuffer, so we can remove the skip_recently_used parameter and make that behavior the default. While we are at it, 5e89985928795f243 introduced the pattern of using a CAS loop instead of locking the buffer header and then calling PinBuffer_Locked(). Do that in SyncOneBuffer() so we can avoid taking the buffer header spinlock in the common case that the buffer is recently used. Author: Melanie Plageman <[email protected]> Reviewed-by: Chao Li <[email protected]> Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com --- src/backend/storage/buffer/bufmgr.c | 96 +++++++++++++++++------------ 1 file changed, 56 insertions(+), 40 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 83cc4018d28..4ae70915089 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -625,8 +625,7 @@ static void PinBuffer_Locked(BufferDesc *buf); static void UnpinBuffer(BufferDesc *buf); static void UnpinBufferNoOwner(BufferDesc *buf); static void BufferSync(int flags); -static int SyncOneBuffer(int buf_id, bool skip_recently_used, - WritebackContext *wb_context); +static int SyncOneBuffer(int buf_id, WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); static void AbortBufferIO(Buffer buffer); static void shared_buffer_write_error_callback(void *arg); @@ -4275,8 +4274,7 @@ BgBufferSync(WritebackContext *wb_context) /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { - int sync_state = SyncOneBuffer(next_to_clean, true, - wb_context); + int sync_state = SyncOneBuffer(next_to_clean, wb_context); if (++next_to_clean >= NBuffers) { @@ -4339,8 +4337,8 @@ BgBufferSync(WritebackContext *wb_context) /* * SyncOneBuffer -- process a single buffer during syncing. * - * If skip_recently_used is true, we don't write currently-pinned buffers, nor - * buffers marked recently used, as these are not replacement candidates. + * We don't write currently-pinned buffers, nor buffers marked recently used, + * as these are not replacement candidates. * * Returns a bitmask containing the following flag bits: * BUF_WRITTEN: we wrote the buffer. @@ -4351,53 +4349,71 @@ BgBufferSync(WritebackContext *wb_context) * after locking it, but we don't care all that much.) */ static int -SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) +SyncOneBuffer(int buf_id, WritebackContext *wb_context) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); int result = 0; + uint32 old_buf_state; uint32 buf_state; BufferTag tag; - /* Make sure we can handle the pin */ - ReservePrivateRefCountEntry(); - ResourceOwnerEnlarge(CurrentResourceOwner); - /* - * Check whether buffer needs writing. - * - * We can make this check without taking the buffer content lock so long - * as we mark pages dirty in access methods *before* logging changes with - * XLogInsert(): if someone marks the buffer dirty just after our check we - * don't worry because our checkpoint.redo points before log record for - * upcoming changes and so we are not required to write such dirty buffer. + * Check whether the buffer can be used and pin it if so. Do this using a + * CAS loop, to avoid having to lock the buffer header. 
*/ - buf_state = LockBufHdr(bufHdr); - - if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && - BUF_STATE_GET_USAGECOUNT(buf_state) == 0) + old_buf_state = pg_atomic_read_u32(&bufHdr->state); + for (;;) { + buf_state = old_buf_state; + + /* + * We can make these checks without taking the buffer content lock so + * long as we mark pages dirty in access methods *before* logging + * changes with XLogInsert(): if someone marks the buffer dirty just + * after our check we don't worry because our checkpoint.redo points + * before log record for upcoming changes and so we are not required + * to write such dirty buffer. + */ + if (BUF_STATE_GET_REFCOUNT(buf_state) != 0 || + BUF_STATE_GET_USAGECOUNT(buf_state) != 0) + { + /* Don't write recently-used buffers */ + return result; + } + result |= BUF_REUSABLE; - } - else if (skip_recently_used) - { - /* Caller told us not to write recently-used buffers */ - UnlockBufHdr(bufHdr); - return result; - } - if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) - { - /* It's clean, so nothing to do */ - UnlockBufHdr(bufHdr); - return result; + if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) + { + /* It's clean, so nothing to do */ + return result; + } + + if (unlikely(buf_state & BM_LOCKED)) + { + old_buf_state = WaitBufHdrUnlocked(bufHdr); + continue; + } + + /* Make sure we can handle the pin */ + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + + /* pin the buffer if the CAS succeeds */ + buf_state += BUF_REFCOUNT_ONE; + + if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state, + buf_state)) + { + TrackNewBufferPin(BufferDescriptorGetBuffer(bufHdr)); + break; + } } /* - * Pin it, share-lock it, write it. (FlushBuffer will do nothing if the - * buffer is clean by the time we've locked it.) + * Share lock and write it out (FlushBuffer will do nothing if the buffer + * is clean by the time we've locked it.) */ - PinBuffer_Locked(bufHdr); - FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); tag = bufHdr->tag; @@ -4405,8 +4421,8 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) UnpinBuffer(bufHdr); /* - * SyncOneBuffer() is only called by checkpointer and bgwriter, so - * IOContext will always be IOCONTEXT_NORMAL. + * SyncOneBuffer() is only called by bgwriter, so IOContext will always be + * IOCONTEXT_NORMAL. */ ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag); -- 2.43.0
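The commit message for 7/8 describes replacing "lock the header, then PinBuffer_Locked()" with a compare-and-swap loop that only takes a pin when the buffer is idle. As a reading aid, here is a minimal, self-contained sketch of that pattern using C11 atomics. Every name in it (toy_buffer, try_pin_if_idle, REFCOUNT_ONE, the bit layout) is invented for illustration; it mirrors only the shape of the CAS loop, not the real buffer state word or the BM_LOCKED/WaitBufHdrUnlocked handling in bufmgr.c.

/*
 * Simplified illustration of the CAS-loop pin pattern (not PostgreSQL code):
 * read the state word, bail out if the buffer looks recently used, otherwise
 * try to bump the refcount with a single compare-and-swap, retrying if some
 * other backend changed the word underneath us.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* toy state word: low 16 bits = refcount, next 8 bits = usage count */
#define REFCOUNT_ONE	((uint32_t) 1)
#define REFCOUNT_MASK	((uint32_t) 0xFFFF)
#define USAGE_SHIFT		16
#define USAGE_MASK		((uint32_t) 0xFF << USAGE_SHIFT)

typedef struct
{
	_Atomic uint32_t state;
} toy_buffer;

/*
 * Pin the buffer only if it is unpinned and has zero usage count.
 * Returns true if we took the pin, false if the buffer is "recently used".
 */
static bool
try_pin_if_idle(toy_buffer *buf)
{
	uint32_t	old_state = atomic_load(&buf->state);

	for (;;)
	{
		/* Pinned or recently used: not a replacement candidate, skip it. */
		if ((old_state & REFCOUNT_MASK) != 0 ||
			(old_state & USAGE_MASK) != 0)
			return false;

		/*
		 * Try to install the incremented refcount.  On failure, old_state is
		 * reloaded with the current value and we re-check from the top.
		 */
		if (atomic_compare_exchange_weak(&buf->state, &old_state,
										 old_state + REFCOUNT_ONE))
			return true;
	}
}

int
main(void)
{
	toy_buffer	buf = {.state = 0};

	printf("pinned idle buffer: %d\n", try_pin_if_idle(&buf));	/* prints 1 */
	printf("pinned again: %d\n", try_pin_if_idle(&buf));		/* prints 0 */
	return 0;
}

The key property is that a buffer that is pinned or recently used is skipped without ever touching the header spinlock; only the final pin attempt pays for an atomic compare-and-swap.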
From f533b4bb5e2de11db6c1bdbff03c6232ec7321ac Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Mon, 12 Jan 2026 11:49:41 -0500 Subject: [PATCH v12 8/8] Eagerly flush buffer successors When flushing a dirty buffer, check whether the two blocks following it are in shared buffers and dirty. If they are, flush them together with the victim buffer. --- src/backend/storage/buffer/bufmgr.c | 145 +++++++++++++++++++++++----- 1 file changed, 122 insertions(+), 23 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 4ae70915089..4dc13c7d972 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -649,7 +649,10 @@ static bool BufferHasRequiredBlock(BatchBlockRequirements *require, BufferDesc * static BufferDesc *PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, Buffer bufnum, BatchBlockRequirements *require, + LWLock *buftable_lock, XLogRecPtr *max_lsn); +static void FindFlushAdjacents(BufferAccessStrategy strategy, BufferDesc *batch_start, + BufferWriteBatch *batch); static void FindStrategyFlushAdjacents(BufferAccessStrategy strategy, Buffer sweep_end, BufferDesc *batch_start, uint32 max_batch_size, @@ -2621,15 +2624,22 @@ again: next_bufdesc = PrepareOrRejectEagerFlushBuffer(strategy, next_buf, NULL, + NULL, &max_lsn); } } else { - DoFlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context, max_lsn); - LWLockRelease(BufferDescriptorGetContentLock(buf_hdr)); - ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, - &buf_hdr->tag); + BufferWriteBatch batch; + + /* Pin victim again so it stays ours even after batch released */ + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + IncrBufferRefCount(BufferDescriptorGetBuffer(buf_hdr)); + + FindFlushAdjacents(strategy, buf_hdr, &batch); + FlushBufferBatch(&batch, io_context); + CompleteWriteBatchIO(&batch, io_context, &BackendWritebackContext); } } @@ -4740,6 +4750,35 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, DoFlushBuffer(buf, reln, io_object, io_context, lsn); } + +static BlockNumber +WriteBatchInit(BufferDesc *batch_start, uint32 max_batch_size, + BufferWriteBatch *batch) +{ + BlockNumber limit; + + Assert(batch_start); + batch->bufdescs[0] = batch_start; + + LockBufHdr(batch_start); + batch->max_lsn = BufferGetLSN(batch_start); + UnlockBufHdr(batch_start); + + batch->start = batch->bufdescs[0]->tag.blockNum; + Assert(BlockNumberIsValid(batch->start)); + batch->n = 1; + batch->forkno = BufTagGetForkNum(&batch->bufdescs[0]->tag); + batch->rlocator = BufTagGetRelFileLocator(&batch->bufdescs[0]->tag); + batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); + + limit = smgrmaxcombine(batch->reln, batch->forkno, batch->start); + limit = Min(max_batch_size, limit); + limit = Min(GetAdditionalPinLimit(), limit); + + return limit; +} + + /* * Quick check to see if the buffer contains the required block from the right * fork of the right relation. If you don't hold the buffer header spinlock, @@ -4774,6 +4813,9 @@ BufferHasRequiredBlock(BatchBlockRequirements *require, BufferDesc *bufdesc) * accept it, they will provide the required block number and its * RelFileLocator and fork. * + * If the caller is holding the buftable_lock, it will be released after + * acquiring a pin on the buffer. + * * max_lsn may be updated if the provided buffer LSN exceeds the current max * LSN.
*/ @@ -4781,6 +4823,7 @@ static BufferDesc * PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, Buffer bufnum, BatchBlockRequirements *require, + LWLock *buftable_lock, XLogRecPtr *max_lsn) { BufferDesc *bufdesc; @@ -4841,6 +4884,12 @@ PrepareOrRejectEagerFlushBuffer(BufferAccessStrategy strategy, if (!PinBuffer(bufdesc, strategy, /* skip_if_not_valid */ true)) goto reject_buffer; + if (buftable_lock) + { + LWLockRelease(buftable_lock); + buftable_lock = NULL; + } + CheckBufferIsPinnedOnce(bufnum); /* Now that we have the buffer pinned, recheck it's got the right block */ @@ -4881,6 +4930,8 @@ reject_buffer_unpin: UnpinBuffer(bufdesc); reject_buffer: + if (buftable_lock) + LWLockRelease(buftable_lock); return NULL; } @@ -4909,26 +4960,8 @@ FindStrategyFlushAdjacents(BufferAccessStrategy strategy, BufferWriteBatch *batch, int *sweep_cursor) { - BlockNumber limit; BatchBlockRequirements require; - - Assert(batch_start); - batch->bufdescs[0] = batch_start; - - LockBufHdr(batch_start); - batch->max_lsn = BufferGetLSN(batch_start); - UnlockBufHdr(batch_start); - - batch->start = batch->bufdescs[0]->tag.blockNum; - Assert(BlockNumberIsValid(batch->start)); - batch->n = 1; - batch->forkno = BufTagGetForkNum(&batch->bufdescs[0]->tag); - batch->rlocator = BufTagGetRelFileLocator(&batch->bufdescs[0]->tag); - batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); - - limit = smgrmaxcombine(batch->reln, batch->forkno, batch->start); - limit = Min(max_batch_size, limit); - limit = Min(GetAdditionalPinLimit(), limit); + BlockNumber limit = WriteBatchInit(batch_start, max_batch_size, batch); /* * It's possible we're not allowed any more pins or there aren't more @@ -4963,6 +4996,72 @@ FindStrategyFlushAdjacents(BufferAccessStrategy strategy, if ((batch->bufdescs[batch->n] = PrepareOrRejectEagerFlushBuffer(strategy, bufnum, &require, + NULL, + &batch->max_lsn)) == NULL) + break; + } +} + + +/* + * Check whether the blocks following the start block are in shared buffers + * and dirty and, if they are, write them out too. + */ +static void +FindFlushAdjacents(BufferAccessStrategy strategy, BufferDesc *batch_start, + BufferWriteBatch *batch) { + BufferTag newTag; /* identity of requested block */ + uint32 newHash; /* hash value for newTag */ + LWLock *newPartitionLock; /* buffer partition lock for it */ + int buf_id; + BlockNumber limit; + BlockNumber max_batch_size = 3; /* we only look for two successors */ + BatchBlockRequirements require; + + limit = WriteBatchInit(batch_start, max_batch_size, batch); + + /* + * It's possible we're not allowed any more pins or there aren't more + * blocks in the target relation. In this case, just return. Our batch + * will have only one buffer. + */ + if (limit <= 1) + return; + + require.rlocator = &batch->rlocator; + require.forkno = batch->forkno; + + for (; batch->n < limit; batch->n++) + { + require.block = batch->start + batch->n; + + Assert(BlockNumberIsValid(require.block)); + + /* create a tag so we can lookup the buffer */ + InitBufferTag(&newTag, &batch->rlocator, batch->forkno, + require.block); + + /* determine its hash code and partition lock ID */ + newHash = BufTableHashCode(&newTag); + newPartitionLock = BufMappingPartitionLock(newHash); + + /* see if the block is in the buffer pool already */ + LWLockAcquire(newPartitionLock, LW_SHARED); + buf_id = BufTableLookup(&newTag, newHash); + + /* The block may not even be in shared buffers.
*/ + if (buf_id < 0) + { + LWLockRelease(newPartitionLock); + break; + } + + if ((batch->bufdescs[batch->n] = + PrepareOrRejectEagerFlushBuffer(strategy, + buf_id + 1, + &require, + newPartitionLock, &batch->max_lsn)) == NULL) break; } -- 2.43.0
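For reviewers who want the 8/8 batching idea in isolation: below is a minimal, self-contained sketch under toy assumptions. A small lookup structure stands in for the buffer mapping table, a flat array of pages stands in for shared buffers, and a single pwritev() stands in for the smgr write path; all names here (toy_cache, collect_batch, MAX_BATCH) are invented for illustration. The real FindFlushAdjacents() additionally pins each successor, respects GetAdditionalPinLimit() and smgrmaxcombine(), tracks the maximum LSN, and takes the buffer mapping partition locks, none of which is modeled here.

/*
 * Toy sketch of write combining for a victim block's successors
 * (illustration only, not the bufmgr.c implementation).
 */
#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#define BLCKSZ			8192
#define MAX_BATCH		3		/* victim plus up to two successors */
#define TOY_NBLOCKS		16

typedef struct
{
	bool		cached[TOY_NBLOCKS];	/* is the block resident? */
	bool		dirty[TOY_NBLOCKS];		/* does it need writing? */
	char		pages[TOY_NBLOCKS][BLCKSZ];
} toy_cache;

/*
 * Starting at 'victim', add consecutive blocks to the batch while they are
 * resident and dirty, stopping at the first block that is absent or clean or
 * at MAX_BATCH blocks.  Returns the number of blocks in the batch.
 */
static int
collect_batch(toy_cache *cache, int victim, struct iovec *iov)
{
	int			n = 0;

	while (n < MAX_BATCH &&
		   victim + n < TOY_NBLOCKS &&
		   cache->cached[victim + n] &&
		   cache->dirty[victim + n])
	{
		iov[n].iov_base = cache->pages[victim + n];
		iov[n].iov_len = BLCKSZ;
		n++;
	}
	return n;
}

int
main(void)
{
	toy_cache	cache;
	struct iovec iov[MAX_BATCH];
	int			victim = 4;
	int			n;
	int			fd;

	memset(&cache, 0, sizeof(cache));
	/* victim and its two successors are resident and dirty */
	for (int i = victim; i < victim + 3; i++)
		cache.cached[i] = cache.dirty[i] = true;

	n = collect_batch(&cache, victim, iov);

	/* one vectored write for the whole contiguous run */
	fd = open("/tmp/toy_batch.out", O_CREAT | O_WRONLY, 0600);
	if (fd < 0 || pwritev(fd, iov, n, (off_t) victim * BLCKSZ) < 0)
		perror("toy batch write");
	if (fd >= 0)
		close(fd);

	printf("batched %d block(s) starting at block %d\n", n, victim);
	return 0;
}

Stopping at the first absent or clean successor keeps the batch physically contiguous, which is what allows it to be issued as a single vectored write.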
