On Fri, Mar 8, 2024 at 4:56 PM Melanie Plageman <melanieplage...@gmail.com> wrote: > > On Sat, Mar 02, 2024 at 06:07:48PM -0500, Melanie Plageman wrote: > > On Wed, Feb 28, 2024 at 12:30 PM Melanie Plageman > > <melanieplage...@gmail.com> wrote: > > > > > > On Mon, Feb 26, 2024 at 03:56:57PM -0500, Melanie Plageman wrote: > > > > On Mon, Feb 19, 2024 at 6:05 PM Melanie Plageman > > > > <melanieplage...@gmail.com> wrote: > > > > > > > > > > On Mon, Jan 29, 2024 at 4:17 PM Melanie Plageman > > > > > <melanieplage...@gmail.com> wrote: > > > > > > > > > > > > There is an outstanding question about where to allocate the > > > > > > PgStreamingRead object for sequential scans > > > > > > > > > > I've written three alternative implementations of the actual streaming > > > > > read user for sequential scan which handle the question of where to > > > > > allocate the streaming read object and how to handle changing scan > > > > > direction in different ways. > > > > > > > > > > Option A) > > > > > https://github.com/melanieplageman/postgres/tree/seqscan_pgsr_initscan_allocation > > > > > - Allocates the streaming read object in initscan(). Since we do not > > > > > know the scan direction at this time, if the scan ends up not being a > > > > > forwards scan, the streaming read object must later be freed -- so > > > > > this will sometimes allocate a streaming read object it never uses. > > > > > - Only supports ForwardScanDirection and once the scan direction > > > > > changes, streaming is never supported again -- even if we return to > > > > > ForwardScanDirection > > > > > - Must maintain a "fallback" codepath that does not use the streaming > > > > > read API > > > > > > > > Attached is a version of this patch which implements a "reset" > > > > function for the streaming read API which should be cheaper than the > > > > full pg_streaming_read_free() on rescan. This can easily be ported to > > > > work on any of my proposed implementations (A/B/C). I implemented it > > > > on A as an example. > > > > > > Attached is the latest version of this patchset -- rebased in light of > > > Thomas' updatees to the streaming read API [1]. I have chosen the > > > approach I think we should go with. It is a hybrid of my previously > > > proposed approaches. > > > > While investigating some performance concerns, Andres pointed out that > > the members I add to HeapScanDescData in this patch push rs_cindex and > > rs_ntuples to another cacheline and introduce a 4-byte hole. Attached > > v4's HeapScanDescData is as well-packed as on master and its members > > are reordered so that rs_cindex and rs_ntuples are back on the second > > cacheline of the struct's data. > > I did some additional profiling and realized that dropping the > unlikely() from the places we check rs_inited frequently was negatively > impacting performance. v5 adds those back and also makes a few other > very minor cleanups. > > Note that this patch set has a not yet released version of Thomas > Munro's Streaming Read API with a new ramp-up logic which seems to fix a > performance issue I saw with my test case when all of the sequential > scan's blocks are in shared buffers. Once he sends the official new > version, I will rebase this and point to his explanation in that thread.
Attached v6 has the version of the streaming read API mentioned here [1]. This resolved the fully-in-shared-buffers regressions investigated in that thread by Andres, Bilal, and Thomas. The one outstanding item for the sequential scan streaming read user is deciding how the BAS_BULKREAD buffer access strategy should interact with the streaming read infrastructure. We discussed a bit off-list, and it seems clear that the ring must be at least as large as io_combine_limit. This should be no problem for BAS_BULKREAD because its ring is 16 MB. The question is whether or not we need to do anything right now to ensure there aren't adverse interactions between io_combine_limit, max_ios, and the buffer access strategy ring buffer size. - Melanie [1] https://www.postgresql.org/message-id/CA%2BhUKGJTwrS7F%3DuJPx3SeigMiQiW%2BLJaOkjGyZdCntwyMR%3DuAw%40mail.gmail.com
From bed26d391190f4411eccc4533d188e5dba6e8f72 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <melanieplage...@gmail.com> Date: Sat, 27 Jan 2024 18:39:37 -0500 Subject: [PATCH v6 1/6] Split heapgetpage into two parts heapgetpage(), a per-block utility function used in heap scans, read a passed-in block into a buffer and then, if page-at-a-time processing was enabled, pruned the page and built an array of its visible tuples. This was used for sequential and sample scans. Future commits will add support for streaming reads. The streaming read API will read in the blocks specified by a callback, but any significant per-page processing should be done synchronously on the buffer yielded by the streaming read API. To support this, separate the logic in heapgetpage() to read a block into a buffer from that which prunes the page and builds an array of the visible tuples. The former is now heapfetchbuf() and the latter is now heapbuildvis(). Future commits will push the logic for selecting the next block into heapfetchbuf() in cases when streaming reads are not supported (such as backwards sequential scans). Because this logic differs for sample scans and sequential scans, inline the code to read the block into a buffer for sample scans. This has the added benefit of allowing for a bit of refactoring in heapam_scan_sample_next_block(), including unpinning the previous buffer before invoking the callback to select the next block. --- src/backend/access/heap/heapam.c | 74 ++++++++++++++---------- src/backend/access/heap/heapam_handler.c | 40 +++++++++---- src/include/access/heapam.h | 2 +- 3 files changed, 72 insertions(+), 44 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 2f6527df0dc..4291f161cf9 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -360,17 +360,18 @@ heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlk } /* - * heapgetpage - subroutine for heapgettup() + * heapbuildvis - Utility function for heap scans. * - * This routine reads and pins the specified page of the relation. - * In page-at-a-time mode it performs additional work, namely determining - * which tuples on the page are visible. + * Given a page residing in a buffer saved in the scan descriptor, prune the + * page and determine which of its tuples are all visible, saving their offsets + * in an array in the scan descriptor. */ void -heapgetpage(TableScanDesc sscan, BlockNumber block) +heapbuildvis(TableScanDesc sscan) { HeapScanDesc scan = (HeapScanDesc) sscan; - Buffer buffer; + Buffer buffer = scan->rs_cbuf; + BlockNumber block = scan->rs_cblock; Snapshot snapshot; Page page; int lines; @@ -378,31 +379,8 @@ heapgetpage(TableScanDesc sscan, BlockNumber block) OffsetNumber lineoff; bool all_visible; - Assert(block < scan->rs_nblocks); + Assert(BufferGetBlockNumber(buffer) == block); - /* release previous scan buffer, if any */ - if (BufferIsValid(scan->rs_cbuf)) - { - ReleaseBuffer(scan->rs_cbuf); - scan->rs_cbuf = InvalidBuffer; - } - - /* - * Be sure to check for interrupts at least once per page. Checks at - * higher code levels won't be able to stop a seqscan that encounters many - * pages' worth of consecutive dead tuples. - */ - CHECK_FOR_INTERRUPTS(); - - /* read page using selected strategy */ - scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block, - RBM_NORMAL, scan->rs_strategy); - scan->rs_cblock = block; - - if (!(scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE)) - return; - - buffer = scan->rs_cbuf; snapshot = scan->rs_base.rs_snapshot; /* @@ -475,6 +453,37 @@ heapgetpage(TableScanDesc sscan, BlockNumber block) scan->rs_ntuples = ntup; } +/* + * heapfetchbuf - subroutine for heapgettup() + * + * This routine reads the specified block of the relation into a buffer and + * returns with that pinned buffer saved in the scan descriptor. + */ +static inline void +heapfetchbuf(HeapScanDesc scan, BlockNumber block) +{ + Assert(block < scan->rs_nblocks); + + /* release previous scan buffer, if any */ + if (BufferIsValid(scan->rs_cbuf)) + { + ReleaseBuffer(scan->rs_cbuf); + scan->rs_cbuf = InvalidBuffer; + } + + /* + * Be sure to check for interrupts at least once per page. Checks at + * higher code levels won't be able to stop a seqscan that encounters many + * pages' worth of consecutive dead tuples. + */ + CHECK_FOR_INTERRUPTS(); + + /* read page using selected strategy */ + scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block, + RBM_NORMAL, scan->rs_strategy); + scan->rs_cblock = block; +} + /* * heapgettup_initial_block - return the first BlockNumber to scan * @@ -748,7 +757,7 @@ heapgettup(HeapScanDesc scan, */ while (block != InvalidBlockNumber) { - heapgetpage((TableScanDesc) scan, block); + heapfetchbuf(scan, block); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); page = heapgettup_start_page(scan, dir, &linesleft, &lineoff); continue_page: @@ -869,7 +878,8 @@ heapgettup_pagemode(HeapScanDesc scan, */ while (block != InvalidBlockNumber) { - heapgetpage((TableScanDesc) scan, block); + heapfetchbuf(scan, block); + heapbuildvis((TableScanDesc) scan); page = BufferGetPage(scan->rs_cbuf); linesleft = scan->rs_ntuples; lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1; diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 6abfe36dec7..8784473b462 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2336,11 +2336,14 @@ heapam_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate) if (hscan->rs_nblocks == 0) return false; - if (tsm->NextSampleBlock) + if (BufferIsValid(hscan->rs_cbuf)) { - blockno = tsm->NextSampleBlock(scanstate, hscan->rs_nblocks); - hscan->rs_cblock = blockno; + ReleaseBuffer(hscan->rs_cbuf); + hscan->rs_cbuf = InvalidBuffer; } + + if (tsm->NextSampleBlock) + blockno = tsm->NextSampleBlock(scanstate, hscan->rs_nblocks); else { /* scanning table sequentially */ @@ -2382,20 +2385,35 @@ heapam_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate) } } - if (!BlockNumberIsValid(blockno)) + hscan->rs_cblock = blockno; + + if (!BlockNumberIsValid(hscan->rs_cblock)) { - if (BufferIsValid(hscan->rs_cbuf)) - ReleaseBuffer(hscan->rs_cbuf); - hscan->rs_cbuf = InvalidBuffer; - hscan->rs_cblock = InvalidBlockNumber; hscan->rs_inited = false; - return false; } - heapgetpage(scan, blockno); - hscan->rs_inited = true; + Assert(hscan->rs_cblock < hscan->rs_nblocks); + + /* + * We may scan multiple pages before finding tuples to yield or finishing + * the scan. Since we want to check for interrupts at least once per page, + * do so here. + */ + CHECK_FOR_INTERRUPTS(); + + /* Read page using selected strategy */ + hscan->rs_cbuf = ReadBufferExtended(hscan->rs_base.rs_rd, MAIN_FORKNUM, + hscan->rs_cblock, RBM_NORMAL, hscan->rs_strategy); + /* + * If pagemode is allowed, prune the page and build an array of visible + * tuple offsets. + */ + if (hscan->rs_base.rs_flags & SO_ALLOW_PAGEMODE) + heapbuildvis(scan); + + hscan->rs_inited = true; return true; } diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index f1122453738..bacef18c0d3 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -254,7 +254,7 @@ extern TableScanDesc heap_beginscan(Relation relation, Snapshot snapshot, uint32 flags); extern void heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlks); -extern void heapgetpage(TableScanDesc sscan, BlockNumber block); +extern void heapbuildvis(TableScanDesc sscan); extern void heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, bool allow_strat, bool allow_sync, bool allow_pagemode); extern void heap_endscan(TableScanDesc sscan); -- 2.40.1
From 7e021d9f21292a939c3c3485ac44e17e0995127f Mon Sep 17 00:00:00 2001 From: Melanie Plageman <melanieplage...@gmail.com> Date: Mon, 29 Jan 2024 11:50:01 -0500 Subject: [PATCH v6 2/6] Replace blocks with buffers in heapgettup control flow Future commits will introduce the streaming read API and the sequential scan streaming read API user. Streaming read API users implement a callback which returns the next block to read. Sequential scans previously looped through the blocks in the relation, synchronously reading in a block and then processing it. An InvalidBlockNumber returned by heapgettup_advance_block() meant that the relation was exhausted and all blocks had been processed. The streaming read API may exhaust the blocks in a relation (having read all of them into buffers) before they have all been processed by the sequential scan. As such, the sequential scan should continue processing blocks until heapfetchbuf() returns InvalidBuffer. Note that this commit does not implement the streaming read API user. It simply restructures heapgettup() and heapgettup_pagemode() to use buffers instead of blocks for control flow. Not all sequential scans will support streaming reads. As such, this code will remain for compatability even after sequential scans support streaming reads. --- src/backend/access/heap/heapam.c | 79 ++++++++++++++------------------ 1 file changed, 35 insertions(+), 44 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 4291f161cf9..c814aea9407 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -83,6 +83,9 @@ static Bitmapset *HeapDetermineColumnsInfo(Relation relation, static bool heap_acquire_tuplock(Relation relation, ItemPointer tid, LockTupleMode mode, LockWaitPolicy wait_policy, bool *have_tuple_lock); +static inline BlockNumber heapgettup_advance_block(HeapScanDesc scan, + BlockNumber block, ScanDirection dir); +static inline BlockNumber heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir); static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask, uint16 old_infomask2, TransactionId add_to_xmax, LockTupleMode mode, bool is_update, @@ -456,14 +459,12 @@ heapbuildvis(TableScanDesc sscan) /* * heapfetchbuf - subroutine for heapgettup() * - * This routine reads the specified block of the relation into a buffer and - * returns with that pinned buffer saved in the scan descriptor. + * This routine reads the next block of the relation into a buffer and returns + * with that pinned buffer saved in the scan descriptor. */ static inline void -heapfetchbuf(HeapScanDesc scan, BlockNumber block) +heapfetchbuf(HeapScanDesc scan, ScanDirection dir) { - Assert(block < scan->rs_nblocks); - /* release previous scan buffer, if any */ if (BufferIsValid(scan->rs_cbuf)) { @@ -478,10 +479,19 @@ heapfetchbuf(HeapScanDesc scan, BlockNumber block) */ CHECK_FOR_INTERRUPTS(); - /* read page using selected strategy */ - scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block, - RBM_NORMAL, scan->rs_strategy); - scan->rs_cblock = block; + if (unlikely(!scan->rs_inited)) + { + scan->rs_cblock = heapgettup_initial_block(scan, dir); + Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); + scan->rs_inited = true; + } + else + scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir); + + /* read block if valid */ + if (BlockNumberIsValid(scan->rs_cblock)) + scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, + scan->rs_cblock, RBM_NORMAL, scan->rs_strategy); } /* @@ -491,7 +501,7 @@ heapfetchbuf(HeapScanDesc scan, BlockNumber block) * occur with empty tables and in parallel scans when parallel workers get all * of the pages before we can get a chance to get our first page. */ -static BlockNumber +BlockNumber heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir) { Assert(!scan->rs_inited); @@ -631,7 +641,7 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft, * This also adjusts rs_numblocks when a limit has been imposed by * heap_setscanlimits(). */ -static inline BlockNumber +BlockNumber heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir) { if (ScanDirectionIsForward(dir)) @@ -729,23 +739,13 @@ heapgettup(HeapScanDesc scan, ScanKey key) { HeapTuple tuple = &(scan->rs_ctup); - BlockNumber block; Page page; OffsetNumber lineoff; int linesleft; - if (unlikely(!scan->rs_inited)) - { - block = heapgettup_initial_block(scan, dir); - /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ - Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); - scan->rs_inited = true; - } - else + if (likely(scan->rs_inited)) { /* continue from previously returned page/tuple */ - block = scan->rs_cblock; - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); page = heapgettup_continue_page(scan, dir, &linesleft, &lineoff); goto continue_page; @@ -755,9 +755,12 @@ heapgettup(HeapScanDesc scan, * advance the scan until we find a qualifying tuple or run out of stuff * to scan */ - while (block != InvalidBlockNumber) + while (true) { - heapfetchbuf(scan, block); + heapfetchbuf(scan, dir); + if (!BufferIsValid(scan->rs_cbuf)) + break; + Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); page = heapgettup_start_page(scan, dir, &linesleft, &lineoff); continue_page: @@ -779,7 +782,7 @@ continue_page: tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp); tuple->t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(tuple->t_self), block, lineoff); + ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff); visible = HeapTupleSatisfiesVisibility(tuple, scan->rs_base.rs_snapshot, @@ -809,9 +812,6 @@ continue_page: * it's time to move to the next. */ LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); - - /* get the BlockNumber to scan next */ - block = heapgettup_advance_block(scan, block, dir); } /* end of scan */ @@ -844,22 +844,13 @@ heapgettup_pagemode(HeapScanDesc scan, ScanKey key) { HeapTuple tuple = &(scan->rs_ctup); - BlockNumber block; Page page; int lineindex; int linesleft; - if (unlikely(!scan->rs_inited)) - { - block = heapgettup_initial_block(scan, dir); - /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ - Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); - scan->rs_inited = true; - } - else + if (likely(scan->rs_inited)) { /* continue from previously returned page/tuple */ - block = scan->rs_cblock; /* current page */ page = BufferGetPage(scan->rs_cbuf); lineindex = scan->rs_cindex + dir; @@ -876,9 +867,12 @@ heapgettup_pagemode(HeapScanDesc scan, * advance the scan until we find a qualifying tuple or run out of stuff * to scan */ - while (block != InvalidBlockNumber) + while (true) { - heapfetchbuf(scan, block); + heapfetchbuf(scan, dir); + if (!BufferIsValid(scan->rs_cbuf)) + break; + Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock); heapbuildvis((TableScanDesc) scan); page = BufferGetPage(scan->rs_cbuf); linesleft = scan->rs_ntuples; @@ -898,7 +892,7 @@ continue_page: tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp); tuple->t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(tuple->t_self), block, lineoff); + ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff); /* skip any tuples that don't match the scan key */ if (key != NULL && @@ -909,9 +903,6 @@ continue_page: scan->rs_cindex = lineindex; return; } - - /* get the BlockNumber to scan next */ - block = heapgettup_advance_block(scan, block, dir); } /* end of scan */ -- 2.40.1
From 78a9008b50d5398cce23cac695da884c41d0d720 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Tue, 27 Feb 2024 00:01:42 +1300 Subject: [PATCH v6 4/6] Provide API for streaming relation data. Introduce an abstraction where relation data can be accessed as a stream of buffers, with an implementation that is more efficient than the equivalent sequence of ReadBuffer() calls. Client code supplies a callback that can say which block number is wanted next, and then consumes individual buffers one at a time from the stream. This division allows read_stream.c to build up large calls to StartReadBuffers() up to io_combine_limit, and issue fadvise() advice ahead of time in a systematic way when random access is detected. This API is based on an idea from Andres Freund to pave the way for asynchronous I/O in future work as required to support direct I/O. The goal is to have an abstraction that insulates client code from future changes to the I/O subsystem. An extended API may be necessary in future for more complicated cases (for example recovery, whose LsnReadQueue device in xlogprefetcher.c is a distant cousin of this code that should eventually be replaced by it), but this basic API is sufficient for many common usage patterns involving predictable access to a single relation fork. Author: Thomas Munro <thomas.mu...@gmail.com> Author: Heikki Linnakangas <hlinn...@iki.fi> (contributions) Suggested-by: Andres Freund <and...@anarazel.de> Reviewed-by: Heikki Linnakangas <hlinn...@iki.fi> Reviewed-by: Melanie Plageman <melanieplage...@gmail.com> Reviewed-by: Nazir Bilal Yavuz <byavu...@gmail.com> Reviewed-by: Andres Freund <and...@anarazel.de> Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6ut5tum2g...@mail.gmail.com --- src/backend/storage/Makefile | 2 +- src/backend/storage/aio/Makefile | 14 + src/backend/storage/aio/meson.build | 5 + src/backend/storage/aio/read_stream.c | 733 ++++++++++++++++++++++++++ src/backend/storage/buffer/bufmgr.c | 21 +- src/backend/storage/meson.build | 1 + src/include/storage/read_stream.h | 62 +++ src/tools/pgindent/typedefs.list | 2 + 8 files changed, 829 insertions(+), 11 deletions(-) create mode 100644 src/backend/storage/aio/Makefile create mode 100644 src/backend/storage/aio/meson.build create mode 100644 src/backend/storage/aio/read_stream.c create mode 100644 src/include/storage/read_stream.h diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile index 8376cdfca20..eec03f6f2b4 100644 --- a/src/backend/storage/Makefile +++ b/src/backend/storage/Makefile @@ -8,6 +8,6 @@ subdir = src/backend/storage top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -SUBDIRS = buffer file freespace ipc large_object lmgr page smgr sync +SUBDIRS = aio buffer file freespace ipc large_object lmgr page smgr sync include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile new file mode 100644 index 00000000000..2f29a9ec4d1 --- /dev/null +++ b/src/backend/storage/aio/Makefile @@ -0,0 +1,14 @@ +# +# Makefile for storage/aio +# +# src/backend/storage/aio/Makefile +# + +subdir = src/backend/storage/aio +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = \ + read_stream.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build new file mode 100644 index 00000000000..10e1aa3b20b --- /dev/null +++ b/src/backend/storage/aio/meson.build @@ -0,0 +1,5 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +backend_sources += files( + 'read_stream.c', +) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c new file mode 100644 index 00000000000..4e293e0df65 --- /dev/null +++ b/src/backend/storage/aio/read_stream.c @@ -0,0 +1,733 @@ +/*------------------------------------------------------------------------- + * + * read_stream.c + * Mechanism for accessing buffered relation data with look-ahead + * + * Code that needs to access relation data typically pins blocks one at a + * time, often in a predictable order that might be sequential or data-driven. + * Calling the simple ReadBuffer() function for each block is inefficient, + * because blocks that are not yet in the buffer pool require I/O operations + * that are small and might stall waiting for storage. This mechanism looks + * into the future and calls StartReadBuffers() and WaitReadBuffers() to read + * neighboring blocks together and ahead of time, with an adaptive look-ahead + * distance. + * + * A user-provided callback generates a stream of block numbers that is used + * to form reads of up to io_combine_limit, by attempting to merge them with a + * pending read. When that isn't possible, the existing pending read is sent + * to StartReadBuffers() so that a new one can begin to form. + * + * The algorithm for controlling the look-ahead distance tries to classify the + * stream into three ideal behaviors: + * + * A) No I/O is necessary, because the requested blocks are fully cached + * already. There is no benefit to looking ahead more than one block, so + * distance is 1. This is the default initial assumption. + * + * B) I/O is necessary, but fadvise is undesirable because the access is + * sequential, or impossible because direct I/O is enabled or the system + * doesn't support advice. There is no benefit in looking ahead more than + * io_combine_limit, because in this case only goal is larger read system + * calls. Looking further ahead would pin many buffers and perform + * speculative work looking ahead for no benefit. + * + * C) I/O is necesssary, it appears random, and this system supports fadvise. + * We'll look further ahead in order to reach the configured level of I/O + * concurrency. + * + * The distance increases rapidly and decays slowly, so that it moves towards + * those levels as different I/O patterns are discovered. For example, a + * sequential scan of fully cached data doesn't bother looking ahead, but a + * sequential scan that hits a region of uncached blocks will start issuing + * increasingly wide read calls until it plateaus at io_combine_limit. + * + * The main data structure is a circular queue of buffers of size + * max_pinned_buffers plus some extra space for technical reasons, ready to be + * returned by read_stream_next_buffer(). Each buffer also has an optional + * variable sized object that is passed from the callback to the consumer of + * buffers. + * + * Parallel to the queue of buffers, there is a circular queue of in-progress + * I/Os that have been started with StartReadBuffers(), and for which + * WaitReadBuffers() must be called before returning the buffer. + * + * For example, if the callback return block numbers 10, 42, 43, 60 in + * successive calls, then these data structures might appear as follows: + * + * buffers buf/data ios + * + * +----+ +-----+ +--------+ + * | | | | +----+ 42..44 | <- oldest_io_index + * +----+ +-----+ | +--------+ + * oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 | + * +----+ +-----+ | | +--------+ + * | 42 | | ? |<-+ | | | <- next_io_index + * +----+ +-----+ | +--------+ + * | 43 | | ? | | | | + * +----+ +-----+ | +--------+ + * | 44 | | ? | | | | + * +----+ +-----+ | +--------+ + * | 60 | | ? |<---+ + * +----+ +-----+ + * next_buffer_index -> | | | | + * +----+ +-----+ + * + * In the example, 5 buffers are pinned, and the next buffer to be streamed to + * the client is block 10. Block 10 was a hit and has no associated I/O, but + * the range 42..44 requires an I/O wait before its buffers are returned, as + * does block 60. + * + * + * Portions Copyright (c) 2024, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/aio/read_stream.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "catalog/pg_tablespace.h" +#include "miscadmin.h" +#include "storage/fd.h" +#include "storage/smgr.h" +#include "storage/read_stream.h" +#include "utils/memdebug.h" +#include "utils/rel.h" +#include "utils/spccache.h" + +typedef struct InProgressIO +{ + int16 buffer_index; + ReadBuffersOperation op; +} InProgressIO; + +/* + * State for managing a stream of reads. + */ +struct ReadStream +{ + int16 max_ios; + int16 ios_in_progress; + int16 queue_size; + int16 max_pinned_buffers; + int16 pinned_buffers; + int16 distance; + bool advice_enabled; + + /* + * Sometimes we need to be able to 'unget' a block number to resolve a + * flow control problem when I/Os are split. + */ + BlockNumber unget_blocknum; + bool have_unget_blocknum; + + /* + * The callback that will tell us which block numbers to read, and an + * opaque pointer that will be pass to it for its own purposes. + */ + ReadStreamBlockNumberCB callback; + void *callback_private_data; + + /* Next expected block, for detecting sequential access. */ + BlockNumber seq_blocknum; + + /* The read operation we are currently preparing. */ + BlockNumber pending_read_blocknum; + int16 pending_read_nblocks; + + /* Space for buffers and optional per-buffer private data. */ + size_t per_buffer_data_size; + void *per_buffer_data; + + /* Read operations that have been started but not waited for yet. */ + InProgressIO *ios; + int16 oldest_io_index; + int16 next_io_index; + + /* Circular queue of buffers. */ + int16 oldest_buffer_index; /* Next pinned buffer to return */ + int16 next_buffer_index; /* Index of next buffer to pin */ + Buffer buffers[FLEXIBLE_ARRAY_MEMBER]; +}; + +/* + * Return a pointer to the per-buffer data by index. + */ +static inline void * +get_per_buffer_data(ReadStream *stream, int16 buffer_index) +{ + return (char *) stream->per_buffer_data + + stream->per_buffer_data_size * buffer_index; +} + +/* + * Ask the callback which block it would like us to read next, with a small + * buffer in front to allow streaming_unget_block() to work. + */ +static inline BlockNumber +read_stream_get_block(ReadStream *stream, void *per_buffer_data) +{ + if (!stream->have_unget_blocknum) + return stream->callback(stream, + stream->callback_private_data, + per_buffer_data); + + /* + * You can only unget one block, and next_buffer_index can't change across + * a get, unget, get sequence, so the callback's per_buffer_data, if any, + * is still present in the correct slot. We just have to return the + * previous block number. + */ + stream->have_unget_blocknum = false; + return stream->unget_blocknum; +} + +/* + * In order to deal with short reads in StartReadBuffers(), we sometimes need + * to defer handling of a block until later. + */ +static inline void +read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) +{ + Assert(!stream->have_unget_blocknum); + stream->have_unget_blocknum = true; + stream->unget_blocknum = blocknum; +} + +static void +read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) +{ + bool need_wait; + int nblocks; + int flags; + int16 io_index; + int16 overflow; + int16 buffer_index; + + /* This should only be called with a pending read. */ + Assert(stream->pending_read_nblocks > 0); + Assert(stream->pending_read_nblocks <= io_combine_limit); + + /* We had better not exceed the pin limit by starting this read. */ + Assert(stream->pinned_buffers + stream->pending_read_nblocks <= + stream->max_pinned_buffers); + + /* We had better not be overwriting an existing pinned buffer. */ + if (stream->pinned_buffers > 0) + Assert(stream->next_buffer_index != stream->oldest_buffer_index); + else + Assert(stream->next_buffer_index == stream->oldest_buffer_index); + + /* + * If advice hasn't been suppressed, this system supports it, and this + * isn't a strictly sequential pattern, then we'll issue advice. + */ + if (!suppress_advice && + stream->advice_enabled && + stream->pending_read_blocknum != stream->seq_blocknum) + flags = READ_BUFFERS_ISSUE_ADVICE; + else + flags = 0; + + /* We say how many blocks we want to read, but may be smaller on return. */ + buffer_index = stream->next_buffer_index; + io_index = stream->next_io_index; + nblocks = stream->pending_read_nblocks; + need_wait = StartReadBuffers(&stream->ios[io_index].op, + &stream->buffers[buffer_index], + stream->pending_read_blocknum, + &nblocks, + flags); + stream->pinned_buffers += nblocks; + + /* Remember whether we need to wait before returning this buffer. */ + if (!need_wait) + { + /* Look-ahead distance decays, no I/O necessary (behavior A). */ + if (stream->distance > 1) + stream->distance--; + } + else + { + /* + * Remember to call WaitReadBuffers() before returning head buffer. + * Look-ahead distance will be adjusted after waiting. + */ + stream->ios[io_index].buffer_index = buffer_index; + if (++stream->next_io_index == stream->max_ios) + stream->next_io_index = 0; + Assert(stream->ios_in_progress < stream->max_ios); + stream->ios_in_progress++; + stream->seq_blocknum = stream->pending_read_blocknum + nblocks; + } + + /* + * We gave a contiguous range of buffer space to StartReadBuffers(), but + * we want it to wrap around at queue_size. Slide overflowing buffers to + * the front of the array. + */ + overflow = (buffer_index + nblocks) - stream->queue_size; + if (overflow > 0) + memmove(&stream->buffers[0], + &stream->buffers[stream->queue_size], + sizeof(stream->buffers[0]) * overflow); + + /* Compute location of start of next read, without using % operator. */ + buffer_index += nblocks; + if (buffer_index >= stream->queue_size) + buffer_index -= stream->queue_size; + Assert(buffer_index >= 0 && buffer_index < stream->queue_size); + stream->next_buffer_index = buffer_index; + + /* Adjust the pending read to cover the remaining portion, if any. */ + stream->pending_read_blocknum += nblocks; + stream->pending_read_nblocks -= nblocks; +} + +static void +read_stream_look_ahead(ReadStream *stream, bool suppress_advice) +{ + while (stream->ios_in_progress < stream->max_ios && + stream->pinned_buffers + stream->pending_read_nblocks < stream->distance) + { + BlockNumber blocknum; + int16 buffer_index; + void *per_buffer_data; + + if (stream->pending_read_nblocks == io_combine_limit) + { + read_stream_start_pending_read(stream, suppress_advice); + suppress_advice = false; + continue; + } + + /* + * See which block the callback wants next in the stream. We need to + * compute the index of the Nth block of the pending read including + * wrap-around, but we don't want to use the expensive % operator. + */ + buffer_index = stream->next_buffer_index + stream->pending_read_nblocks; + if (buffer_index >= stream->queue_size) + buffer_index -= stream->queue_size; + Assert(buffer_index >= 0 && buffer_index < stream->queue_size); + per_buffer_data = get_per_buffer_data(stream, buffer_index); + blocknum = read_stream_get_block(stream, per_buffer_data); + if (blocknum == InvalidBlockNumber) + { + stream->distance = 0; + break; + } + + /* Can we merge it with the pending read? */ + if (stream->pending_read_nblocks > 0 && + stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum) + { + stream->pending_read_nblocks++; + continue; + } + + /* We have to start the pending read before we can build another. */ + if (stream->pending_read_nblocks > 0) + { + read_stream_start_pending_read(stream, suppress_advice); + suppress_advice = false; + if (stream->ios_in_progress == stream->max_ios) + { + /* And we've hit the limit. Rewind, and stop here. */ + read_stream_unget_block(stream, blocknum); + return; + } + } + + /* This is the start of a new pending read. */ + stream->pending_read_blocknum = blocknum; + stream->pending_read_nblocks = 1; + } + + /* + * Normally we don't start the pending read just because we've hit a + * limit, preferring to give it another chance to grow to a larger size + * once more buffers have been consumed. However, in cases where that + * can't possibly happen, we might as well start the read immediately. + */ + if (stream->pending_read_nblocks > 0 && + (stream->distance == stream->pending_read_nblocks || + stream->distance == 0) && + stream->ios_in_progress < stream->max_ios) + read_stream_start_pending_read(stream, suppress_advice); +} + +/* + * Create a new streaming read object that can be used to perform the + * equivalent of a series of ReadBuffer() calls for one fork of one relation. + * Internally, it generates larger vectored reads where possible by looking + * ahead. The callback should return block numbers or InvalidBlockNumber to + * signal end-of-stream, and if per_buffer_data_size is non-zero, it may also + * write extra data for each block into the space provided to it. It will + * also receive callback_private_data for its own purposes. + */ +ReadStream * +read_stream_begin_relation(int flags, + BufferAccessStrategy strategy, + BufferManagerRelation bmr, + ForkNumber forknum, + ReadStreamBlockNumberCB callback, + void *callback_private_data, + size_t per_buffer_data_size) +{ + ReadStream *stream; + size_t size; + int16 queue_size; + int16 max_ios; + uint32 max_pinned_buffers; + Oid tablespace_id; + + /* Make sure our bmr's smgr and persistent are populated. */ + if (bmr.smgr == NULL) + { + bmr.smgr = RelationGetSmgr(bmr.rel); + bmr.relpersistence = bmr.rel->rd_rel->relpersistence; + } + + /* + * Decide how many I/Os we will allow to run at the same time. That + * currently means advice to the kernel to tell it that we will soon read. + * This number also affects how far we look ahead for opportunities to + * start more I/Os. + */ + tablespace_id = bmr.smgr->smgr_rlocator.locator.spcOid; + if (!OidIsValid(MyDatabaseId) || + (bmr.rel && IsCatalogRelation(bmr.rel)) || + IsCatalogRelationOid(bmr.smgr->smgr_rlocator.locator.relNumber)) + { + /* + * Avoid circularity while trying to look up tablespace settings or + * before spccache.c is ready. + */ + max_ios = effective_io_concurrency; + } + else if (flags & READ_STREAM_MAINTENANCE) + max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id); + else + max_ios = get_tablespace_io_concurrency(tablespace_id); + max_ios = Min(max_ios, PG_INT16_MAX); + + /* + * Choose the maximum number of buffers we're prepared to pin. We try to + * pin fewer if we can, though. We clamp it to at least io_combine_limit + * so that we can have a chance to build up a full io_combine_limit sized + * read, even when max_ios is zero. Be careful not to allow int16 to + * overflow (even though that's not possible with the current GUC range + * limits), allowing also for the spare entry and the overflow space. + */ + max_pinned_buffers = Max(max_ios * 4, io_combine_limit); + max_pinned_buffers = Min(max_pinned_buffers, + PG_INT16_MAX - io_combine_limit - 1); + + /* Don't allow this backend to pin more than its share of buffers. */ + if (SmgrIsTemp(bmr.smgr)) + LimitAdditionalLocalPins(&max_pinned_buffers); + else + LimitAdditionalPins(&max_pinned_buffers); + Assert(max_pinned_buffers > 0); + + /* + * We need one extra entry for buffers and per-buffer data, because users + * of per-buffer data have access to the object until the next call to + * read_stream_next_buffer(), so we need a gap between the head and tail + * of the queue so that we don't clobber it. + */ + queue_size = max_pinned_buffers + 1; + + /* + * Allocate the object, the buffers, the ios and per_data_data space in + * one big chunk. Though we have queue_size buffers, we want to be able + * to assume that all the buffers for a single read are contiguous (i.e. + * don't wrap around halfway through), so we allow temporary overflows of + * up to the maximum possible read size by allocating an extra + * io_combine_limit - 1 elements. + */ + size = offsetof(ReadStream, buffers); + size += sizeof(Buffer) * (queue_size + io_combine_limit - 1); + size += sizeof(InProgressIO) * Max(1, max_ios); + size += per_buffer_data_size * queue_size; + size += MAXIMUM_ALIGNOF * 2; + stream = (ReadStream *) palloc(size); + memset(stream, 0, offsetof(ReadStream, buffers)); + stream->ios = (InProgressIO *) + MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]); + if (per_buffer_data_size > 0) + stream->per_buffer_data = (void *) + MAXALIGN(&stream->ios[Max(1, max_ios)]); + +#ifdef USE_PREFETCH + + /* + * This system supports prefetching advice. We can use it as long as + * direct I/O isn't enabled, the caller hasn't promised sequential access + * (overriding our detection heuristics), and max_ios hasn't been set to + * zero. + */ + if ((io_direct_flags & IO_DIRECT_DATA) == 0 && + (flags & READ_STREAM_SEQUENTIAL) == 0 && + max_ios > 0) + stream->advice_enabled = true; +#endif + + /* + * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled + * above. If we had real asynchronous I/O we might need a slightly + * different definition. + */ + if (max_ios == 0) + max_ios = 1; + + stream->max_ios = max_ios; + stream->per_buffer_data_size = per_buffer_data_size; + stream->max_pinned_buffers = max_pinned_buffers; + stream->queue_size = queue_size; + + if (!bmr.smgr) + { + bmr.smgr = RelationGetSmgr(bmr.rel); + bmr.relpersistence = bmr.rel->rd_rel->relpersistence; + } + stream->callback = callback; + stream->callback_private_data = callback_private_data; + + /* + * Skip the initial ramp-up phase if the caller says we're going to be + * reading the whole relation. This way we start out assuming we'll be + * doing full io_combine_limit sized reads (behavior B). + */ + if (flags & READ_STREAM_FULL) + stream->distance = Min(max_pinned_buffers, io_combine_limit); + else + stream->distance = 1; + + /* + * Since we always currently always access the same relation, we can + * initialize parts of the ReadBuffersOperation objects and leave them + * that way, to avoid wasting CPU cycles writing to them for each read. + */ + for (int i = 0; i < max_ios; ++i) + { + stream->ios[i].op.bmr = bmr; + stream->ios[i].op.forknum = forknum; + stream->ios[i].op.strategy = strategy; + } + + return stream; +} + +/* + * Pull one pinned buffer out of a stream created with + * read_stream_begin_buffered(). Each call returns successive blocks in the + * order specified by the callback. If per_buffer_data_size was set to a + * non-zero size, *per_buffer_data receives a pointer to the extra per-buffer + * data that the callback had a chance to populate, which remains valid until + * the next call to read_stream_next_buffer(). When the stream runs out of + * data, InvalidBuffer is returned. The caller may decide to end the stream + * early at any time by calling read_stream_end_buffered(). + */ +Buffer +read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) +{ + Buffer buffer; + int16 oldest_buffer_index; + + /* + * A fast path for all-cached scans (behavior A). This is the same as the + * usual algorithm, but it is specialized for no I/O and no per-buffer + * data, so we can skip the queue management code, stay in the same buffer + * slot and use singular StartReadBuffer(). + */ + if (likely(per_buffer_data == NULL && + stream->ios_in_progress == 0 && + stream->pinned_buffers == 1 && + stream->distance == 1)) + { + BlockNumber next_blocknum; + + /* + * We have a pinned buffer that we need to serve up, but we also want + * to probe the next one before we return, just in case we need to + * start an I/O. We can re-use the same buffer slot, and an arbitrary + * I/O slot since they're all free. + */ + oldest_buffer_index = stream->oldest_buffer_index; + Assert((oldest_buffer_index + 1) % stream->queue_size == + stream->next_buffer_index); + buffer = stream->buffers[oldest_buffer_index]; + Assert(buffer != InvalidBuffer); + Assert(stream->pending_read_nblocks <= 1); + if (unlikely(stream->pending_read_nblocks == 1)) + { + next_blocknum = stream->pending_read_blocknum; + stream->pending_read_nblocks = 0; + } + else + next_blocknum = read_stream_get_block(stream, NULL); + if (unlikely(next_blocknum == InvalidBlockNumber)) + { + /* End of stream. */ + stream->distance = 0; + stream->next_buffer_index = oldest_buffer_index; + /* Pin transferred to caller. */ + stream->pinned_buffers = 0; + return buffer; + } + /* Call the special single block version, which is marginally faster. */ + if (unlikely(StartReadBuffer(&stream->ios[0].op, + &stream->buffers[oldest_buffer_index], + next_blocknum, + stream->advice_enabled ? + READ_BUFFERS_ISSUE_ADVICE : 0))) + { + /* I/O needed. We'll take the general path next time. */ + stream->oldest_io_index = 0; + stream->next_io_index = stream->max_ios > 1 ? 1 : 0; + stream->ios_in_progress = 1; + stream->ios[0].buffer_index = oldest_buffer_index; + stream->seq_blocknum = next_blocknum + 1; + /* Increase look ahead distance (move towards behavior B/C). */ + stream->distance = Min(2, stream->max_pinned_buffers); + } + /* Pin transferred to caller, got another one, no net change. */ + Assert(stream->pinned_buffers == 1); + return buffer; + } + + if (stream->pinned_buffers == 0) + { + Assert(stream->oldest_buffer_index == stream->next_buffer_index); + + /* End of stream reached? */ + if (stream->distance == 0) + return InvalidBuffer; + + /* + * The usual order of operations is that we look ahead at the bottom + * of this function after potentially finishing an I/O and making + * space for more, but if we're just starting up we'll need to crank + * the handle to get started. + */ + read_stream_look_ahead(stream, true); + + /* End of stream reached? */ + if (stream->pinned_buffers == 0) + { + Assert(stream->distance == 0); + return InvalidBuffer; + } + } + + /* Grab the oldest pinned buffer and associated per-buffer data. */ + Assert(stream->pinned_buffers > 0); + oldest_buffer_index = stream->oldest_buffer_index; + Assert(oldest_buffer_index >= 0 && + oldest_buffer_index < stream->queue_size); + buffer = stream->buffers[oldest_buffer_index]; + if (per_buffer_data) + *per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index); + + Assert(BufferIsValid(buffer)); + + /* Do we have to wait for an associated I/O first? */ + if (stream->ios_in_progress > 0 && + stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index) + { + int16 io_index = stream->oldest_io_index; + int16 distance; + + /* Sanity check that we still agree on the buffers. */ + Assert(stream->ios[io_index].op.buffers == + &stream->buffers[oldest_buffer_index]); + + WaitReadBuffers(&stream->ios[io_index].op); + + Assert(stream->ios_in_progress > 0); + stream->ios_in_progress--; + if (++stream->oldest_io_index == stream->max_ios) + stream->oldest_io_index = 0; + + if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE) + { + /* Distance ramps up fast (behavior C). */ + distance = stream->distance * 2; + distance = Min(distance, stream->max_pinned_buffers); + stream->distance = distance; + } + else + { + /* No advice; move towards io_combine_limit (behavior B). */ + if (stream->distance > io_combine_limit) + { + stream->distance--; + } + else + { + distance = stream->distance * 2; + distance = Min(distance, io_combine_limit); + distance = Min(distance, stream->max_pinned_buffers); + stream->distance = distance; + } + } + } + +#ifdef CLOBBER_FREED_MEMORY + /* Clobber old buffer and per-buffer data for debugging purposes. */ + stream->buffers[oldest_buffer_index] = InvalidBuffer; + + /* + * The caller will get access to the per-buffer data, until the next call. + * We wipe the one before, which is never occupied because queue_size + * allowed one extra element. This will hopefully trip up client code + * that is holding a dangling pointer to it. + */ + if (stream->per_buffer_data) + wipe_mem(get_per_buffer_data(stream, + oldest_buffer_index == 0 ? + stream->queue_size - 1 : + oldest_buffer_index - 1), + stream->per_buffer_data_size); +#endif + + /* Pin transferred to caller. */ + Assert(stream->pinned_buffers > 0); + stream->pinned_buffers--; + + /* Advance oldest buffer, with wrap-around. */ + stream->oldest_buffer_index++; + if (stream->oldest_buffer_index == stream->queue_size) + stream->oldest_buffer_index = 0; + + /* Prepare for the next call. */ + read_stream_look_ahead(stream, false); + + return buffer; +} + +/* + * Release stream resources. + */ +void +read_stream_end(ReadStream *stream) +{ + Buffer buffer; + + /* Stop looking ahead. */ + stream->distance = 0; + + /* Unpin anything that wasn't consumed. */ + while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) + ReleaseBuffer(buffer); + + Assert(stream->pinned_buffers == 0); + Assert(stream->ios_in_progress == 0); + + /* Release memory. */ + pfree(stream); +} diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 8df5f3b43da..577bcf6e5dd 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1210,13 +1210,14 @@ StartReadBuffer(ReadBuffersOperation *operation, * Begin reading a range of blocks beginning at blockNum and extending for * *nblocks. On return, up to *nblocks pinned buffers holding those blocks * are written into the buffers array, and *nblocks is updated to contain the - * actual number, which may be fewer than requested. + * actual number, which may be fewer than requested. Caller sets some of the + * members of operation; see struct definition. * - * If false is returned, no I/O is necessary and WaitReadBuffers() is not - * necessary. If true is returned, one I/O has been started, and - * WaitReadBuffers() must be called with the same operation object before the - * buffers are accessed. Along with the operation object, the caller-supplied - * array of buffers must remain valid until WaitReadBuffers() is called. + * If false is returned, no I/O is necessary. If true is returned, one I/O + * has been started, and WaitReadBuffers() must be called with the same + * operation object before the buffers are accessed. Along with the operation + * object, the caller-supplied array of buffers must remain valid until + * WaitReadBuffers() is called. * * Currently the I/O is only started with optional operating system advice, * and the real I/O happens in WaitReadBuffers(). In future work, true I/O @@ -2452,7 +2453,7 @@ MarkBufferDirty(Buffer buffer) uint32 old_buf_state; if (!BufferIsValid(buffer)) - elog(ERROR, "bad buffer ID: %d", buffer); + elog(PANIC, "bad buffer ID: %d", buffer); if (BufferIsLocal(buffer)) { @@ -4824,7 +4825,7 @@ void ReleaseBuffer(Buffer buffer) { if (!BufferIsValid(buffer)) - elog(ERROR, "bad buffer ID: %d", buffer); + elog(PANIC, "bad buffer ID: %d", buffer); if (BufferIsLocal(buffer)) UnpinLocalBuffer(buffer); @@ -4891,7 +4892,7 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) Page page = BufferGetPage(buffer); if (!BufferIsValid(buffer)) - elog(ERROR, "bad buffer ID: %d", buffer); + elog(PANIC, "bad buffer ID: %d", buffer); if (BufferIsLocal(buffer)) { @@ -5963,7 +5964,7 @@ ResOwnerReleaseBufferPin(Datum res) /* Like ReleaseBuffer, but don't call ResourceOwnerForgetBuffer */ if (!BufferIsValid(buffer)) - elog(ERROR, "bad buffer ID: %d", buffer); + elog(PANIC, "bad buffer ID: %d", buffer); if (BufferIsLocal(buffer)) UnpinLocalBufferNoOwner(buffer); diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build index 40345bdca27..739d13293fb 100644 --- a/src/backend/storage/meson.build +++ b/src/backend/storage/meson.build @@ -1,5 +1,6 @@ # Copyright (c) 2022-2024, PostgreSQL Global Development Group +subdir('aio') subdir('buffer') subdir('file') subdir('freespace') diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h new file mode 100644 index 00000000000..9e5fa2acf15 --- /dev/null +++ b/src/include/storage/read_stream.h @@ -0,0 +1,62 @@ +/*------------------------------------------------------------------------- + * + * read_stream.h + * Mechanism for accessing buffered relation data with look-ahead + * + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/read_stream.h + * + *------------------------------------------------------------------------- + */ +#ifndef READ_STREAM_H +#define READ_STREAM_H + +#include "storage/bufmgr.h" + +/* Default tuning, reasonable for many users. */ +#define READ_STREAM_DEFAULT 0x00 + +/* + * I/O streams that are performing maintenance work on behalf of potentially + * many users, and thus should be governed by maintenance_io_concurrency + * instead of effective_io_concurrency. For example, VACUUM or CREATE INDEX. + */ +#define READ_STREAM_MAINTENANCE 0x01 + +/* + * We usually avoid issuing prefetch advice automatically when sequential + * access is detected, but this flag explicitly disables it, for cases that + * might not be correctly detected. Explicit advice is known to perform worse + * than letting the kernel (at least Linux) detect sequential access. + */ +#define READ_STREAM_SEQUENTIAL 0x02 + +/* + * We usually ramp up from smaller reads to larger ones, to support users who + * don't know if it's worth reading lots of buffers yet. This flag disables + * that, declaring ahead of time that we'll be reading all available buffers. + */ +#define READ_STREAM_FULL 0x04 + +struct ReadStream; +typedef struct ReadStream ReadStream; + +/* Callback that returns the next block number to read. */ +typedef BlockNumber (*ReadStreamBlockNumberCB) (ReadStream *stream, + void *callback_private_data, + void *per_buffer_data); + +extern ReadStream *read_stream_begin_relation(int flags, + BufferAccessStrategy strategy, + BufferManagerRelation bmr, + ForkNumber forknum, + ReadStreamBlockNumberCB callback, + void *callback_private_data, + size_t per_buffer_data_size); +extern Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_private); +extern void read_stream_end(ReadStream *stream); + +#endif /* READ_STREAM_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 97edd1388e9..82fa6c12970 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1214,6 +1214,7 @@ InjectionPointCacheEntry InjectionPointEntry InjectionPointSharedState InlineCodeBlock +InProgressIO InsertStmt Instrumentation Int128AggState @@ -2292,6 +2293,7 @@ ReadExtraTocPtrType ReadFunc ReadLocalXLogPageNoWaitPrivate ReadReplicationSlotCmd +ReadStream ReassignOwnedStmt RecheckForeignScan_function RecordCacheArrayEntry -- 2.40.1
From 8b877bf126ebd9f1c5a896946b01120d7ece7ca5 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.mu...@gmail.com> Date: Mon, 26 Feb 2024 23:48:31 +1300 Subject: [PATCH v6 3/6] Provide vectored variant of ReadBuffer(). Break ReadBuffer() up into two steps: StartReadBuffers() and WaitReadBuffers(). This has two advantages: 1. Multiple consecutive blocks can be read with one system call. 2. Advice (hints of future reads) can optionally be issued to the kernel. The traditional ReadBuffer() function is now implemented in terms of those functions, to avoid duplication. For now we still only read a block at a time so there is no change to generated system calls yet, but later commits will provide infrastructure to help build up larger calls. Callers should respect the new GUC io_combine_limit, and the limit on per-backend pins which is now exposed as a public interface. With some more infrastructure in later work, StartReadBuffers() could be extended to start real asynchronous I/O instead of advice. Author: Thomas Munro <thomas.mu...@gmail.com> Author: Andres Freund <and...@anarazel.de> (optimization tweaks) Reviewed-by: Melanie Plageman <melanieplage...@gmail.com> Reviewed-by: Heikki Linnakangas <hlinn...@iki.fi> Reviewed-by: Nazir Bilal Yavuz <byavu...@gmail.com> Reviewed-by: Dilip Kumar <dilipbal...@gmail.com> Reviewed-by: Andres Freund <and...@anarazel.de> Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6ut5tum2g...@mail.gmail.com --- doc/src/sgml/config.sgml | 14 + src/backend/storage/buffer/bufmgr.c | 700 ++++++++++++------ src/backend/storage/buffer/localbuf.c | 14 +- src/backend/utils/misc/guc_tables.c | 14 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/storage/bufmgr.h | 41 +- src/tools/pgindent/typedefs.list | 1 + 7 files changed, 563 insertions(+), 222 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 5468637e2ef..f3736000ad2 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2719,6 +2719,20 @@ include_dir 'conf.d' </listitem> </varlistentry> + <varlistentry id="guc-io-combine-limit" xreflabel="io_combine_limit"> + <term><varname>io_combine_limit</varname> (<type>integer</type>) + <indexterm> + <primary><varname>io_combine_limit</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Controls the largest I/O size in operations that combine I/O. + The default is 128kB. + </para> + </listitem> + </varlistentry> + <varlistentry id="guc-max-worker-processes" xreflabel="max_worker_processes"> <term><varname>max_worker_processes</varname> (<type>integer</type>) <indexterm> diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index f0f8d4259c5..8df5f3b43da 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -19,6 +19,11 @@ * and pin it so that no one can destroy it while this process * is using it. * + * StartReadBuffers() -- as above, but for multiple contiguous blocks in + * two steps. + * + * WaitReadBuffers() -- second step of StartReadBuffers(). + * * ReleaseBuffer() -- unpin a buffer * * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty". @@ -160,6 +165,9 @@ int checkpoint_flush_after = DEFAULT_CHECKPOINT_FLUSH_AFTER; int bgwriter_flush_after = DEFAULT_BGWRITER_FLUSH_AFTER; int backend_flush_after = DEFAULT_BACKEND_FLUSH_AFTER; +/* Limit on how many blocks should be handled in single I/O operations. */ +int io_combine_limit = DEFAULT_IO_COMBINE_LIMIT; + /* local state for LockBufferForCleanup */ static BufferDesc *PinCountWaitBuf = NULL; @@ -471,10 +479,9 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref) ) -static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, +static Buffer ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum, BlockNumber blockNum, - ReadBufferMode mode, BufferAccessStrategy strategy, - bool *hit); + ReadBufferMode mode, BufferAccessStrategy strategy); static BlockNumber ExtendBufferedRelCommon(BufferManagerRelation bmr, ForkNumber fork, BufferAccessStrategy strategy, @@ -500,7 +507,7 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); -static bool StartBufferIO(BufferDesc *buf, bool forInput); +static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits, bool forget_owner); static void AbortBufferIO(Buffer buffer); @@ -781,7 +788,6 @@ Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy) { - bool hit; Buffer buf; /* @@ -794,15 +800,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot access temporary tables of other sessions"))); - /* - * Read the buffer, and update pgstat counters to reflect a cache hit or - * miss. - */ - pgstat_count_buffer_read(reln); - buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence, - forkNum, blockNum, mode, strategy, &hit); - if (hit) - pgstat_count_buffer_hit(reln); + buf = ReadBuffer_common(BMR_REL(reln), + forkNum, blockNum, mode, strategy); + return buf; } @@ -822,13 +822,12 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool permanent) { - bool hit; - SMgrRelation smgr = smgropen(rlocator, INVALID_PROC_NUMBER); - return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT : - RELPERSISTENCE_UNLOGGED, forkNum, blockNum, - mode, strategy, &hit); + return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT : + RELPERSISTENCE_UNLOGGED), + forkNum, blockNum, + mode, strategy); } /* @@ -994,35 +993,146 @@ ExtendBufferedRelTo(BufferManagerRelation bmr, */ if (buffer == InvalidBuffer) { - bool hit; - Assert(extended_by == 0); - buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence, - fork, extend_to - 1, mode, strategy, - &hit); + buffer = ReadBuffer_common(bmr, fork, extend_to - 1, mode, strategy); } return buffer; } /* - * ReadBuffer_common -- common logic for all ReadBuffer variants - * - * *hit is set to true if the request was satisfied from shared buffer cache. + * Zero a buffer and lock it, as part of the implementation of + * RBM_ZERO_AND_LOCK or RBM_ZERO_AND_CLEANUP_LOCK. The buffer must be already + * pinned. It does not have to be valid, but it is valid and locked on + * return. */ -static Buffer -ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, - BlockNumber blockNum, ReadBufferMode mode, - BufferAccessStrategy strategy, bool *hit) +static void +ZeroBuffer(Buffer buffer, ReadBufferMode mode) { BufferDesc *bufHdr; - Block bufBlock; - bool found; + uint32 buf_state; + + Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); + + if (BufferIsLocal(buffer)) + bufHdr = GetLocalBufferDescriptor(-buffer - 1); + else + { + bufHdr = GetBufferDescriptor(buffer - 1); + if (mode == RBM_ZERO_AND_LOCK) + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + else + LockBufferForCleanup(buffer); + } + + memset(BufferGetPage(buffer), 0, BLCKSZ); + + if (BufferIsLocal(buffer)) + { + buf_state = pg_atomic_read_u32(&bufHdr->state); + buf_state |= BM_VALID; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + buf_state = LockBufHdr(bufHdr); + buf_state |= BM_VALID; + UnlockBufHdr(bufHdr, buf_state); + } +} + +/* + * Pin a buffer for a given block. *foundPtr is set to true if the block was + * already present, or false if more work is required to either read it in or + * zero it. + */ +static inline Buffer +PinBufferForBlock(BufferManagerRelation bmr, + ForkNumber forkNum, + BlockNumber blockNum, + BufferAccessStrategy strategy, + bool *foundPtr) +{ + BufferDesc *bufHdr; + bool isLocalBuf; IOContext io_context; IOObject io_object; - bool isLocalBuf = SmgrIsTemp(smgr); - *hit = false; + Assert(blockNum != P_NEW); + + Assert(bmr.smgr); + + isLocalBuf = bmr.relpersistence == RELPERSISTENCE_TEMP; + if (isLocalBuf) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(strategy); + io_object = IOOBJECT_RELATION; + } + + TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend); + + if (isLocalBuf) + { + bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, foundPtr); + if (*foundPtr) + pgBufferUsage.local_blks_hit++; + } + else + { + bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum, + strategy, foundPtr, io_context); + if (*foundPtr) + pgBufferUsage.shared_blks_hit++; + } + if (bmr.rel) + { + /* + * While pgBufferUsage's "read" counter isn't bumped unless we reach + * WaitReadBuffers() (so, not for hits, and not for buffers that are + * zeroed instead), the per-relation stats always count them. + */ + pgstat_count_buffer_read(bmr.rel); + if (*foundPtr) + pgstat_count_buffer_hit(bmr.rel); + } + if (*foundPtr) + { + VacuumPageHit++; + pgstat_count_io_op(io_object, io_context, IOOP_HIT); + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageHit; + + TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, + bmr.smgr->smgr_rlocator.locator.spcOid, + bmr.smgr->smgr_rlocator.locator.dbOid, + bmr.smgr->smgr_rlocator.locator.relNumber, + bmr.smgr->smgr_rlocator.backend, + true); + } + + return BufferDescriptorGetBuffer(bufHdr); +} + +/* + * ReadBuffer_common -- common logic for all ReadBuffer variants + */ +static inline Buffer +ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum, + BlockNumber blockNum, ReadBufferMode mode, + BufferAccessStrategy strategy) +{ + ReadBuffersOperation operation; + Buffer buffer; + int flags; /* * Backward compatibility path, most code should use ExtendBufferedRel() @@ -1041,181 +1151,358 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) flags |= EB_LOCK_FIRST; - return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence), - forkNum, strategy, flags); + return ExtendBufferedRel(bmr, forkNum, strategy, flags); } - TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend); - - if (isLocalBuf) + if (unlikely(mode == RBM_ZERO_AND_CLEANUP_LOCK || + mode == RBM_ZERO_AND_LOCK)) { - /* - * We do not use a BufferAccessStrategy for I/O of temporary tables. - * However, in some cases, the "strategy" may not be NULL, so we can't - * rely on IOContextForStrategy() to set the right IOContext for us. - * This may happen in cases like CREATE TEMPORARY TABLE AS... - */ - io_context = IOCONTEXT_NORMAL; - io_object = IOOBJECT_TEMP_RELATION; - bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found); - if (found) - pgBufferUsage.local_blks_hit++; - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || - mode == RBM_ZERO_ON_ERROR) - pgBufferUsage.local_blks_read++; + bool found; + + if (bmr.smgr == NULL) + { + bmr.smgr = RelationGetSmgr(bmr.rel); + bmr.relpersistence = bmr.rel->rd_rel->relpersistence; + } + + buffer = PinBufferForBlock(bmr, forkNum, blockNum, strategy, &found); + ZeroBuffer(buffer, mode); + return buffer; } + + if (mode == RBM_ZERO_ON_ERROR) + flags = READ_BUFFERS_ZERO_ON_ERROR; else + flags = 0; + operation.bmr = bmr; + operation.forknum = forkNum; + operation.strategy = strategy; + if (StartReadBuffer(&operation, + &buffer, + blockNum, + flags)) + WaitReadBuffers(&operation); + + return buffer; +} + +/* + * Single block version of the StartReadBuffers(). This might save a few + * instructions when called from another translation unit, if the compiler + * inlines the code and specializes for nblocks == 1. + */ +bool +StartReadBuffer(ReadBuffersOperation *operation, + Buffer *buffer, + BlockNumber blocknum, + int flags) +{ + int nblocks = 1; + bool result; + + result = StartReadBuffers(operation, buffer, blocknum, &nblocks, flags); + Assert(nblocks == 1); /* single block can't be short */ + + return result; +} + +/* + * Begin reading a range of blocks beginning at blockNum and extending for + * *nblocks. On return, up to *nblocks pinned buffers holding those blocks + * are written into the buffers array, and *nblocks is updated to contain the + * actual number, which may be fewer than requested. + * + * If false is returned, no I/O is necessary and WaitReadBuffers() is not + * necessary. If true is returned, one I/O has been started, and + * WaitReadBuffers() must be called with the same operation object before the + * buffers are accessed. Along with the operation object, the caller-supplied + * array of buffers must remain valid until WaitReadBuffers() is called. + * + * Currently the I/O is only started with optional operating system advice, + * and the real I/O happens in WaitReadBuffers(). In future work, true I/O + * could be initiated here. + */ +inline bool +StartReadBuffers(ReadBuffersOperation *operation, + Buffer *buffers, + BlockNumber blockNum, + int *nblocks, + int flags) +{ + int actual_nblocks = *nblocks; + int io_buffers_len = 0; + + Assert(*nblocks > 0); + Assert(*nblocks <= MAX_IO_COMBINE_LIMIT); + + if (!operation->bmr.smgr) { - /* - * lookup the buffer. IO_IN_PROGRESS is set if the requested block is - * not currently in memory. - */ - io_context = IOContextForStrategy(strategy); - io_object = IOOBJECT_RELATION; - bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum, - strategy, &found, io_context); - if (found) - pgBufferUsage.shared_blks_hit++; - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || - mode == RBM_ZERO_ON_ERROR) - pgBufferUsage.shared_blks_read++; + operation->bmr.smgr = RelationGetSmgr(operation->bmr.rel); + operation->bmr.relpersistence = operation->bmr.rel->rd_rel->relpersistence; } - /* At this point we do NOT hold any locks. */ - - /* if it was already in the buffer pool, we're done */ - if (found) + for (int i = 0; i < actual_nblocks; ++i) { - /* Just need to update stats before we exit */ - *hit = true; - VacuumPageHit++; - pgstat_count_io_op(io_object, io_context, IOOP_HIT); + bool found; - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageHit; + buffers[i] = PinBufferForBlock(operation->bmr, + operation->forknum, + blockNum + i, + operation->strategy, + &found); - TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - found); + if (found) + { + /* + * Terminate the read as soon as we get a hit. It could be a + * single buffer hit, or it could be a hit that follows a readable + * range. We don't want to create more than one readable range, + * so we stop here. + */ + actual_nblocks = i + 1; + break; + } + else + { + /* Extend the readable range to cover this block. */ + io_buffers_len++; + } + } + *nblocks = actual_nblocks; - /* - * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked - * on return. - */ - if (!isLocalBuf) + if (io_buffers_len > 0) + { + /* Populate information needed for I/O. */ + operation->buffers = buffers; + operation->blocknum = blockNum; + operation->flags = flags; + operation->nblocks = actual_nblocks; + operation->io_buffers_len = io_buffers_len; + + if (flags & READ_BUFFERS_ISSUE_ADVICE) { - if (mode == RBM_ZERO_AND_LOCK) - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), - LW_EXCLUSIVE); - else if (mode == RBM_ZERO_AND_CLEANUP_LOCK) - LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr)); + /* + * In theory we should only do this if PinBufferForBlock() had to + * allocate new buffers above. That way, if two calls to + * StartReadBuffers() were made for the same blocks before + * WaitReadBuffers(), only the first would issue the advice. + * That'd be a better simulation of true asynchronous I/O, which + * would only start the I/O once, but isn't done here for + * simplicity. Note also that the following call might actually + * issue two advice calls if we cross a segment boundary; in a + * true asynchronous version we might choose to process only one + * real I/O at a time in that case. + */ + smgrprefetch(operation->bmr.smgr, + operation->forknum, + blockNum, + operation->io_buffers_len); } - return BufferDescriptorGetBuffer(bufHdr); + /* Indicate that WaitReadBuffers() should be called. */ + return true; + } + else + { + return false; } +} + +static inline bool +WaitReadBuffersCanStartIO(Buffer buffer, bool nowait) +{ + if (BufferIsLocal(buffer)) + { + BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1); + + return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0; + } + else + return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); +} + +void +WaitReadBuffers(ReadBuffersOperation *operation) +{ + Buffer *buffers; + int nblocks; + BlockNumber blocknum; + ForkNumber forknum; + bool isLocalBuf; + IOContext io_context; + IOObject io_object; /* - * if we have gotten to this point, we have allocated a buffer for the - * page but its contents are not yet valid. IO_IN_PROGRESS is set for it, - * if it's a shared buffer. + * Currently operations are only allowed to include a read of some range, + * with an optional extra buffer that is already pinned at the end. So + * nblocks can be at most one more than io_buffers_len. */ - Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ + Assert((operation->nblocks == operation->io_buffers_len) || + (operation->nblocks == operation->io_buffers_len + 1)); - bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); + /* Find the range of the physical read we need to perform. */ + nblocks = operation->io_buffers_len; + if (nblocks == 0) + return; /* nothing to do */ + + buffers = &operation->buffers[0]; + blocknum = operation->blocknum; + forknum = operation->forknum; + + isLocalBuf = operation->bmr.relpersistence == RELPERSISTENCE_TEMP; + if (isLocalBuf) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(operation->strategy); + io_object = IOOBJECT_RELATION; + } /* - * Read in the page, unless the caller intends to overwrite it and just - * wants us to allocate a buffer. + * We count all these blocks as read by this backend. This is traditional + * behavior, but might turn out to be not true if we find that someone + * else has beaten us and completed the read of some of these blocks. In + * that case the system globally double-counts, but we traditionally don't + * count this as a "hit", and we don't have a separate counter for "miss, + * but another backend completed the read". */ - if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) - MemSet((char *) bufBlock, 0, BLCKSZ); + if (isLocalBuf) + pgBufferUsage.local_blks_read += nblocks; else + pgBufferUsage.shared_blks_read += nblocks; + + for (int i = 0; i < nblocks; ++i) { - instr_time io_start = pgstat_prepare_io_time(track_io_timing); + int io_buffers_len; + Buffer io_buffers[MAX_IO_COMBINE_LIMIT]; + void *io_pages[MAX_IO_COMBINE_LIMIT]; + instr_time io_start; + BlockNumber io_first_block; + + /* + * Skip this block if someone else has already completed it. If an + * I/O is already in progress in another backend, this will wait for + * the outcome: either done, or something went wrong and we will + * retry. + */ + if (!WaitReadBuffersCanStartIO(buffers[i], false)) + { + /* + * Report this as a 'hit' for this backend, even though it must + * have started out as a miss in PinBufferForBlock(). + */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i, + operation->bmr.smgr->smgr_rlocator.locator.spcOid, + operation->bmr.smgr->smgr_rlocator.locator.dbOid, + operation->bmr.smgr->smgr_rlocator.locator.relNumber, + operation->bmr.smgr->smgr_rlocator.backend, + true); + continue; + } + + /* We found a buffer that we need to read in. */ + io_buffers[0] = buffers[i]; + io_pages[0] = BufferGetBlock(buffers[i]); + io_first_block = blocknum + i; + io_buffers_len = 1; - smgrread(smgr, forkNum, blockNum, bufBlock); + /* + * How many neighboring-on-disk blocks can we can scatter-read into + * other buffers at the same time? In this case we don't wait if we + * see an I/O already in progress. We already hold BM_IO_IN_PROGRESS + * for the head block, so we should get on with that I/O as soon as + * possible. We'll come back to this block again, above. + */ + while ((i + 1) < nblocks && + WaitReadBuffersCanStartIO(buffers[i + 1], true)) + { + /* Must be consecutive block numbers. */ + Assert(BufferGetBlockNumber(buffers[i + 1]) == + BufferGetBlockNumber(buffers[i]) + 1); + + io_buffers[io_buffers_len] = buffers[++i]; + io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); + } - pgstat_count_io_op_time(io_object, io_context, - IOOP_READ, io_start, 1); + io_start = pgstat_prepare_io_time(track_io_timing); + smgrreadv(operation->bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len); + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start, + io_buffers_len); - /* check for garbage data */ - if (!PageIsVerifiedExtended((Page) bufBlock, blockNum, - PIV_LOG_WARNING | PIV_REPORT_STAT)) + /* Verify each block we read, and terminate the I/O. */ + for (int j = 0; j < io_buffers_len; ++j) { - if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages) + BufferDesc *bufHdr; + Block bufBlock; + + if (isLocalBuf) { - ereport(WARNING, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s; zeroing out page", - blockNum, - relpath(smgr->smgr_rlocator, forkNum)))); - MemSet((char *) bufBlock, 0, BLCKSZ); + bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1); + bufBlock = LocalBufHdrGetBlock(bufHdr); } else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s", - blockNum, - relpath(smgr->smgr_rlocator, forkNum)))); - } - } - - /* - * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer - * content lock before marking the page as valid, to make sure that no - * other backend sees the zeroed page before the caller has had a chance - * to initialize it. - * - * Since no-one else can be looking at the page contents yet, there is no - * difference between an exclusive lock and a cleanup-strength lock. (Note - * that we cannot use LockBuffer() or LockBufferForCleanup() here, because - * they assert that the buffer is already valid.) - */ - if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) && - !isLocalBuf) - { - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE); - } + { + bufHdr = GetBufferDescriptor(io_buffers[j] - 1); + bufBlock = BufHdrGetBlock(bufHdr); + } - if (isLocalBuf) - { - /* Only need to adjust flags */ - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + /* check for garbage data */ + if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j, + PIV_LOG_WARNING | PIV_REPORT_STAT)) + { + if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s; zeroing out page", + io_first_block + j, + relpath(operation->bmr.smgr->smgr_rlocator, forknum)))); + memset(bufBlock, 0, BLCKSZ); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s", + io_first_block + j, + relpath(operation->bmr.smgr->smgr_rlocator, forknum)))); + } - buf_state |= BM_VALID; - pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); - } - else - { - /* Set BM_VALID, terminate IO, and wake up any waiters */ - TerminateBufferIO(bufHdr, false, BM_VALID, true); - } + /* Terminate I/O and set BM_VALID. */ + if (isLocalBuf) + { + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); - VacuumPageMiss++; - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageMiss; + buf_state |= BM_VALID; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + /* Set BM_VALID, terminate IO, and wake up any waiters */ + TerminateBufferIO(bufHdr, false, BM_VALID, true); + } - TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - found); + /* Report I/Os as completing individually. */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j, + operation->bmr.smgr->smgr_rlocator.locator.spcOid, + operation->bmr.smgr->smgr_rlocator.locator.dbOid, + operation->bmr.smgr->smgr_rlocator.locator.relNumber, + operation->bmr.smgr->smgr_rlocator.backend, + false); + } - return BufferDescriptorGetBuffer(bufHdr); + VacuumPageMiss += io_buffers_len; + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; + } } /* - * BufferAlloc -- subroutine for ReadBuffer. Handles lookup of a shared - * buffer. If no buffer exists already, selects a replacement - * victim and evicts the old page, but does NOT read in new page. + * BufferAlloc -- subroutine for PinBufferForBlock. Handles lookup of a shared + * buffer. If no buffer exists already, selects a replacement victim and + * evicts the old page, but does NOT read in new page. * * "strategy" can be a buffer replacement strategy object, or NULL for * the default strategy. The selected buffer's usage_count is advanced when @@ -1223,11 +1510,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * * The returned buffer is pinned and is already marked as holding the * desired page. If it already did have the desired page, *foundPtr is - * set true. Otherwise, *foundPtr is set false and the buffer is marked - * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it. - * - * *foundPtr is actually redundant with the buffer's BM_VALID flag, but - * we keep it for simplicity in ReadBuffer. + * set true. Otherwise, *foundPtr is set false. * * io_context is passed as an output parameter to avoid calling * IOContextForStrategy() when there is a shared buffers hit and no IO @@ -1286,19 +1569,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { /* * We can only get here if (a) someone else is still reading in - * the page, or (b) a previous read attempt failed. We have to - * wait for any active read attempt to finish, and then set up our - * own read attempt if the page is still not BM_VALID. - * StartBufferIO does it all. + * the page, (b) a previous read attempt failed, or (c) someone + * called StartReadBuffers() but not yet WaitReadBuffers(). */ - if (StartBufferIO(buf, true)) - { - /* - * If we get here, previous attempts to read the buffer must - * have failed ... but we shall bravely try again. - */ - *foundPtr = false; - } + *foundPtr = false; } return buf; @@ -1363,19 +1637,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { /* * We can only get here if (a) someone else is still reading in - * the page, or (b) a previous read attempt failed. We have to - * wait for any active read attempt to finish, and then set up our - * own read attempt if the page is still not BM_VALID. - * StartBufferIO does it all. + * the page, (b) a previous read attempt failed, or (c) someone + * called StartReadBuffers() but not yet WaitReadBuffers(). */ - if (StartBufferIO(existing_buf_hdr, true)) - { - /* - * If we get here, previous attempts to read the buffer must - * have failed ... but we shall bravely try again. - */ - *foundPtr = false; - } + *foundPtr = false; } return existing_buf_hdr; @@ -1407,15 +1672,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, LWLockRelease(newPartitionLock); /* - * Buffer contents are currently invalid. Try to obtain the right to - * start I/O. If StartBufferIO returns false, then someone else managed - * to read it before we did, so there's nothing left for BufferAlloc() to - * do. + * Buffer contents are currently invalid. */ - if (StartBufferIO(victim_buf_hdr, true)) - *foundPtr = false; - else - *foundPtr = true; + *foundPtr = false; return victim_buf_hdr; } @@ -1769,7 +2028,7 @@ again: * pessimistic, but outside of toy-sized shared_buffers it should allow * sufficient pins. */ -static void +void LimitAdditionalPins(uint32 *additional_pins) { uint32 max_backends; @@ -2034,7 +2293,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, buf_state &= ~BM_VALID; UnlockBufHdr(existing_hdr, buf_state); - } while (!StartBufferIO(existing_hdr, true)); + } while (!StartBufferIO(existing_hdr, true, false)); } else { @@ -2057,7 +2316,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, LWLockRelease(partition_lock); /* XXX: could combine the locked operations in it with the above */ - StartBufferIO(victim_buf_hdr, true); + StartBufferIO(victim_buf_hdr, true, false); } } @@ -2372,7 +2631,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) else { /* - * If we previously pinned the buffer, it must surely be valid. + * If we previously pinned the buffer, it is likely to be valid, but + * it may not be if StartReadBuffers() was called and + * WaitReadBuffers() hasn't been called yet. We'll check by loading + * the flags without locking. This is racy, but it's OK to return + * false spuriously: when WaitReadBuffers() calls StartBufferIO(), + * it'll see that it's now valid. * * Note: We deliberately avoid a Valgrind client request here. * Individual access methods can optionally superimpose buffer page @@ -2381,7 +2645,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) * that the buffer page is legitimately non-accessible here. We * cannot meddle with that. */ - result = true; + result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0; } ref->refcount++; @@ -3449,7 +3713,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * someone else flushed the buffer before we could, so we need not do * anything. */ - if (!StartBufferIO(buf, false)) + if (!StartBufferIO(buf, false, false)) return; /* Setup error traceback support for ereport() */ @@ -5184,9 +5448,15 @@ WaitIO(BufferDesc *buf) * * Returns true if we successfully marked the buffer as I/O busy, * false if someone else already did the work. + * + * If nowait is true, then we don't wait for an I/O to be finished by another + * backend. In that case, false indicates either that the I/O was already + * finished, or is still in progress. This is useful for callers that want to + * find out if they can perform the I/O as part of a larger operation, without + * waiting for the answer or distinguishing the reasons why not. */ static bool -StartBufferIO(BufferDesc *buf, bool forInput) +StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) { uint32 buf_state; @@ -5199,6 +5469,8 @@ StartBufferIO(BufferDesc *buf, bool forInput) if (!(buf_state & BM_IO_IN_PROGRESS)) break; UnlockBufHdr(buf, buf_state); + if (nowait) + return false; WaitIO(buf); } diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index fcfac335a57..985a2c7049c 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -108,10 +108,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum, * LocalBufferAlloc - * Find or create a local buffer for the given page of the given relation. * - * API is similar to bufmgr.c's BufferAlloc, except that we do not need - * to do any locking since this is all local. Also, IO_IN_PROGRESS - * does not get set. Lastly, we support only default access strategy - * (hence, usage_count is always advanced). + * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do + * any locking since this is all local. We support only default access + * strategy (hence, usage_count is always advanced). */ BufferDesc * LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, @@ -287,7 +286,7 @@ GetLocalVictimBuffer(void) } /* see LimitAdditionalPins() */ -static void +void LimitAdditionalLocalPins(uint32 *additional_pins) { uint32 max_pins; @@ -297,9 +296,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins) /* * In contrast to LimitAdditionalPins() other backends don't play a role - * here. We can allow up to NLocBuffer pins in total. + * here. We can allow up to NLocBuffer pins in total, but it might not be + * initialized yet so read num_temp_buffers. */ - max_pins = (NLocBuffer - NLocalPinnedBuffers); + max_pins = (num_temp_buffers - NLocalPinnedBuffers); if (*additional_pins >= max_pins) *additional_pins = max_pins; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index abd9029451f..313e393262f 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3112,6 +3112,20 @@ struct config_int ConfigureNamesInt[] = NULL }, + { + {"io_combine_limit", + PGC_USERSET, + RESOURCES_ASYNCHRONOUS, + gettext_noop("Limit on the size of data reads and writes."), + NULL, + GUC_UNIT_BLOCKS + }, + &io_combine_limit, + DEFAULT_IO_COMBINE_LIMIT, + 1, MAX_IO_COMBINE_LIMIT, + NULL, NULL, NULL + }, + { {"backend_flush_after", PGC_USERSET, RESOURCES_ASYNCHRONOUS, gettext_noop("Number of pages after which previously performed writes are flushed to disk."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 2244ee52f79..7fa6d5a64c8 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -203,6 +203,7 @@ #backend_flush_after = 0 # measured in pages, 0 disables #effective_io_concurrency = 1 # 1-1000; 0 disables prefetching #maintenance_io_concurrency = 10 # 1-1000; 0 disables prefetching +#io_combine_limit = 128kB # usually 1-32 blocks (depends on OS) #max_worker_processes = 8 # (change requires restart) #max_parallel_workers_per_gather = 2 # limited by max_parallel_workers #max_parallel_maintenance_workers = 2 # limited by max_parallel_workers diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index d51d46d3353..241f68c45e1 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -14,6 +14,7 @@ #ifndef BUFMGR_H #define BUFMGR_H +#include "port/pg_iovec.h" #include "storage/block.h" #include "storage/buf.h" #include "storage/bufpage.h" @@ -133,6 +134,10 @@ extern PGDLLIMPORT bool track_io_timing; extern PGDLLIMPORT int effective_io_concurrency; extern PGDLLIMPORT int maintenance_io_concurrency; +#define MAX_IO_COMBINE_LIMIT PG_IOV_MAX +#define DEFAULT_IO_COMBINE_LIMIT Min(MAX_IO_COMBINE_LIMIT, (128 * 1024) / BLCKSZ) +extern PGDLLIMPORT int io_combine_limit; + extern PGDLLIMPORT int checkpoint_flush_after; extern PGDLLIMPORT int backend_flush_after; extern PGDLLIMPORT int bgwriter_flush_after; @@ -158,7 +163,6 @@ extern PGDLLIMPORT int32 *LocalRefCount; #define BUFFER_LOCK_SHARE 1 #define BUFFER_LOCK_EXCLUSIVE 2 - /* * prototypes for functions in bufmgr.c */ @@ -177,6 +181,38 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool permanent); + +#define READ_BUFFERS_ZERO_ON_ERROR 0x01 +#define READ_BUFFERS_ISSUE_ADVICE 0x02 + +struct ReadBuffersOperation +{ + /* The following members should be set by the caller. */ + BufferManagerRelation bmr; + ForkNumber forknum; + BufferAccessStrategy strategy; + + /* The following private members should not be accessed directly. */ + Buffer *buffers; + BlockNumber blocknum; + int flags; + int16 nblocks; + int16 io_buffers_len; +}; + +typedef struct ReadBuffersOperation ReadBuffersOperation; + +extern bool StartReadBuffer(ReadBuffersOperation *operation, + Buffer *buffer, + BlockNumber blocknum, + int flags); +extern bool StartReadBuffers(ReadBuffersOperation *operation, + Buffer *buffers, + BlockNumber blocknum, + int *nblocks, + int flags); +extern void WaitReadBuffers(ReadBuffersOperation *operation); + extern void ReleaseBuffer(Buffer buffer); extern void UnlockReleaseBuffer(Buffer buffer); extern bool BufferIsExclusiveLocked(Buffer buffer); @@ -250,6 +286,9 @@ extern bool HoldingBufferPinThatDelaysRecovery(void); extern bool BgBufferSync(struct WritebackContext *wb_context); +extern void LimitAdditionalPins(uint32 *additional_pins); +extern void LimitAdditionalLocalPins(uint32 *additional_pins); + /* in buf_init.c */ extern void InitBufferPool(void); extern Size BufferShmemSize(void); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index cfa9d5aaeac..97edd1388e9 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2286,6 +2286,7 @@ ReInitializeDSMForeignScan_function ReScanForeignScan_function ReadBufPtrType ReadBufferMode +ReadBuffersOperation ReadBytePtrType ReadExtraTocPtrType ReadFunc -- 2.40.1
From 78b7bb5e30f7998b2701f5e967ec5d9c1849f7da Mon Sep 17 00:00:00 2001 From: Melanie Plageman <melanieplage...@gmail.com> Date: Wed, 27 Mar 2024 16:46:42 -0400 Subject: [PATCH v6 5/6] Add read_stream_reset For rescan, we want to reuse the existing ReadStream and simply release the pinned buffers. --- src/backend/storage/aio/read_stream.c | 19 +++++++++++++++++++ src/include/storage/read_stream.h | 1 + 2 files changed, 20 insertions(+) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 4e293e0df65..0d855475959 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -731,3 +731,22 @@ read_stream_end(ReadStream *stream) /* Release memory. */ pfree(stream); } + + +/* + * Reset a read stream by releasing all of the buffers. + */ +void +read_stream_reset(ReadStream *stream) +{ + Buffer buffer; + + /* Stop looking ahead. */ + stream->distance = 0; + + /* Unpin anything that wasn't consumed. */ + while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer) + ReleaseBuffer(buffer); + + stream->distance = 1; +} diff --git a/src/include/storage/read_stream.h b/src/include/storage/read_stream.h index 9e5fa2acf15..dc1733a1594 100644 --- a/src/include/storage/read_stream.h +++ b/src/include/storage/read_stream.h @@ -58,5 +58,6 @@ extern ReadStream *read_stream_begin_relation(int flags, size_t per_buffer_data_size); extern Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_private); extern void read_stream_end(ReadStream *stream); +extern void read_stream_reset(ReadStream *stream); #endif /* READ_STREAM_H */ -- 2.40.1
From f0c79f89574725460342fe55d56365bc2031510e Mon Sep 17 00:00:00 2001 From: Melanie Plageman <melanieplage...@gmail.com> Date: Wed, 27 Mar 2024 20:25:06 -0400 Subject: [PATCH v6 6/6] Sequential scans and TID range scans stream reads Implementing streaming read support for heap sequential scans and TID range scans includes three parts: Allocate the read stream object in heap_beginscan(). On rescan, reset the stream by releasing all pinned buffers and resetting the prefetch block. Implement a callback returning the next block to prefetch to the read stream infrastructure. Invoke the read stream API when a new page is needed. When the scan direction changes, reset the stream. ci-os-only: --- src/backend/access/heap/heapam.c | 88 ++++++++++++++++++++++++++++---- src/include/access/heapam.h | 15 ++++++ 2 files changed, 93 insertions(+), 10 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index c814aea9407..eee62b326e2 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -221,6 +221,25 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = * ---------------------------------------------------------------- */ +static BlockNumber +heap_scan_stream_read_next(ReadStream *pgsr, void *private_data, + void *per_buffer_data) +{ + HeapScanDesc scan = (HeapScanDesc) private_data; + + if (unlikely(!scan->rs_inited)) + { + scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir); + scan->rs_inited = true; + } + else + scan->rs_prefetch_block = heapgettup_advance_block(scan, + scan->rs_prefetch_block, + scan->rs_dir); + + return scan->rs_prefetch_block; +} + /* ---------------- * initscan - scan code common to heap_beginscan and heap_rescan * ---------------- @@ -323,6 +342,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + /* + * Initialize to ForwardScanDirection because it is most common and heap + * scans usually must go forwards before going backward. + */ + scan->rs_dir = ForwardScanDirection; + scan->rs_prefetch_block = InvalidBlockNumber; + /* page-at-a-time fields are always invalid when not rs_inited */ /* @@ -465,6 +491,8 @@ heapbuildvis(TableScanDesc sscan) static inline void heapfetchbuf(HeapScanDesc scan, ScanDirection dir) { + Assert(scan->rs_read_stream); + /* release previous scan buffer, if any */ if (BufferIsValid(scan->rs_cbuf)) { @@ -479,19 +507,23 @@ heapfetchbuf(HeapScanDesc scan, ScanDirection dir) */ CHECK_FOR_INTERRUPTS(); - if (unlikely(!scan->rs_inited)) + /* + * If the scan direction is changing, reset the prefetch block to the + * current block. Otherwise, we will incorrectly prefetch the blocks + * between the prefetch block and the current block again before + * prefetching blocks in the new, correct scan direction. + */ + if (unlikely(scan->rs_dir != dir)) { - scan->rs_cblock = heapgettup_initial_block(scan, dir); - Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); - scan->rs_inited = true; + scan->rs_prefetch_block = scan->rs_cblock; + read_stream_reset(scan->rs_read_stream); } - else - scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir); - /* read block if valid */ - if (BlockNumberIsValid(scan->rs_cblock)) - scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, - scan->rs_cblock, RBM_NORMAL, scan->rs_strategy); + scan->rs_dir = dir; + + scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL); + if (BufferIsValid(scan->rs_cbuf)) + scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf); } /* @@ -820,6 +852,7 @@ continue_page: scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; tuple->t_data = NULL; scan->rs_inited = false; } @@ -910,6 +943,7 @@ continue_page: ReleaseBuffer(scan->rs_cbuf); scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; tuple->t_data = NULL; scan->rs_inited = false; } @@ -983,6 +1017,8 @@ heap_beginscan(Relation relation, Snapshot snapshot, /* we only need to set this up once */ scan->rs_ctup.t_tableOid = RelationGetRelid(relation); + scan->rs_read_stream = NULL; + /* * Allocate memory to keep track of page allocation for parallel workers * when doing a parallel scan. @@ -1003,6 +1039,24 @@ heap_beginscan(Relation relation, Snapshot snapshot, initscan(scan, key, false); + /* + * We do not know the scan direction yet. If the scan does not end up + * being a forward scan, the read stream will be freed. This is best done + * after initscan() + */ + if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN || + scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN) + { + scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL, + scan->rs_strategy, + BMR_REL(scan->rs_base.rs_rd), + MAIN_FORKNUM, + heap_scan_stream_read_next, + scan, + 0); + } + + return (TableScanDesc) scan; } @@ -1041,6 +1095,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, * reinitialize scan descriptor */ initscan(scan, key, true); + + /* + * The read stream is reset on rescan. This must be done after initscan(), + * as some state referred to by read_stream_reset() is reset in + * initscan(). + */ + if (scan->rs_read_stream) + read_stream_reset(scan->rs_read_stream); } void @@ -1056,6 +1118,12 @@ heap_endscan(TableScanDesc sscan) if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); + /* + * Must free the read stream before freeing the BufferAccessStrategy + */ + if (scan->rs_read_stream) + read_stream_end(scan->rs_read_stream); + /* * decrement relation reference count and free scan descriptor storage */ diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index bacef18c0d3..ae4577f1cb8 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -25,6 +25,7 @@ #include "storage/bufpage.h" #include "storage/dsm.h" #include "storage/lockdefs.h" +#include "storage/read_stream.h" #include "storage/shm_toc.h" #include "utils/relcache.h" #include "utils/snapshot.h" @@ -66,6 +67,20 @@ typedef struct HeapScanDescData HeapTupleData rs_ctup; /* current tuple in scan, if any */ + /* For scans that stream reads */ + ReadStream *rs_read_stream; + + /* + * For sequential scans and TID range scans to stream reads. The read + * stream is allocated at the beginning of the scan and reset on rescan or + * when the scan direction changes. The scan direction is saved each time + * a new page is requested. If the scan direction changes from one page to + * the next, the read stream releases all previously pinned buffers and + * resets the prefetch block. + */ + ScanDirection rs_dir; + BlockNumber rs_prefetch_block; + /* * For parallel scans to store page allocation data. NULL when not * performing a parallel scan. -- 2.40.1