On Tue, Feb 27, 2024 at 02:47:03PM -0500, Melanie Plageman wrote:
> On Mon, Jan 29, 2024 at 8:18 PM Melanie Plageman
> <melanieplage...@gmail.com> wrote:
> >
> > On Fri, Jan 26, 2024 at 8:28 AM vignesh C <vignes...@gmail.com> wrote:
> > >
> > > CFBot shows that the patch does not apply anymore as in [1]:
> > > === applying patch
> > > ./v3-0002-Add-lazy_scan_skip-unskippable-state-to-LVRelStat.patch
> > > patching file src/backend/access/heap/vacuumlazy.c
> > > ...
> > > Hunk #10 FAILED at 1042.
> > > Hunk #11 FAILED at 1121.
> > > Hunk #12 FAILED at 1132.
> > > Hunk #13 FAILED at 1161.
> > > Hunk #14 FAILED at 1172.
> > > Hunk #15 FAILED at 1194.
> > > ...
> > > 6 out of 21 hunks FAILED -- saving rejects to file
> > > src/backend/access/heap/vacuumlazy.c.rej
> > >
> > > Please post an updated version for the same.
> > >
> > > [1] - http://cfbot.cputube.org/patch_46_4755.log
> >
> > Fixed in attached rebased v4
> 
> In light of Thomas' update to the streaming read API [1], I have
> rebased and updated this patch set.
> 
> The attached v5 has some simplifications when compared to v4 but takes
> largely the same approach.

Attached is a patch set (v5a) which updates the streaming read user for
vacuum to fix an issue Andrey Borodin pointed out to me off-list.

Note that I started writing this email before seeing Heikki's upthread
review [1], so I will respond to that in a bit. There are no changes in
v5a to any of the prelim refactoring patches which Heikki reviewed in
that email. I only changed the vacuum streaming read users (last two
patches in the set).

Back to this patch set:
Andrey pointed out that it was failing to compile on Windows, and the
reason is that I had accidentally left an undefined variable "index" in
these places:

        Assert(index > 0);
...
        ereport(DEBUG2,
                (errmsg("table \"%s\": removed %lld dead item identifiers in %u pages",
                        vacrel->relname, (long long) index, vacuumed_pages)));

See https://cirrus-ci.com/task/6312305361682432

I don't understand how this didn't warn me (or fail to compile) for an
assert build on my own workstation. It seems to think "index" is a
function -- presumably the name resolves to the libc index() function on
my system, which Windows lacks.

Anyway, thinking about what the correct assertion would be here:

        Assert(index > 0);
        Assert(vacrel->num_index_scans > 1 ||
                   (rbstate->end_idx == vacrel->lpdead_items &&
                        vacuumed_pages == vacrel->lpdead_item_pages));

I think I can just replace "index" with "rbstate->end_idx". At the end
of reaping, this should have the same value that index would have had.
The issue with this is that if pg_streaming_read_buffer_get_next() somehow
never returned a valid buffer (because there were no dead items), then
rbstate would potentially be uninitialized. The old assertion (index > 0)
would only have held if there were some dead items, but there isn't an
explicit assertion in this function that dead items exist. Perhaps it is
worth adding one? Even if we do, perhaps it is unacceptable from a
programming standpoint to use rbstate in that scope?
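
For concreteness, here is roughly what I have in mind (a sketch only; the
extra dead-items assertion is just a suggestion, and it assumes
rbstate->end_idx is still valid in this scope):

	/* sketch: replace the undefined "index" with rbstate->end_idx */
	Assert(vacrel->lpdead_items > 0);	/* suggested explicit assertion */
	Assert(rbstate->end_idx > 0);
	Assert(vacrel->num_index_scans > 1 ||
		   (rbstate->end_idx == vacrel->lpdead_items &&
			vacuumed_pages == vacrel->lpdead_item_pages));

	ereport(DEBUG2,
			(errmsg("table \"%s\": removed %lld dead item identifiers in %u pages",
					vacrel->relname, (long long) rbstate->end_idx,
					vacuumed_pages)));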

In addition to fixing this slip-up, I have done some performance testing
for streaming read vacuum. Note that these tests are for both vacuum
passes (1 and 2) using streaming read.

Performance results:

The TL;DR of my performance results is that streaming read vacuum is
faster. However, there is an issue with the interaction between the
streaming read code and the vacuum buffer access strategy which must be
addressed.

Note that "master" in the results below is actually just a commit on my
branch [2] before the one adding the vacuum streaming read users. So it
includes all of my refactoring of the vacuum code from the preliminary
patches.

I tested two vacuum "data states". Both are relatively small tables
because the impact of streaming read can easily be seen even at small
table sizes. DDL for both data states is at the end of the email.

The first data state is a 28 MB table which has never been vacuumed and
has one or two dead tuples on every block. All of the blocks have dead
tuples, so all of the blocks must be vacuumed. We'll call this the
"sequential" data state.

The second data state is a 67 MB table which has been vacuumed and then
a small percentage of the blocks (non-consecutive blocks at irregular
intervals) are updated afterward. Because the visibility map has been
updated and only a few blocks have dead tuples, large ranges of blocks
do not need to be vacuumed. There is at least one run of blocks with
dead tuples larger than 1 block but most of the blocks with dead tuples
are a single block followed by many blocks with no dead tuples. We'll
call this the "few" data state.

I tested these data states with "master" and with streaming read vacuum
with three caching options:

- table data fully in shared buffers (via pg_prewarm)
- table data in the kernel buffer cache but not in shared buffers
- table data completely uncached

I tested the OS cached and uncached caching options with both the
default vacuum buffer access strategy and with BUFFER_USAGE_LIMIT 0
(which uses as many shared buffers as needed).

For the streaming read vacuum, I tested with maintenance_io_concurrency
10, 100, and 1000. 10 is the current default on master.
maintenance_io_concurrency is not used by vacuum on master AFAICT.

maintenance_io_concurrency is used by streaming read to determine how
many buffers it can pin at the same time (with the hope of combining
consecutive blocks into larger IOs) and, in the case of vacuum, it is
used to determine prefetch distance.
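
For reference, patch 5/7 (Streaming Read API) derives both numbers from
the GUC roughly like this (simplified excerpt from
pg_streaming_read_buffer_alloc_internal()):

	if (flags & PGSR_FLAG_MAINTENANCE)
		max_ios = maintenance_io_concurrency;
	else
		max_ios = effective_io_concurrency;

	/* look-ahead distance, i.e. the maximum number of pinned buffers */
	max_pinned_buffers = Max(max_ios * 4, MAX_BUFFERS_PER_TRANSFER);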

In the following results, I ran vacuum at least five times and averaged
the timing results.

Table data cached in shared buffers
===================================

Sequential data state
---------------------

The only noticeable difference in performance was that streaming read
vacuum took 2% longer than master (19 ms vs 18.6 ms). It was a bit more
noticeable at maintenance_io_concurrency 1000 than 10.

This may be resolved by a patch Thomas is working on to avoid pinning
too many buffers if larger IOs cannot be created (like in a fully SB
resident workload). We should revisit this when that patch is available.

Few data state
--------------

There was no difference in timing for any of the scenarios.

Table data cached in OS buffer cache
====================================

Sequential data state
---------------------

With the buffer access strategy disabled, streaming read vacuum took 11%
less time regardless of maintenance_io_concurrency (26 ms vs 23 ms).

With the default vacuum buffer access strategy,
maintenance_io_concurrency had a large impact:

  Note that "mic" is maintenace_io_concurrency

| data state | code      | mic  | time (ms) |
+------------+-----------+------+-----------+
| sequential | master    | NA   | 99        |
| sequential | streaming | 10   | 122       |
| sequential | streaming | 100  | 28        |

The streaming read API calculates the maximum number of pinned buffers
as 4 * maintenance_io_concurrency. The default vacuum buffer access
strategy ring buffer is 256 kB -- which is 32 buffers.

With maintenance_io_concurrency 10, streaming read code wants to pin 40
buffers. There is likely an interaction between this and the buffer
access strategy which leads to the slowdown at
maintenance_io_concurrency 10.

We could change the default maintenance_io_concurrency, but a better
option is to take the buffer access strategy into account in the
streaming read code.
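
One possible shape for that (purely a sketch on my part, not something in
the attached patches; it assumes the existing GetAccessStrategyBufferCount()
helper is an acceptable way to get the ring size) would be to clamp the pin
limit in the streaming read code:

	/*
	 * Hypothetical: cap the streaming read pin limit at the size of the
	 * caller's buffer access strategy ring (if any), so that look-ahead
	 * cannot force the ring to evict buffers it is about to reuse.
	 */
	if (strategy != NULL)
	{
		int			ring_buffers = GetAccessStrategyBufferCount(strategy);

		max_pinned_buffers = Min(max_pinned_buffers, ring_buffers);
	}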

Few data state
--------------

There was no difference in timing for any of the scenarios.

Table data uncached
===================

Sequential data state
---------------------

When the buffer access strategy is disabled, streaming read vacuum takes
12% less time regardless of maintenance_io_concurrency (36 ms vs 41 ms).

With the default buffer access strategy (ring buffer 256 kB) and
maintenance_io_concurrency 10 (the default), the streaming read vacuum
takes 19% more time. But if we bump maintenance_io_concurrency up to
100+, streaming read vacuum takes 64% less time:

| data state | code      | mic  | time (ms) |
+------------+-----------+------+-----------+
| sequential | master    | NA   | 113       |
| sequential | streaming | 10   | 140       |
| sequential | streaming | 100  | 41        |

This is likely due to the same adverse interaction between streaming
reads' max pinned buffers and the buffer access strategy ring buffer
size.

Few data state
--------------

The buffer access strategy had no impact here, so all of these results
are with the default buffer access strategy. The streaming read vacuum
takes 20-25% less time than master vacuum.

| data state | code      | mic  | time (ms) |
+------------+-----------+------+-----------+
| few        | master    | NA   | 4.5       |
| few        | streaming | 10   | 3.4       |
| few        | streaming | 100  | 3.5       |

The improvement is likely due to prefetching and the one range of
consecutive blocks containing dead tuples which could be merged into a
larger IO.

Higher maintenance_io_concurrency only helps a little, probably because:

1) most of the blocks to vacuum are not consecutive, so we can't build
bigger IOs in most cases
2) we are not vacuuming enough blocks to want to prefetch more than 10
blocks.

This experiment should probably be redone with larger tables containing
more blocks needing vacuum. At 3-4 ms, a 20% performance difference
isn't really that interesting.

The next step (other than the preliminary refactoring patches) is to
decide how the streaming read API should use the buffer access strategy.

Sequential Data State DDL:
  drop table if exists foo;
  create table foo (a int) with (autovacuum_enabled=false, fillfactor=25);
  insert into foo select i % 3 from generate_series(1,200000)i;
  update foo set a = 5 where a = 1;

Few Data State DDL:
  drop table if exists foo;
  create table foo (a int) with (autovacuum_enabled=false, fillfactor=25);
  insert into foo select i from generate_series(2,20000)i;
  insert into foo select 1 from generate_series(1,200)i;
  insert into foo select i from generate_series(2,20000)i;
  insert into foo select 1 from generate_series(1,200)i;
  insert into foo select i from generate_series(2,200000)i;
  insert into foo select 1 from generate_series(1,200)i;
  insert into foo select i from generate_series(2,20000)i;
  insert into foo select 1 from generate_series(1,2000)i;
  insert into foo select i from generate_series(2,20000)i;
  insert into foo select 1 from generate_series(1,200)i;
  insert into foo select i from generate_series(2,200000)i;
  insert into foo select 1 from generate_series(1,200)i;
  vacuum (freeze) foo;
  update foo set a = 5 where a = 1;

- Melanie

[1] 
https://www.postgresql.org/message-id/1eeccf12-d5d1-4b7e-b88b-7342410129d7%40iki.fi
[2] https://github.com/melanieplageman/postgres/tree/vac_pgsr
>From c7a16bbf477c08ebf553da855e0554077c21de69 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Sat, 30 Dec 2023 16:30:59 -0500
Subject: [PATCH v5a 1/7] lazy_scan_skip remove unneeded local var
 nskippable_blocks

nskippable_blocks can be easily derived from next_unskippable_block's
progress when compared to the passed in next_block.
---
 src/backend/access/heap/vacuumlazy.c | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 8b320c3f89a..1dc6cc8e4db 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1103,8 +1103,7 @@ lazy_scan_skip(LVRelState *vacrel, Buffer *vmbuffer, BlockNumber next_block,
 			   bool *next_unskippable_allvis, bool *skipping_current_range)
 {
 	BlockNumber rel_pages = vacrel->rel_pages,
-				next_unskippable_block = next_block,
-				nskippable_blocks = 0;
+				next_unskippable_block = next_block;
 	bool		skipsallvis = false;
 
 	*next_unskippable_allvis = true;
@@ -1161,7 +1160,6 @@ lazy_scan_skip(LVRelState *vacrel, Buffer *vmbuffer, BlockNumber next_block,
 
 		vacuum_delay_point();
 		next_unskippable_block++;
-		nskippable_blocks++;
 	}
 
 	/*
@@ -1174,7 +1172,7 @@ lazy_scan_skip(LVRelState *vacrel, Buffer *vmbuffer, BlockNumber next_block,
 	 * non-aggressive VACUUMs.  If the range has any all-visible pages then
 	 * skipping makes updating relfrozenxid unsafe, which is a real downside.
 	 */
-	if (nskippable_blocks < SKIP_PAGES_THRESHOLD)
+	if (next_unskippable_block - next_block < SKIP_PAGES_THRESHOLD)
 		*skipping_current_range = false;
 	else
 	{
-- 
2.40.1

>From c27436eaeedbc1551072922d92460ca1c30ed116 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Sat, 30 Dec 2023 16:22:12 -0500
Subject: [PATCH v5a 2/7] Add lazy_scan_skip unskippable state to LVRelState

Future commits will remove all skipping logic from lazy_scan_heap() and
confine it to lazy_scan_skip(). To make those commits more clear, first
introduce a struct to LVRelState containing variables needed to skip
ranges less than SKIP_PAGES_THRESHOLD.

lazy_scan_prune() and lazy_scan_new_or_empty() can now access the
buffer containing the relevant block of the visibility map through
LVRelState.skip, so it no longer needs to be a separate function
parameter.

While we are at it, add additional information to the lazy_scan_skip()
comment, including descriptions of the role and expectations for its
function parameters.
---
 src/backend/access/heap/vacuumlazy.c | 154 ++++++++++++++++-----------
 1 file changed, 90 insertions(+), 64 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 1dc6cc8e4db..0ddb986bc03 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -204,6 +204,22 @@ typedef struct LVRelState
 	int64		live_tuples;	/* # live tuples remaining */
 	int64		recently_dead_tuples;	/* # dead, but not yet removable */
 	int64		missed_dead_tuples; /* # removable, but not removed */
+
+	/*
+	 * Parameters maintained by lazy_scan_skip() to manage skipping ranges of
+	 * pages greater than SKIP_PAGES_THRESHOLD.
+	 */
+	struct
+	{
+		/* Next unskippable block */
+		BlockNumber next_unskippable_block;
+		/* Buffer containing next unskippable block's visibility info */
+		Buffer		vmbuffer;
+		/* Next unskippable block's visibility status */
+		bool		next_unskippable_allvis;
+		/* Whether or not skippable blocks should be skipped */
+		bool		skipping_current_range;
+	}			skip;
 } LVRelState;
 
 /* Struct for saving and restoring vacuum error information. */
@@ -214,19 +230,15 @@ typedef struct LVSavedErrInfo
 	VacErrPhase phase;
 } LVSavedErrInfo;
 
-
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
-static BlockNumber lazy_scan_skip(LVRelState *vacrel, Buffer *vmbuffer,
-								  BlockNumber next_block,
-								  bool *next_unskippable_allvis,
-								  bool *skipping_current_range);
+static void lazy_scan_skip(LVRelState *vacrel, BlockNumber next_block);
 static bool lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf,
 								   BlockNumber blkno, Page page,
-								   bool sharelock, Buffer vmbuffer);
+								   bool sharelock);
 static void lazy_scan_prune(LVRelState *vacrel, Buffer buf,
 							BlockNumber blkno, Page page,
-							Buffer vmbuffer, bool all_visible_according_to_vm,
+							bool all_visible_according_to_vm,
 							bool *has_lpdead_items);
 static bool lazy_scan_noprune(LVRelState *vacrel, Buffer buf,
 							  BlockNumber blkno, Page page,
@@ -803,12 +815,8 @@ lazy_scan_heap(LVRelState *vacrel)
 {
 	BlockNumber rel_pages = vacrel->rel_pages,
 				blkno,
-				next_unskippable_block,
 				next_fsm_block_to_vacuum = 0;
 	VacDeadItems *dead_items = vacrel->dead_items;
-	Buffer		vmbuffer = InvalidBuffer;
-	bool		next_unskippable_allvis,
-				skipping_current_range;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
 		PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -822,10 +830,9 @@ lazy_scan_heap(LVRelState *vacrel)
 	initprog_val[2] = dead_items->max_items;
 	pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
 
+	vacrel->skip.vmbuffer = InvalidBuffer;
 	/* Set up an initial range of skippable blocks using the visibility map */
-	next_unskippable_block = lazy_scan_skip(vacrel, &vmbuffer, 0,
-											&next_unskippable_allvis,
-											&skipping_current_range);
+	lazy_scan_skip(vacrel, 0);
 	for (blkno = 0; blkno < rel_pages; blkno++)
 	{
 		Buffer		buf;
@@ -834,26 +841,23 @@ lazy_scan_heap(LVRelState *vacrel)
 		bool		has_lpdead_items;
 		bool		got_cleanup_lock = false;
 
-		if (blkno == next_unskippable_block)
+		if (blkno == vacrel->skip.next_unskippable_block)
 		{
 			/*
 			 * Can't skip this page safely.  Must scan the page.  But
 			 * determine the next skippable range after the page first.
 			 */
-			all_visible_according_to_vm = next_unskippable_allvis;
-			next_unskippable_block = lazy_scan_skip(vacrel, &vmbuffer,
-													blkno + 1,
-													&next_unskippable_allvis,
-													&skipping_current_range);
+			all_visible_according_to_vm = vacrel->skip.next_unskippable_allvis;
+			lazy_scan_skip(vacrel, blkno + 1);
 
-			Assert(next_unskippable_block >= blkno + 1);
+			Assert(vacrel->skip.next_unskippable_block >= blkno + 1);
 		}
 		else
 		{
 			/* Last page always scanned (may need to set nonempty_pages) */
 			Assert(blkno < rel_pages - 1);
 
-			if (skipping_current_range)
+			if (vacrel->skip.skipping_current_range)
 				continue;
 
 			/* Current range is too small to skip -- just scan the page */
@@ -896,10 +900,10 @@ lazy_scan_heap(LVRelState *vacrel)
 			 * correctness, but we do it anyway to avoid holding the pin
 			 * across a lengthy, unrelated operation.
 			 */
-			if (BufferIsValid(vmbuffer))
+			if (BufferIsValid(vacrel->skip.vmbuffer))
 			{
-				ReleaseBuffer(vmbuffer);
-				vmbuffer = InvalidBuffer;
+				ReleaseBuffer(vacrel->skip.vmbuffer);
+				vacrel->skip.vmbuffer = InvalidBuffer;
 			}
 
 			/* Perform a round of index and heap vacuuming */
@@ -924,7 +928,7 @@ lazy_scan_heap(LVRelState *vacrel)
 		 * all-visible.  In most cases this will be very cheap, because we'll
 		 * already have the correct page pinned anyway.
 		 */
-		visibilitymap_pin(vacrel->rel, blkno, &vmbuffer);
+		visibilitymap_pin(vacrel->rel, blkno, &vacrel->skip.vmbuffer);
 
 		buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
 								 vacrel->bstrategy);
@@ -942,8 +946,7 @@ lazy_scan_heap(LVRelState *vacrel)
 			LockBuffer(buf, BUFFER_LOCK_SHARE);
 
 		/* Check for new or empty pages before lazy_scan_[no]prune call */
-		if (lazy_scan_new_or_empty(vacrel, buf, blkno, page, !got_cleanup_lock,
-								   vmbuffer))
+		if (lazy_scan_new_or_empty(vacrel, buf, blkno, page, !got_cleanup_lock))
 		{
 			/* Processed as new/empty page (lock and pin released) */
 			continue;
@@ -985,7 +988,7 @@ lazy_scan_heap(LVRelState *vacrel)
 		 */
 		if (got_cleanup_lock)
 			lazy_scan_prune(vacrel, buf, blkno, page,
-							vmbuffer, all_visible_according_to_vm,
+							all_visible_according_to_vm,
 							&has_lpdead_items);
 
 		/*
@@ -1035,8 +1038,11 @@ lazy_scan_heap(LVRelState *vacrel)
 	}
 
 	vacrel->blkno = InvalidBlockNumber;
-	if (BufferIsValid(vmbuffer))
-		ReleaseBuffer(vmbuffer);
+	if (BufferIsValid(vacrel->skip.vmbuffer))
+	{
+		ReleaseBuffer(vacrel->skip.vmbuffer);
+		vacrel->skip.vmbuffer = InvalidBuffer;
+	}
 
 	/* report that everything is now scanned */
 	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
@@ -1080,15 +1086,34 @@ lazy_scan_heap(LVRelState *vacrel)
  *	lazy_scan_skip() -- set up range of skippable blocks using visibility map.
  *
  * lazy_scan_heap() calls here every time it needs to set up a new range of
- * blocks to skip via the visibility map.  Caller passes the next block in
- * line.  We return a next_unskippable_block for this range.  When there are
- * no skippable blocks we just return caller's next_block.  The all-visible
- * status of the returned block is set in *next_unskippable_allvis for caller,
- * too.  Block usually won't be all-visible (since it's unskippable), but it
- * can be during aggressive VACUUMs (as well as in certain edge cases).
+ * blocks to skip via the visibility map.  Caller passes next_block, the next
+ * block in line. The parameters of the skipped range are recorded in skip.
+ * vacrel is an in/out parameter here; vacuum options and information about the
+ * relation are read and vacrel->skippedallvis is set to ensure we don't
+ * advance relfrozenxid when we have skipped vacuuming all visible blocks.
+ *
+ * skip->vmbuffer will contain the block from the VM containing visibility
+ * information for the next unskippable heap block. We may end up needing a
+ * different block from the VM (if we decide not to skip a skippable block).
+ * This is okay; visibilitymap_pin() will take care of this while processing
+ * the block.
+ *
+ * A block is unskippable if it is not all visible according to the visibility
+ * map. It is also unskippable if it is the last block in the relation, if the
+ * vacuum is an aggressive vacuum, or if DISABLE_PAGE_SKIPPING was passed to
+ * vacuum.
  *
- * Sets *skipping_current_range to indicate if caller should skip this range.
- * Costs and benefits drive our decision.  Very small ranges won't be skipped.
+ * Even if a block is skippable, we may choose not to skip it if the range of
+ * skippable blocks is too small (below SKIP_PAGES_THRESHOLD). As a
+ * consequence, we must keep track of the next truly unskippable block and its
+ * visibility status along with whether or not we are skipping the current
+ * range of skippable blocks. This can be used to derive the next block
+ * lazy_scan_heap() must process and its visibility status.
+ *
+ * The block number and visibility status of the next unskippable block are set
+ * in skip->next_unskippable_block and next_unskippable_allvis.
+ * skip->skipping_current_range indicates to the caller whether or not it is
+ * processing a skippable (and thus all-visible) block.
  *
  * Note: our opinion of which blocks can be skipped can go stale immediately.
  * It's okay if caller "misses" a page whose all-visible or all-frozen marking
@@ -1098,25 +1123,26 @@ lazy_scan_heap(LVRelState *vacrel)
  * older XIDs/MXIDs.  The vacrel->skippedallvis flag will be set here when the
  * choice to skip such a range is actually made, making everything safe.)
  */
-static BlockNumber
-lazy_scan_skip(LVRelState *vacrel, Buffer *vmbuffer, BlockNumber next_block,
-			   bool *next_unskippable_allvis, bool *skipping_current_range)
+static void
+lazy_scan_skip(LVRelState *vacrel, BlockNumber next_block)
 {
+	/* Use local variables for better optimized loop code */
 	BlockNumber rel_pages = vacrel->rel_pages,
 				next_unskippable_block = next_block;
+
 	bool		skipsallvis = false;
 
-	*next_unskippable_allvis = true;
+	vacrel->skip.next_unskippable_allvis = true;
 	while (next_unskippable_block < rel_pages)
 	{
 		uint8		mapbits = visibilitymap_get_status(vacrel->rel,
 													   next_unskippable_block,
-													   vmbuffer);
+													   &vacrel->skip.vmbuffer);
 
 		if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) == 0)
 		{
 			Assert((mapbits & VISIBILITYMAP_ALL_FROZEN) == 0);
-			*next_unskippable_allvis = false;
+			vacrel->skip.next_unskippable_allvis = false;
 			break;
 		}
 
@@ -1137,7 +1163,7 @@ lazy_scan_skip(LVRelState *vacrel, Buffer *vmbuffer, BlockNumber next_block,
 		if (!vacrel->skipwithvm)
 		{
 			/* Caller shouldn't rely on all_visible_according_to_vm */
-			*next_unskippable_allvis = false;
+			vacrel->skip.next_unskippable_allvis = false;
 			break;
 		}
 
@@ -1162,6 +1188,8 @@ lazy_scan_skip(LVRelState *vacrel, Buffer *vmbuffer, BlockNumber next_block,
 		next_unskippable_block++;
 	}
 
+	vacrel->skip.next_unskippable_block = next_unskippable_block;
+
 	/*
 	 * We only skip a range with at least SKIP_PAGES_THRESHOLD consecutive
 	 * pages.  Since we're reading sequentially, the OS should be doing
@@ -1172,16 +1200,14 @@ lazy_scan_skip(LVRelState *vacrel, Buffer *vmbuffer, BlockNumber next_block,
 	 * non-aggressive VACUUMs.  If the range has any all-visible pages then
 	 * skipping makes updating relfrozenxid unsafe, which is a real downside.
 	 */
-	if (next_unskippable_block - next_block < SKIP_PAGES_THRESHOLD)
-		*skipping_current_range = false;
+	if (vacrel->skip.next_unskippable_block - next_block < SKIP_PAGES_THRESHOLD)
+		vacrel->skip.skipping_current_range = false;
 	else
 	{
-		*skipping_current_range = true;
+		vacrel->skip.skipping_current_range = true;
 		if (skipsallvis)
 			vacrel->skippedallvis = true;
 	}
-
-	return next_unskippable_block;
 }
 
 /*
@@ -1214,7 +1240,7 @@ lazy_scan_skip(LVRelState *vacrel, Buffer *vmbuffer, BlockNumber next_block,
  */
 static bool
 lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf, BlockNumber blkno,
-					   Page page, bool sharelock, Buffer vmbuffer)
+					   Page page, bool sharelock)
 {
 	Size		freespace;
 
@@ -1300,7 +1326,7 @@ lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf, BlockNumber blkno,
 
 			PageSetAllVisible(page);
 			visibilitymap_set(vacrel->rel, blkno, buf, InvalidXLogRecPtr,
-							  vmbuffer, InvalidTransactionId,
+							  vacrel->skip.vmbuffer, InvalidTransactionId,
 							  VISIBILITYMAP_ALL_VISIBLE | VISIBILITYMAP_ALL_FROZEN);
 			END_CRIT_SECTION();
 		}
@@ -1336,10 +1362,11 @@ lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf, BlockNumber blkno,
  * any tuple that becomes dead after the call to heap_page_prune() can't need to
  * be frozen, because it was visible to another session when vacuum started.
  *
- * vmbuffer is the buffer containing the VM block with visibility information
- * for the heap block, blkno. all_visible_according_to_vm is the saved
- * visibility status of the heap block looked up earlier by the caller. We
- * won't rely entirely on this status, as it may be out of date.
+ * vacrel->skip.vmbuffer is the buffer containing the VM block with
+ * visibility information for the heap block, blkno.
+ * all_visible_according_to_vm is the saved visibility status of the heap block
+ * looked up earlier by the caller. We won't rely entirely on this status, as
+ * it may be out of date.
  *
  * *has_lpdead_items is set to true or false depending on whether, upon return
  * from this function, any LP_DEAD items are still present on the page.
@@ -1349,7 +1376,6 @@ lazy_scan_prune(LVRelState *vacrel,
 				Buffer buf,
 				BlockNumber blkno,
 				Page page,
-				Buffer vmbuffer,
 				bool all_visible_according_to_vm,
 				bool *has_lpdead_items)
 {
@@ -1783,7 +1809,7 @@ lazy_scan_prune(LVRelState *vacrel,
 		PageSetAllVisible(page);
 		MarkBufferDirty(buf);
 		visibilitymap_set(vacrel->rel, blkno, buf, InvalidXLogRecPtr,
-						  vmbuffer, visibility_cutoff_xid,
+						  vacrel->skip.vmbuffer, visibility_cutoff_xid,
 						  flags);
 	}
 
@@ -1794,11 +1820,11 @@ lazy_scan_prune(LVRelState *vacrel,
 	 * buffer lock before concluding that the VM is corrupt.
 	 */
 	else if (all_visible_according_to_vm && !PageIsAllVisible(page) &&
-			 visibilitymap_get_status(vacrel->rel, blkno, &vmbuffer) != 0)
+			 visibilitymap_get_status(vacrel->rel, blkno, &vacrel->skip.vmbuffer) != 0)
 	{
 		elog(WARNING, "page is not marked all-visible but visibility map bit is set in relation \"%s\" page %u",
 			 vacrel->relname, blkno);
-		visibilitymap_clear(vacrel->rel, blkno, vmbuffer,
+		visibilitymap_clear(vacrel->rel, blkno, vacrel->skip.vmbuffer,
 							VISIBILITYMAP_VALID_BITS);
 	}
 
@@ -1822,7 +1848,7 @@ lazy_scan_prune(LVRelState *vacrel,
 			 vacrel->relname, blkno);
 		PageClearAllVisible(page);
 		MarkBufferDirty(buf);
-		visibilitymap_clear(vacrel->rel, blkno, vmbuffer,
+		visibilitymap_clear(vacrel->rel, blkno, vacrel->skip.vmbuffer,
 							VISIBILITYMAP_VALID_BITS);
 	}
 
@@ -1832,7 +1858,7 @@ lazy_scan_prune(LVRelState *vacrel,
 	 * true, so we must check both all_visible and all_frozen.
 	 */
 	else if (all_visible_according_to_vm && all_visible &&
-			 all_frozen && !VM_ALL_FROZEN(vacrel->rel, blkno, &vmbuffer))
+			 all_frozen && !VM_ALL_FROZEN(vacrel->rel, blkno, &vacrel->skip.vmbuffer))
 	{
 		/*
 		 * Avoid relying on all_visible_according_to_vm as a proxy for the
@@ -1854,7 +1880,7 @@ lazy_scan_prune(LVRelState *vacrel,
 		 */
 		Assert(!TransactionIdIsValid(visibility_cutoff_xid));
 		visibilitymap_set(vacrel->rel, blkno, buf, InvalidXLogRecPtr,
-						  vmbuffer, InvalidTransactionId,
+						  vacrel->skip.vmbuffer, InvalidTransactionId,
 						  VISIBILITYMAP_ALL_VISIBLE |
 						  VISIBILITYMAP_ALL_FROZEN);
 	}
-- 
2.40.1

>From a33c255419eb539901b8d07852e1c4b189b615df Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Sat, 30 Dec 2023 16:59:27 -0500
Subject: [PATCH v5a 3/7] Confine vacuum skip logic to lazy_scan_skip

In preparation for vacuum to use the streaming read interface (and
eventually AIO), refactor vacuum's logic for skipping blocks such that
it is entirely confined to lazy_scan_skip(). This turns lazy_scan_skip()
and the skip state in LVRelState it uses into an iterator which yields
blocks to lazy_scan_heap(). Such a structure is conducive to an async
interface. While we are at it, rename lazy_scan_skip() to
heap_vac_scan_get_next_block(), which now more accurately describes it.

By always calling heap_vac_scan_get_next_block() -- instead of only when
we have reached the next unskippable block -- we no longer need the
skipping_current_range variable. lazy_scan_heap() no longer needs to
manage the skipped range by checking whether it has reached the end of it
before calling heap_vac_scan_get_next_block() again. And
heap_vac_scan_get_next_block() can derive the visibility status of a
block from whether or not we are in a skippable range -- that is, from
whether or not next_block is equal to the next unskippable block.
---
 src/backend/access/heap/vacuumlazy.c | 243 ++++++++++++++-------------
 1 file changed, 126 insertions(+), 117 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 0ddb986bc03..99d160335e1 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -206,8 +206,8 @@ typedef struct LVRelState
 	int64		missed_dead_tuples; /* # removable, but not removed */
 
 	/*
-	 * Parameters maintained by lazy_scan_skip() to manage skipping ranges of
-	 * pages greater than SKIP_PAGES_THRESHOLD.
+	 * Parameters maintained by heap_vac_scan_get_next_block() to manage
+	 * skipping ranges of pages greater than SKIP_PAGES_THRESHOLD.
 	 */
 	struct
 	{
@@ -232,7 +232,9 @@ typedef struct LVSavedErrInfo
 
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
-static void lazy_scan_skip(LVRelState *vacrel, BlockNumber next_block);
+static bool heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
+										 BlockNumber *blkno,
+										 bool *all_visible_according_to_vm);
 static bool lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf,
 								   BlockNumber blkno, Page page,
 								   bool sharelock);
@@ -814,8 +816,11 @@ static void
 lazy_scan_heap(LVRelState *vacrel)
 {
 	BlockNumber rel_pages = vacrel->rel_pages,
-				blkno,
 				next_fsm_block_to_vacuum = 0;
+	bool		all_visible_according_to_vm;
+
+	/* relies on InvalidBlockNumber overflowing to 0 */
+	BlockNumber blkno = InvalidBlockNumber;
 	VacDeadItems *dead_items = vacrel->dead_items;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
@@ -830,40 +835,17 @@ lazy_scan_heap(LVRelState *vacrel)
 	initprog_val[2] = dead_items->max_items;
 	pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
 
+	vacrel->skip.next_unskippable_block = InvalidBlockNumber;
 	vacrel->skip.vmbuffer = InvalidBuffer;
-	/* Set up an initial range of skippable blocks using the visibility map */
-	lazy_scan_skip(vacrel, 0);
-	for (blkno = 0; blkno < rel_pages; blkno++)
+
+	while (heap_vac_scan_get_next_block(vacrel, blkno + 1,
+										&blkno, &all_visible_according_to_vm))
 	{
 		Buffer		buf;
 		Page		page;
-		bool		all_visible_according_to_vm;
 		bool		has_lpdead_items;
 		bool		got_cleanup_lock = false;
 
-		if (blkno == vacrel->skip.next_unskippable_block)
-		{
-			/*
-			 * Can't skip this page safely.  Must scan the page.  But
-			 * determine the next skippable range after the page first.
-			 */
-			all_visible_according_to_vm = vacrel->skip.next_unskippable_allvis;
-			lazy_scan_skip(vacrel, blkno + 1);
-
-			Assert(vacrel->skip.next_unskippable_block >= blkno + 1);
-		}
-		else
-		{
-			/* Last page always scanned (may need to set nonempty_pages) */
-			Assert(blkno < rel_pages - 1);
-
-			if (vacrel->skip.skipping_current_range)
-				continue;
-
-			/* Current range is too small to skip -- just scan the page */
-			all_visible_according_to_vm = true;
-		}
-
 		vacrel->scanned_pages++;
 
 		/* Report as block scanned, update error traceback information */
@@ -1083,20 +1065,14 @@ lazy_scan_heap(LVRelState *vacrel)
 }
 
 /*
- *	lazy_scan_skip() -- set up range of skippable blocks using visibility map.
- *
- * lazy_scan_heap() calls here every time it needs to set up a new range of
- * blocks to skip via the visibility map.  Caller passes next_block, the next
- * block in line. The parameters of the skipped range are recorded in skip.
- * vacrel is an in/out parameter here; vacuum options and information about the
- * relation are read and vacrel->skippedallvis is set to ensure we don't
- * advance relfrozenxid when we have skipped vacuuming all visible blocks.
+ *	heap_vac_scan_get_next_block() -- get next block for vacuum to process
  *
- * skip->vmbuffer will contain the block from the VM containing visibility
- * information for the next unskippable heap block. We may end up needing a
- * different block from the VM (if we decide not to skip a skippable block).
- * This is okay; visibilitymap_pin() will take care of this while processing
- * the block.
+ * lazy_scan_heap() calls here every time it needs to get the next block to
+ * prune and vacuum, using the visibility map, vacuum options, and various
+ * thresholds to skip blocks which do not need to be processed. Caller passes
+ * next_block, the next block in line. This block may end up being skipped.
+ * heap_vac_scan_get_next_block() sets blkno to the next block that actually needs
+ * to be processed.
  *
  * A block is unskippable if it is not all visible according to the visibility
  * map. It is also unskippable if it is the last block in the relation, if the
@@ -1106,14 +1082,25 @@ lazy_scan_heap(LVRelState *vacrel)
  * Even if a block is skippable, we may choose not to skip it if the range of
  * skippable blocks is too small (below SKIP_PAGES_THRESHOLD). As a
  * consequence, we must keep track of the next truly unskippable block and its
- * visibility status along with whether or not we are skipping the current
- * range of skippable blocks. This can be used to derive the next block
- * lazy_scan_heap() must process and its visibility status.
+ * visibility status separate from the next block lazy_scan_heap() should
+ * process (and its visibility status).
  *
  * The block number and visibility status of the next unskippable block are set
- * in skip->next_unskippable_block and next_unskippable_allvis.
- * skip->skipping_current_range indicates to the caller whether or not it is
- * processing a skippable (and thus all-visible) block.
+ * in vacrel->skip.next_unskippable_block and next_unskippable_allvis.
+ *
+ * The block number and visibility status of the next block to process are set
+ * in blkno and all_visible_according_to_vm. heap_vac_scan_get_next_block()
+ * returns false if there are no further blocks to process.
+ *
+ * vacrel is an in/out parameter here; vacuum options and information about the
+ * relation are read and vacrel->skippedallvis is set to ensure we don't
+ * advance relfrozenxid when we have skipped vacuuming all visible blocks.
+ *
+ * skip->vmbuffer will contain the block from the VM containing visibility
+ * information for the next unskippable heap block. We may end up needing a
+ * different block from the VM (if we decide not to skip a skippable block).
+ * This is okay; visibilitymap_pin() will take care of this while processing
+ * the block.
  *
  * Note: our opinion of which blocks can be skipped can go stale immediately.
  * It's okay if caller "misses" a page whose all-visible or all-frozen marking
@@ -1123,91 +1110,113 @@ lazy_scan_heap(LVRelState *vacrel)
  * older XIDs/MXIDs.  The vacrel->skippedallvis flag will be set here when the
  * choice to skip such a range is actually made, making everything safe.)
  */
-static void
-lazy_scan_skip(LVRelState *vacrel, BlockNumber next_block)
+static bool
+heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
+							 BlockNumber *blkno, bool *all_visible_according_to_vm)
 {
-	/* Use local variables for better optimized loop code */
-	BlockNumber rel_pages = vacrel->rel_pages,
-				next_unskippable_block = next_block;
-
 	bool		skipsallvis = false;
 
-	vacrel->skip.next_unskippable_allvis = true;
-	while (next_unskippable_block < rel_pages)
+	if (next_block >= vacrel->rel_pages)
 	{
-		uint8		mapbits = visibilitymap_get_status(vacrel->rel,
-													   next_unskippable_block,
-													   &vacrel->skip.vmbuffer);
+		*blkno = InvalidBlockNumber;
+		return false;
+	}
 
-		if ((mapbits & VISIBILITYMAP_ALL_VISIBLE) == 0)
+	if (vacrel->skip.next_unskippable_block == InvalidBlockNumber ||
+		next_block > vacrel->skip.next_unskippable_block)
+	{
+		/* Use local variables for better optimized loop code */
+		BlockNumber rel_pages = vacrel->rel_pages;
+		BlockNumber next_unskippable_block = vacrel->skip.next_unskippable_block;
+
+		while (++next_unskippable_block < rel_pages)
 		{
-			Assert((mapbits & VISIBILITYMAP_ALL_FROZEN) == 0);
-			vacrel->skip.next_unskippable_allvis = false;
-			break;
-		}
+			uint8		mapbits = visibilitymap_get_status(vacrel->rel,
+														   next_unskippable_block,
+														   &vacrel->skip.vmbuffer);
 
-		/*
-		 * Caller must scan the last page to determine whether it has tuples
-		 * (caller must have the opportunity to set vacrel->nonempty_pages).
-		 * This rule avoids having lazy_truncate_heap() take access-exclusive
-		 * lock on rel to attempt a truncation that fails anyway, just because
-		 * there are tuples on the last page (it is likely that there will be
-		 * tuples on other nearby pages as well, but those can be skipped).
-		 *
-		 * Implement this by always treating the last block as unsafe to skip.
-		 */
-		if (next_unskippable_block == rel_pages - 1)
-			break;
+			vacrel->skip.next_unskippable_allvis = mapbits & VISIBILITYMAP_ALL_VISIBLE;
 
-		/* DISABLE_PAGE_SKIPPING makes all skipping unsafe */
-		if (!vacrel->skipwithvm)
-		{
-			/* Caller shouldn't rely on all_visible_according_to_vm */
-			vacrel->skip.next_unskippable_allvis = false;
-			break;
-		}
+			if (!vacrel->skip.next_unskippable_allvis)
+			{
+				Assert((mapbits & VISIBILITYMAP_ALL_FROZEN) == 0);
+				break;
+			}
 
-		/*
-		 * Aggressive VACUUM caller can't skip pages just because they are
-		 * all-visible.  They may still skip all-frozen pages, which can't
-		 * contain XIDs < OldestXmin (XIDs that aren't already frozen by now).
-		 */
-		if ((mapbits & VISIBILITYMAP_ALL_FROZEN) == 0)
-		{
-			if (vacrel->aggressive)
+			/*
+			 * Caller must scan the last page to determine whether it has
+			 * tuples (caller must have the opportunity to set
+			 * vacrel->nonempty_pages). This rule avoids having
+			 * lazy_truncate_heap() take access-exclusive lock on rel to
+			 * attempt a truncation that fails anyway, just because there are
+			 * tuples on the last page (it is likely that there will be tuples
+			 * on other nearby pages as well, but those can be skipped).
+			 *
+			 * Implement this by always treating the last block as unsafe to
+			 * skip.
+			 */
+			if (next_unskippable_block == rel_pages - 1)
 				break;
 
+			/* DISABLE_PAGE_SKIPPING makes all skipping unsafe */
+			if (!vacrel->skipwithvm)
+			{
+				/* Caller shouldn't rely on all_visible_according_to_vm */
+				vacrel->skip.next_unskippable_allvis = false;
+				break;
+			}
+
 			/*
-			 * All-visible block is safe to skip in non-aggressive case.  But
-			 * remember that the final range contains such a block for later.
+			 * Aggressive VACUUM caller can't skip pages just because they are
+			 * all-visible.  They may still skip all-frozen pages, which can't
+			 * contain XIDs < OldestXmin (XIDs that aren't already frozen by
+			 * now).
 			 */
-			skipsallvis = true;
+			if ((mapbits & VISIBILITYMAP_ALL_FROZEN) == 0)
+			{
+				if (vacrel->aggressive)
+					break;
+
+				/*
+				 * All-visible block is safe to skip in non-aggressive case.
+				 * But remember that the final range contains such a block for
+				 * later.
+				 */
+				skipsallvis = true;
+			}
+
+			vacuum_delay_point();
 		}
 
-		vacuum_delay_point();
-		next_unskippable_block++;
-	}
+		vacrel->skip.next_unskippable_block = next_unskippable_block;
 
-	vacrel->skip.next_unskippable_block = next_unskippable_block;
+		/*
+		 * We only skip a range with at least SKIP_PAGES_THRESHOLD consecutive
+		 * pages.  Since we're reading sequentially, the OS should be doing
+		 * readahead for us, so there's no gain in skipping a page now and
+		 * then. Skipping such a range might even discourage sequential
+		 * detection.
+		 *
+		 * This test also enables more frequent relfrozenxid advancement
+		 * during non-aggressive VACUUMs.  If the range has any all-visible
+		 * pages then skipping makes updating relfrozenxid unsafe, which is a
+		 * real downside.
+		 */
+		if (vacrel->skip.next_unskippable_block - next_block >= SKIP_PAGES_THRESHOLD)
+		{
+			next_block = vacrel->skip.next_unskippable_block;
+			if (skipsallvis)
+				vacrel->skippedallvis = true;
+		}
+	}
 
-	/*
-	 * We only skip a range with at least SKIP_PAGES_THRESHOLD consecutive
-	 * pages.  Since we're reading sequentially, the OS should be doing
-	 * readahead for us, so there's no gain in skipping a page now and then.
-	 * Skipping such a range might even discourage sequential detection.
-	 *
-	 * This test also enables more frequent relfrozenxid advancement during
-	 * non-aggressive VACUUMs.  If the range has any all-visible pages then
-	 * skipping makes updating relfrozenxid unsafe, which is a real downside.
-	 */
-	if (vacrel->skip.next_unskippable_block - next_block < SKIP_PAGES_THRESHOLD)
-		vacrel->skip.skipping_current_range = false;
+	if (next_block == vacrel->skip.next_unskippable_block)
+		*all_visible_according_to_vm = vacrel->skip.next_unskippable_allvis;
 	else
-	{
-		vacrel->skip.skipping_current_range = true;
-		if (skipsallvis)
-			vacrel->skippedallvis = true;
-	}
+		*all_visible_according_to_vm = true;
+
+	*blkno = next_block;
+	return true;
 }
 
 /*
-- 
2.40.1

>From 4ab5ed0863ca4fcdc197811d60011971a29b8857 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Sun, 31 Dec 2023 12:49:56 -0500
Subject: [PATCH v5a 4/7] Remove unneeded vacuum_delay_point from
 heap_vac_scan_get_next_block

heap_vac_scan_get_next_block() does relatively little work, so there is
no need to call vacuum_delay_point(). A future commit will call
heap_vac_scan_get_next_block() from a callback, and we would like to
avoid calling vacuum_delay_point() in that callback.
---
 src/backend/access/heap/vacuumlazy.c | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 99d160335e1..65d257aab83 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1184,8 +1184,6 @@ heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
 				 */
 				skipsallvis = true;
 			}
-
-			vacuum_delay_point();
 		}
 
 		vacrel->skip.next_unskippable_block = next_unskippable_block;
-- 
2.40.1

>From 0c82c513818f1aa3e8aa982344aaac13f54629e4 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Wed, 6 Mar 2024 14:46:08 -0500
Subject: [PATCH v5a 5/7] Streaming Read API

---
 src/backend/storage/Makefile             |   2 +-
 src/backend/storage/aio/Makefile         |  14 +
 src/backend/storage/aio/meson.build      |   5 +
 src/backend/storage/aio/streaming_read.c | 612 ++++++++++++++++++++++
 src/backend/storage/buffer/bufmgr.c      | 641 ++++++++++++++++-------
 src/backend/storage/buffer/localbuf.c    |  14 +-
 src/backend/storage/meson.build          |   1 +
 src/include/storage/bufmgr.h             |  45 ++
 src/include/storage/streaming_read.h     |  52 ++
 src/tools/pgindent/typedefs.list         |   3 +
 10 files changed, 1179 insertions(+), 210 deletions(-)
 create mode 100644 src/backend/storage/aio/Makefile
 create mode 100644 src/backend/storage/aio/meson.build
 create mode 100644 src/backend/storage/aio/streaming_read.c
 create mode 100644 src/include/storage/streaming_read.h

diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca20..eec03f6f2b4 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = aio buffer file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
new file mode 100644
index 00000000000..bcab44c802f
--- /dev/null
+++ b/src/backend/storage/aio/Makefile
@@ -0,0 +1,14 @@
+#
+# Makefile for storage/aio
+#
+# src/backend/storage/aio/Makefile
+#
+
+subdir = src/backend/storage/aio
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = \
+	streaming_read.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
new file mode 100644
index 00000000000..39aef2a84a2
--- /dev/null
+++ b/src/backend/storage/aio/meson.build
@@ -0,0 +1,5 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+backend_sources += files(
+  'streaming_read.c',
+)
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
new file mode 100644
index 00000000000..71f2c4a70b6
--- /dev/null
+++ b/src/backend/storage/aio/streaming_read.c
@@ -0,0 +1,612 @@
+#include "postgres.h"
+
+#include "storage/streaming_read.h"
+#include "utils/rel.h"
+
+/*
+ * Element type for PgStreamingRead's circular array of block ranges.
+ */
+typedef struct PgStreamingReadRange
+{
+	bool		need_wait;
+	bool		advice_issued;
+	BlockNumber blocknum;
+	int			nblocks;
+	int			per_buffer_data_index;
+	Buffer		buffers[MAX_BUFFERS_PER_TRANSFER];
+	ReadBuffersOperation operation;
+} PgStreamingReadRange;
+
+/*
+ * Streaming read object.
+ */
+struct PgStreamingRead
+{
+	int			max_ios;
+	int			ios_in_progress;
+	int			max_pinned_buffers;
+	int			pinned_buffers;
+	int			pinned_buffers_trigger;
+	int			next_tail_buffer;
+	int			ramp_up_pin_limit;
+	int			ramp_up_pin_stall;
+	bool		finished;
+	bool		advice_enabled;
+	void	   *pgsr_private;
+	PgStreamingReadBufferCB callback;
+
+	BufferAccessStrategy strategy;
+	BufferManagerRelation bmr;
+	ForkNumber	forknum;
+
+	/* Sometimes we need to buffer one block for flow control. */
+	BlockNumber unget_blocknum;
+	void	   *unget_per_buffer_data;
+
+	/* Next expected block, for detecting sequential access. */
+	BlockNumber seq_blocknum;
+
+	/* Space for optional per-buffer private data. */
+	size_t		per_buffer_data_size;
+	void	   *per_buffer_data;
+
+	/* Circular buffer of ranges. */
+	int			size;
+	int			head;
+	int			tail;
+	PgStreamingReadRange ranges[FLEXIBLE_ARRAY_MEMBER];
+};
+
+static PgStreamingRead *
+pg_streaming_read_buffer_alloc_internal(int flags,
+										void *pgsr_private,
+										size_t per_buffer_data_size,
+										BufferAccessStrategy strategy)
+{
+	PgStreamingRead *pgsr;
+	int			size;
+	int			max_ios;
+	uint32		max_pinned_buffers;
+
+
+	/*
+	 * Decide how many assumed I/Os we will allow to run concurrently.  That
+	 * is, advice to the kernel to tell it that we will soon read.  This
+	 * number also affects how far we look ahead for opportunities to start
+	 * more I/Os.
+	 */
+	if (flags & PGSR_FLAG_MAINTENANCE)
+		max_ios = maintenance_io_concurrency;
+	else
+		max_ios = effective_io_concurrency;
+
+	/*
+	 * The desired level of I/O concurrency controls how far ahead we are
+	 * willing to look ahead.  We also clamp it to at least
+	 * MAX_BUFFERS_PER_TRANSFER so that we can have a chance to build up a full
+	 * sized read, even when max_ios is zero.
+	 */
+	max_pinned_buffers = Max(max_ios * 4, MAX_BUFFERS_PER_TRANSFER);
+
+	/*
+	 * The *_io_concurrency GUCs might be set to 0, but we want to allow at
+	 * least one, to keep our gating logic simple.
+	 */
+	max_ios = Max(max_ios, 1);
+
+	/*
+	 * Don't allow this backend to pin too many buffers.  For now we'll apply
+	 * the limit for the shared buffer pool and the local buffer pool, without
+	 * worrying which it is.
+	 */
+	LimitAdditionalPins(&max_pinned_buffers);
+	LimitAdditionalLocalPins(&max_pinned_buffers);
+	Assert(max_pinned_buffers > 0);
+
+	/*
+	 * pgsr->ranges is a circular buffer.  When it is empty, head == tail.
+	 * When it is full, there is an empty element between head and tail.  Head
+	 * can also be empty (nblocks == 0), therefore we need two extra elements
+	 * for non-occupied ranges, on top of max_pinned_buffers to allow for the
+	 * maximum possible number of occupied ranges of the smallest possible
+	 * size of one.
+	 */
+	size = max_pinned_buffers + 2;
+
+	pgsr = (PgStreamingRead *)
+		palloc0(offsetof(PgStreamingRead, ranges) +
+				sizeof(pgsr->ranges[0]) * size);
+
+	pgsr->max_ios = max_ios;
+	pgsr->per_buffer_data_size = per_buffer_data_size;
+	pgsr->max_pinned_buffers = max_pinned_buffers;
+	pgsr->pgsr_private = pgsr_private;
+	pgsr->strategy = strategy;
+	pgsr->size = size;
+
+	pgsr->unget_blocknum = InvalidBlockNumber;
+
+#ifdef USE_PREFETCH
+
+	/*
+	 * This system supports prefetching advice.  As long as direct I/O isn't
+	 * enabled, and the caller hasn't promised sequential access, we can use
+	 * it.
+	 */
+	if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
+		(flags & PGSR_FLAG_SEQUENTIAL) == 0)
+		pgsr->advice_enabled = true;
+#endif
+
+	/*
+	 * We start off building small ranges, but double that quickly, for the
+	 * benefit of users that don't know how far ahead they'll read.  This can
+	 * be disabled by users that already know they'll read all the way.
+	 */
+	if (flags & PGSR_FLAG_FULL)
+		pgsr->ramp_up_pin_limit = INT_MAX;
+	else
+		pgsr->ramp_up_pin_limit = 1;
+
+	/*
+	 * We want to avoid creating ranges that are smaller than they could be
+	 * just because we hit max_pinned_buffers.  We only look ahead when the
+	 * number of pinned buffers falls below this trigger number, or put
+	 * another way, we stop looking ahead when we wouldn't be able to build a
+	 * "full sized" range.
+	 */
+	pgsr->pinned_buffers_trigger =
+		Max(1, (int) max_pinned_buffers - MAX_BUFFERS_PER_TRANSFER);
+
+	/* Space for the callback to store extra data along with each block. */
+	if (per_buffer_data_size)
+		pgsr->per_buffer_data = palloc(per_buffer_data_size * max_pinned_buffers);
+
+	return pgsr;
+}
+
+/*
+ * Create a new streaming read object that can be used to perform the
+ * equivalent of a series of ReadBuffer() calls for one fork of one relation.
+ * Internally, it generates larger vectored reads where possible by looking
+ * ahead.
+ */
+PgStreamingRead *
+pg_streaming_read_buffer_alloc(int flags,
+							   void *pgsr_private,
+							   size_t per_buffer_data_size,
+							   BufferAccessStrategy strategy,
+							   BufferManagerRelation bmr,
+							   ForkNumber forknum,
+							   PgStreamingReadBufferCB next_block_cb)
+{
+	PgStreamingRead *result;
+
+	result = pg_streaming_read_buffer_alloc_internal(flags,
+													 pgsr_private,
+													 per_buffer_data_size,
+													 strategy);
+	result->callback = next_block_cb;
+	result->bmr = bmr;
+	result->forknum = forknum;
+
+	return result;
+}
+
+/*
+ * Find the per-buffer data index for the Nth block of a range.
+ */
+static int
+get_per_buffer_data_index(PgStreamingRead *pgsr, PgStreamingReadRange *range, int n)
+{
+	int			result;
+
+	/*
+	 * Find slot in the circular buffer of per-buffer data, without using the
+	 * expensive % operator.
+	 */
+	result = range->per_buffer_data_index + n;
+	if (result >= pgsr->max_pinned_buffers)
+		result -= pgsr->max_pinned_buffers;
+	Assert(result == (range->per_buffer_data_index + n) % pgsr->max_pinned_buffers);
+
+	return result;
+}
+
+/*
+ * Return a pointer to the per-buffer data by index.
+ */
+static void *
+get_per_buffer_data_by_index(PgStreamingRead *pgsr, int per_buffer_data_index)
+{
+	return (char *) pgsr->per_buffer_data +
+		pgsr->per_buffer_data_size * per_buffer_data_index;
+}
+
+/*
+ * Return a pointer to the per-buffer data for the Nth block of a range.
+ */
+static void *
+get_per_buffer_data(PgStreamingRead *pgsr, PgStreamingReadRange *range, int n)
+{
+	return get_per_buffer_data_by_index(pgsr,
+										get_per_buffer_data_index(pgsr,
+																  range,
+																  n));
+}
+
+/*
+ * Start reading the head range, and create a new head range.  The new head
+ * range is returned.  It may not be empty, if StartReadBuffers() couldn't
+ * start the entire range; in that case the returned range contains the
+ * remaining portion of the range.
+ */
+static PgStreamingReadRange *
+pg_streaming_read_start_head_range(PgStreamingRead *pgsr)
+{
+	PgStreamingReadRange *head_range;
+	PgStreamingReadRange *new_head_range;
+	int			nblocks_pinned;
+	int			flags;
+
+	/* Caller should make sure we never exceed max_ios. */
+	Assert(pgsr->ios_in_progress < pgsr->max_ios);
+
+	/* Should only call if the head range has some blocks to read. */
+	head_range = &pgsr->ranges[pgsr->head];
+	Assert(head_range->nblocks > 0);
+
+	/*
+	 * If advice hasn't been suppressed, this system supports it, and this
+	 * isn't a strictly sequential pattern, then we'll issue advice.
+	 */
+	if (pgsr->advice_enabled && head_range->blocknum != pgsr->seq_blocknum)
+		flags = READ_BUFFERS_ISSUE_ADVICE;
+	else
+		flags = 0;
+
+
+	/* Start reading as many blocks as we can from the head range. */
+	nblocks_pinned = head_range->nblocks;
+	head_range->need_wait =
+		StartReadBuffers(pgsr->bmr,
+						 head_range->buffers,
+						 pgsr->forknum,
+						 head_range->blocknum,
+						 &nblocks_pinned,
+						 pgsr->strategy,
+						 flags,
+						 &head_range->operation);
+
+	/* Did that start an I/O? */
+	if (head_range->need_wait && (flags & READ_BUFFERS_ISSUE_ADVICE))
+	{
+		head_range->advice_issued = true;
+		pgsr->ios_in_progress++;
+		Assert(pgsr->ios_in_progress <= pgsr->max_ios);
+	}
+
+	/*
+	 * StartReadBuffers() might have pinned fewer blocks than we asked it to,
+	 * but always at least one.
+	 */
+	Assert(nblocks_pinned <= head_range->nblocks);
+	Assert(nblocks_pinned >= 1);
+	pgsr->pinned_buffers += nblocks_pinned;
+
+	/*
+	 * Remember where the next block would be after that, so we can detect
+	 * sequential access next time.
+	 */
+	pgsr->seq_blocknum = head_range->blocknum + nblocks_pinned;
+
+	/*
+	 * Create a new head range.  There must be space, because we have enough
+	 * elements for every range to hold just one block, up to the pin limit.
+	 */
+	Assert(pgsr->size > pgsr->max_pinned_buffers);
+	Assert((pgsr->head + 1) % pgsr->size != pgsr->tail);
+	if (++pgsr->head == pgsr->size)
+		pgsr->head = 0;
+	new_head_range = &pgsr->ranges[pgsr->head];
+	new_head_range->nblocks = 0;
+	new_head_range->advice_issued = false;
+
+	/*
+	 * If we didn't manage to start the whole read above, we split the range,
+	 * moving the remainder into the new head range.
+	 */
+	if (nblocks_pinned < head_range->nblocks)
+	{
+		int			nblocks_remaining = head_range->nblocks - nblocks_pinned;
+
+		head_range->nblocks = nblocks_pinned;
+
+		new_head_range->blocknum = head_range->blocknum + nblocks_pinned;
+		new_head_range->nblocks = nblocks_remaining;
+	}
+
+	/* The new range has per-buffer data starting after the previous range. */
+	new_head_range->per_buffer_data_index =
+		get_per_buffer_data_index(pgsr, head_range, nblocks_pinned);
+
+	return new_head_range;
+}
+
+/*
+ * Ask the callback which block it would like us to read next, with a small
+ * buffer in front to allow pg_streaming_unget_block() to work.
+ */
+static BlockNumber
+pg_streaming_get_block(PgStreamingRead *pgsr, void *per_buffer_data)
+{
+	BlockNumber result;
+
+	if (unlikely(pgsr->unget_blocknum != InvalidBlockNumber))
+	{
+		/*
+		 * If we had to unget a block, now it is time to return that one
+		 * again.
+		 */
+		result = pgsr->unget_blocknum;
+		pgsr->unget_blocknum = InvalidBlockNumber;
+
+		/*
+		 * The same per_buffer_data element must have been used, and still
+		 * contains whatever data the callback wrote into it.  So we just
+		 * sanity-check that we were called with the value that
+		 * pg_streaming_unget_block() pushed back.
+		 */
+		Assert(per_buffer_data == pgsr->unget_per_buffer_data);
+	}
+	else
+	{
+		/* Use the installed callback directly. */
+		result = pgsr->callback(pgsr, pgsr->pgsr_private, per_buffer_data);
+	}
+
+	return result;
+}
+
+/*
+ * In order to deal with short reads in StartReadBuffers(), we sometimes need
+ * to defer handling of a block until later.  This *must* be called with the
+ * last value returned by pg_streaming_get_block().
+ */
+static void
+pg_streaming_unget_block(PgStreamingRead *pgsr, BlockNumber blocknum, void *per_buffer_data)
+{
+	Assert(pgsr->unget_blocknum == InvalidBlockNumber);
+	pgsr->unget_blocknum = blocknum;
+	pgsr->unget_per_buffer_data = per_buffer_data;
+}
+
+static void
+pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
+{
+	PgStreamingReadRange *range;
+
+	/*
+	 * If we're still ramping up, we may have to stall and wait for buffers to
+	 * be consumed before we do any more prefetching.
+	 */
+	if (pgsr->ramp_up_pin_stall > 0)
+	{
+		Assert(pgsr->pinned_buffers > 0);
+		return;
+	}
+
+	/*
+	 * If we're finished or can't start more I/O, then don't look ahead.
+	 */
+	if (pgsr->finished || pgsr->ios_in_progress == pgsr->max_ios)
+		return;
+
+	/*
+	 * We'll also wait until the number of pinned buffers falls below our
+	 * trigger level, so that we have the chance to create a full range.
+	 */
+	if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+		return;
+
+	do
+	{
+		BlockNumber blocknum;
+		void	   *per_buffer_data;
+
+		/* Do we have a full-sized range? */
+		range = &pgsr->ranges[pgsr->head];
+		if (range->nblocks == lengthof(range->buffers))
+		{
+			/* Start as much of it as we can. */
+			range = pg_streaming_read_start_head_range(pgsr);
+
+			/* If we're now at the I/O limit, stop here. */
+			if (pgsr->ios_in_progress == pgsr->max_ios)
+				return;
+
+			/*
+			 * If we couldn't form a full range, then stop here to avoid
+			 * issuing a small I/O.
+			 */
+			if (pgsr->pinned_buffers >= pgsr->pinned_buffers_trigger)
+				return;
+
+			/*
+			 * The read might have been only partially started, but it always
+			 * covers at least one block, so that'll do for now.
+			 */
+			Assert(range->nblocks < lengthof(range->buffers));
+		}
+
+		/* Find per-buffer data slot for the next block. */
+		per_buffer_data = get_per_buffer_data(pgsr, range, range->nblocks);
+
+		/* Find out which block the callback wants to read next. */
+		blocknum = pg_streaming_get_block(pgsr, per_buffer_data);
+		if (blocknum == InvalidBlockNumber)
+		{
+			/* End of stream. */
+			pgsr->finished = true;
+			break;
+		}
+
+		/*
+		 * Is there a head range that we cannot extend, because the requested
+		 * block is not consecutive?
+		 */
+		if (range->nblocks > 0 &&
+			range->blocknum + range->nblocks != blocknum)
+		{
+			/* Yes.  Start it, so we can begin building a new one. */
+			range = pg_streaming_read_start_head_range(pgsr);
+
+			/*
+			 * It's possible that it was only partially started, and we have a
+			 * new range with the remainder.  Keep starting I/Os until we get
+			 * it all out of the way, or we hit the I/O limit.
+			 */
+			while (range->nblocks > 0 && pgsr->ios_in_progress < pgsr->max_ios)
+				range = pg_streaming_read_start_head_range(pgsr);
+
+			/*
+			 * We have to 'unget' the block returned by the callback if we
+			 * don't have enough I/O capacity left to start something.
+			 */
+			if (pgsr->ios_in_progress == pgsr->max_ios)
+			{
+				pg_streaming_unget_block(pgsr, blocknum, per_buffer_data);
+				return;
+			}
+		}
+
+		/* If we have a new, empty range, initialize the start block. */
+		if (range->nblocks == 0)
+		{
+			range->blocknum = blocknum;
+		}
+
+		/* This block extends the range by one. */
+		Assert(range->blocknum + range->nblocks == blocknum);
+		range->nblocks++;
+
+	} while (pgsr->pinned_buffers + range->nblocks < pgsr->max_pinned_buffers &&
+			 pgsr->pinned_buffers + range->nblocks < pgsr->ramp_up_pin_limit);
+
+	/* If we've hit the ramp-up limit, insert a stall. */
+	if (pgsr->pinned_buffers + range->nblocks >= pgsr->ramp_up_pin_limit)
+	{
+		/* Can't get here if an earlier stall hasn't finished. */
+		Assert(pgsr->ramp_up_pin_stall == 0);
+		/* Don't do any more prefetching until these buffers are consumed. */
+		pgsr->ramp_up_pin_stall = pgsr->ramp_up_pin_limit;
+		/* Double it.  It will soon be out of the way. */
+		pgsr->ramp_up_pin_limit *= 2;
+	}
+
+	/* Start as much as we can. */
+	while (range->nblocks > 0)
+	{
+		range = pg_streaming_read_start_head_range(pgsr);
+		if (pgsr->ios_in_progress == pgsr->max_ios)
+			break;
+	}
+}
+
+Buffer
+pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_data)
+{
+	pg_streaming_read_look_ahead(pgsr);
+
+	/* See if we have one buffer to return. */
+	while (pgsr->tail != pgsr->head)
+	{
+		PgStreamingReadRange *tail_range;
+
+		tail_range = &pgsr->ranges[pgsr->tail];
+
+		/*
+		 * Do we need to perform an I/O before returning the buffers from this
+		 * range?
+		 */
+		if (tail_range->need_wait)
+		{
+			WaitReadBuffers(&tail_range->operation);
+			tail_range->need_wait = false;
+
+			/*
+			 * We don't really know if the kernel generated a physical I/O
+			 * when we issued advice, let alone when it finished, but it has
+			 * certainly finished now because we've performed the read.
+			 */
+			if (tail_range->advice_issued)
+			{
+				Assert(pgsr->ios_in_progress > 0);
+				pgsr->ios_in_progress--;
+			}
+		}
+
+		/* Are there more buffers available in this range? */
+		if (pgsr->next_tail_buffer < tail_range->nblocks)
+		{
+			int			buffer_index;
+			Buffer		buffer;
+
+			buffer_index = pgsr->next_tail_buffer++;
+			buffer = tail_range->buffers[buffer_index];
+
+			Assert(BufferIsValid(buffer));
+
+			/* We are giving away ownership of this pinned buffer. */
+			Assert(pgsr->pinned_buffers > 0);
+			pgsr->pinned_buffers--;
+
+			if (pgsr->ramp_up_pin_stall > 0)
+				pgsr->ramp_up_pin_stall--;
+
+			if (per_buffer_data)
+				*per_buffer_data = get_per_buffer_data(pgsr, tail_range, buffer_index);
+
+			return buffer;
+		}
+
+		/* Advance tail to next range, if there is one. */
+		if (++pgsr->tail == pgsr->size)
+			pgsr->tail = 0;
+		pgsr->next_tail_buffer = 0;
+
+		/*
+		 * If the tail has caught up with the head, and the head range is not
+		 * empty, then it is time to start reading that range.
+		 */
+		if (pgsr->tail == pgsr->head &&
+			pgsr->ranges[pgsr->head].nblocks > 0)
+			pg_streaming_read_start_head_range(pgsr);
+	}
+
+	Assert(pgsr->pinned_buffers == 0);
+
+	return InvalidBuffer;
+}
+
+void
+pg_streaming_read_free(PgStreamingRead *pgsr)
+{
+	Buffer		buffer;
+
+	/* Stop looking ahead. */
+	pgsr->finished = true;
+
+	/* Unpin anything that wasn't consumed. */
+	while ((buffer = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer)
+		ReleaseBuffer(buffer);
+
+	Assert(pgsr->pinned_buffers == 0);
+	Assert(pgsr->ios_in_progress == 0);
+
+	/* Release memory. */
+	if (pgsr->per_buffer_data)
+		pfree(pgsr->per_buffer_data);
+
+	pfree(pgsr);
+}
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f0f8d4259c5..729d1f91721 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -19,6 +19,11 @@
  *		and pin it so that no one can destroy it while this process
  *		is using it.
  *
+ * StartReadBuffers() -- as above, but for multiple contiguous blocks in
+ *		two steps.
+ *
+ * WaitReadBuffers() -- second step of StartReadBuffers().
+ *
  * ReleaseBuffer() -- unpin a buffer
  *
  * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
@@ -471,10 +476,9 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
 )
 
 
-static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence,
+static Buffer ReadBuffer_common(BufferManagerRelation bmr,
 								ForkNumber forkNum, BlockNumber blockNum,
-								ReadBufferMode mode, BufferAccessStrategy strategy,
-								bool *hit);
+								ReadBufferMode mode, BufferAccessStrategy strategy);
 static BlockNumber ExtendBufferedRelCommon(BufferManagerRelation bmr,
 										   ForkNumber fork,
 										   BufferAccessStrategy strategy,
@@ -500,7 +504,7 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
 static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
 						  WritebackContext *wb_context);
 static void WaitIO(BufferDesc *buf);
-static bool StartBufferIO(BufferDesc *buf, bool forInput);
+static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
 							  uint32 set_flag_bits, bool forget_owner);
 static void AbortBufferIO(Buffer buffer);
@@ -781,7 +785,6 @@ Buffer
 ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
 				   ReadBufferMode mode, BufferAccessStrategy strategy)
 {
-	bool		hit;
 	Buffer		buf;
 
 	/*
@@ -794,15 +797,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("cannot access temporary tables of other sessions")));
 
-	/*
-	 * Read the buffer, and update pgstat counters to reflect a cache hit or
-	 * miss.
-	 */
-	pgstat_count_buffer_read(reln);
-	buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence,
-							forkNum, blockNum, mode, strategy, &hit);
-	if (hit)
-		pgstat_count_buffer_hit(reln);
+	buf = ReadBuffer_common(BMR_REL(reln),
+							forkNum, blockNum, mode, strategy);
+
 	return buf;
 }
 
@@ -822,13 +819,12 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum,
 						  BlockNumber blockNum, ReadBufferMode mode,
 						  BufferAccessStrategy strategy, bool permanent)
 {
-	bool		hit;
-
 	SMgrRelation smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
 
-	return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT :
-							 RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
-							 mode, strategy, &hit);
+	return ReadBuffer_common(BMR_SMGR(smgr, permanent ? RELPERSISTENCE_PERMANENT :
+									  RELPERSISTENCE_UNLOGGED),
+							 forkNum, blockNum,
+							 mode, strategy);
 }
 
 /*
@@ -994,35 +990,68 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
 	 */
 	if (buffer == InvalidBuffer)
 	{
-		bool		hit;
-
 		Assert(extended_by == 0);
-		buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence,
-								   fork, extend_to - 1, mode, strategy,
-								   &hit);
+		buffer = ReadBuffer_common(bmr, fork, extend_to - 1, mode, strategy);
 	}
 
 	return buffer;
 }
 
+/*
+ * Zero a buffer and lock it, as part of the implementation of
+ * RBM_ZERO_AND_LOCK or RBM_ZERO_AND_CLEANUP_LOCK.  The buffer must be already
+ * pinned.  It does not have to be valid, but it is valid and locked on
+ * return.
+ */
+static void
+ZeroBuffer(Buffer buffer, ReadBufferMode mode)
+{
+	BufferDesc *bufHdr;
+	uint32		buf_state;
+
+	Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
+
+	if (BufferIsLocal(buffer))
+		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+	else
+	{
+		bufHdr = GetBufferDescriptor(buffer - 1);
+		if (mode == RBM_ZERO_AND_LOCK)
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		else
+			LockBufferForCleanup(buffer);
+	}
+
+	memset(BufferGetPage(buffer), 0, BLCKSZ);
+
+	if (BufferIsLocal(buffer))
+	{
+		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		buf_state |= BM_VALID;
+		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+	}
+	else
+	{
+		buf_state = LockBufHdr(bufHdr);
+		buf_state |= BM_VALID;
+		UnlockBufHdr(bufHdr, buf_state);
+	}
+}
+
 /*
  * ReadBuffer_common -- common logic for all ReadBuffer variants
  *
  * *hit is set to true if the request was satisfied from shared buffer cache.
  */
 static Buffer
-ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
+ReadBuffer_common(BufferManagerRelation bmr, ForkNumber forkNum,
 				  BlockNumber blockNum, ReadBufferMode mode,
-				  BufferAccessStrategy strategy, bool *hit)
+				  BufferAccessStrategy strategy)
 {
-	BufferDesc *bufHdr;
-	Block		bufBlock;
-	bool		found;
-	IOContext	io_context;
-	IOObject	io_object;
-	bool		isLocalBuf = SmgrIsTemp(smgr);
-
-	*hit = false;
+	ReadBuffersOperation operation;
+	Buffer		buffer;
+	int			nblocks;
+	int			flags;
 
 	/*
 	 * Backward compatibility path, most code should use ExtendBufferedRel()
@@ -1041,181 +1070,404 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
 			flags |= EB_LOCK_FIRST;
 
-		return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence),
-								 forkNum, strategy, flags);
+		return ExtendBufferedRel(bmr, forkNum, strategy, flags);
 	}
 
-	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
-									   smgr->smgr_rlocator.locator.spcOid,
-									   smgr->smgr_rlocator.locator.dbOid,
-									   smgr->smgr_rlocator.locator.relNumber,
-									   smgr->smgr_rlocator.backend);
+	nblocks = 1;
+	if (mode == RBM_ZERO_ON_ERROR)
+		flags = READ_BUFFERS_ZERO_ON_ERROR;
+	else
+		flags = 0;
+	if (StartReadBuffers(bmr,
+						 &buffer,
+						 forkNum,
+						 blockNum,
+						 &nblocks,
+						 strategy,
+						 flags,
+						 &operation))
+		WaitReadBuffers(&operation);
+	Assert(nblocks == 1);		/* single block can't be short */
+
+	if (mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO_AND_LOCK)
+		ZeroBuffer(buffer, mode);
+
+	return buffer;
+}
+
+static Buffer
+PrepareReadBuffer(BufferManagerRelation bmr,
+				  ForkNumber forkNum,
+				  BlockNumber blockNum,
+				  BufferAccessStrategy strategy,
+				  bool *foundPtr)
+{
+	BufferDesc *bufHdr;
+	bool		isLocalBuf;
+	IOContext	io_context;
+	IOObject	io_object;
+
+	Assert(blockNum != P_NEW);
 
+	Assert(bmr.smgr);
+
+	isLocalBuf = SmgrIsTemp(bmr.smgr);
 	if (isLocalBuf)
 	{
-		/*
-		 * We do not use a BufferAccessStrategy for I/O of temporary tables.
-		 * However, in some cases, the "strategy" may not be NULL, so we can't
-		 * rely on IOContextForStrategy() to set the right IOContext for us.
-		 * This may happen in cases like CREATE TEMPORARY TABLE AS...
-		 */
 		io_context = IOCONTEXT_NORMAL;
 		io_object = IOOBJECT_TEMP_RELATION;
-		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
-		if (found)
-			pgBufferUsage.local_blks_hit++;
-		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-				 mode == RBM_ZERO_ON_ERROR)
-			pgBufferUsage.local_blks_read++;
 	}
 	else
 	{
-		/*
-		 * lookup the buffer.  IO_IN_PROGRESS is set if the requested block is
-		 * not currently in memory.
-		 */
 		io_context = IOContextForStrategy(strategy);
 		io_object = IOOBJECT_RELATION;
-		bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
-							 strategy, &found, io_context);
-		if (found)
-			pgBufferUsage.shared_blks_hit++;
-		else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
-				 mode == RBM_ZERO_ON_ERROR)
-			pgBufferUsage.shared_blks_read++;
 	}
 
-	/* At this point we do NOT hold any locks. */
+	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
+									   bmr.smgr->smgr_rlocator.locator.spcOid,
+									   bmr.smgr->smgr_rlocator.locator.dbOid,
+									   bmr.smgr->smgr_rlocator.locator.relNumber,
+									   bmr.smgr->smgr_rlocator.backend);
 
-	/* if it was already in the buffer pool, we're done */
-	if (found)
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+	if (isLocalBuf)
+	{
+		bufHdr = LocalBufferAlloc(bmr.smgr, forkNum, blockNum, foundPtr);
+		if (*foundPtr)
+			pgBufferUsage.local_blks_hit++;
+	}
+	else
+	{
+		bufHdr = BufferAlloc(bmr.smgr, bmr.relpersistence, forkNum, blockNum,
+							 strategy, foundPtr, io_context);
+		if (*foundPtr)
+			pgBufferUsage.shared_blks_hit++;
+	}
+	if (bmr.rel)
+	{
+		/*
+		 * While pgBufferUsage's "read" counter isn't bumped unless we reach
+		 * WaitReadBuffers() (so, not for hits, and not for buffers that are
+		 * zeroed instead), the per-relation stats count all of these as reads.
+		 */
+		pgstat_count_buffer_read(bmr.rel);
+		if (*foundPtr)
+			pgstat_count_buffer_hit(bmr.rel);
+	}
+	if (*foundPtr)
 	{
-		/* Just need to update stats before we exit */
-		*hit = true;
 		VacuumPageHit++;
 		pgstat_count_io_op(io_object, io_context, IOOP_HIT);
-
 		if (VacuumCostActive)
 			VacuumCostBalance += VacuumCostPageHit;
 
 		TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-										  smgr->smgr_rlocator.locator.spcOid,
-										  smgr->smgr_rlocator.locator.dbOid,
-										  smgr->smgr_rlocator.locator.relNumber,
-										  smgr->smgr_rlocator.backend,
-										  found);
+										  bmr.smgr->smgr_rlocator.locator.spcOid,
+										  bmr.smgr->smgr_rlocator.locator.dbOid,
+										  bmr.smgr->smgr_rlocator.locator.relNumber,
+										  bmr.smgr->smgr_rlocator.backend,
+										  true);
+	}
 
-		/*
-		 * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked
-		 * on return.
-		 */
-		if (!isLocalBuf)
-		{
-			if (mode == RBM_ZERO_AND_LOCK)
-				LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
-							  LW_EXCLUSIVE);
-			else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
-				LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
-		}
+	return BufferDescriptorGetBuffer(bufHdr);
+}
 
-		return BufferDescriptorGetBuffer(bufHdr);
+/*
+ * Begin reading a range of blocks beginning at blockNum and extending for
+ * *nblocks.  On return, up to *nblocks pinned buffers holding those blocks
+ * are written into the buffers array, and *nblocks is updated to contain the
+ * actual number, which may be fewer than requested.
+ *
+ * If false is returned, no I/O is necessary and WaitReadBuffers() need not
+ * be called.  If true is returned, one I/O has been started, and
+ * WaitReadBuffers() must be called with the same operation object before the
+ * buffers are accessed.  Along with the operation object, the caller-supplied
+ * array of buffers must remain valid until WaitReadBuffers() is called.
+ *
+ * Currently the I/O is only started with optional operating system advice,
+ * and the real I/O happens in WaitReadBuffers().  In future work, true I/O
+ * could be initiated here.
+ */
+bool
+StartReadBuffers(BufferManagerRelation bmr,
+				 Buffer *buffers,
+				 ForkNumber forkNum,
+				 BlockNumber blockNum,
+				 int *nblocks,
+				 BufferAccessStrategy strategy,
+				 int flags,
+				 ReadBuffersOperation *operation)
+{
+	int			actual_nblocks = *nblocks;
+
+	if (bmr.rel)
+	{
+		bmr.smgr = RelationGetSmgr(bmr.rel);
+		bmr.relpersistence = bmr.rel->rd_rel->relpersistence;
 	}
 
-	/*
-	 * if we have gotten to this point, we have allocated a buffer for the
-	 * page but its contents are not yet valid.  IO_IN_PROGRESS is set for it,
-	 * if it's a shared buffer.
-	 */
-	Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));	/* spinlock not needed */
+	operation->bmr = bmr;
+	operation->forknum = forkNum;
+	operation->blocknum = blockNum;
+	operation->buffers = buffers;
+	operation->nblocks = actual_nblocks;
+	operation->strategy = strategy;
+	operation->flags = flags;
 
-	bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
+	operation->io_buffers_len = 0;
 
-	/*
-	 * Read in the page, unless the caller intends to overwrite it and just
-	 * wants us to allocate a buffer.
-	 */
-	if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
-		MemSet((char *) bufBlock, 0, BLCKSZ);
-	else
+	for (int i = 0; i < actual_nblocks; ++i)
 	{
-		instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
+		bool		found;
 
-		smgrread(smgr, forkNum, blockNum, bufBlock);
+		buffers[i] = PrepareReadBuffer(bmr,
+									   forkNum,
+									   blockNum + i,
+									   strategy,
+									   &found);
 
-		pgstat_count_io_op_time(io_object, io_context,
-								IOOP_READ, io_start, 1);
+		if (found)
+		{
+			/*
+			 * Terminate the read as soon as we get a hit.  It could be a
+			 * single buffer hit, or it could be a hit that follows a readable
+			 * range.  We don't want to create more than one readable range,
+			 * so we stop here.
+			 */
+			actual_nblocks = operation->nblocks = *nblocks = i + 1;
+		}
+		else
+		{
+			/* Extend the readable range to cover this block. */
+			operation->io_buffers_len++;
+		}
+	}
 
-		/* check for garbage data */
-		if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
-									PIV_LOG_WARNING | PIV_REPORT_STAT))
+	if (operation->io_buffers_len > 0)
+	{
+		if (flags & READ_BUFFERS_ISSUE_ADVICE)
 		{
-			if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
-			{
-				ereport(WARNING,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg("invalid page in block %u of relation %s; zeroing out page",
-								blockNum,
-								relpath(smgr->smgr_rlocator, forkNum))));
-				MemSet((char *) bufBlock, 0, BLCKSZ);
-			}
-			else
-				ereport(ERROR,
-						(errcode(ERRCODE_DATA_CORRUPTED),
-						 errmsg("invalid page in block %u of relation %s",
-								blockNum,
-								relpath(smgr->smgr_rlocator, forkNum))));
+			/*
+			 * In theory we should only do this if PrepareReadBuffer() had to
+			 * allocate new buffers above.  That way, if two calls to
+			 * StartReadBuffers() were made for the same blocks before
+			 * WaitReadBuffers(), only the first would issue the advice.
+			 * That'd be a better simulation of true asynchronous I/O, which
+			 * would only start the I/O once, but isn't done here for
+			 * simplicity.  Note also that the following call might actually
+			 * issue two advice calls if we cross a segment boundary; in a
+			 * true asynchronous version we might choose to process only one
+			 * real I/O at a time in that case.
+			 */
+			smgrprefetch(bmr.smgr, forkNum, blockNum, operation->io_buffers_len);
 		}
+
+		/* Indicate that WaitReadBuffers() should be called. */
+		return true;
 	}
+	else
+	{
+		return false;
+	}
+}
 
-	/*
-	 * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer
-	 * content lock before marking the page as valid, to make sure that no
-	 * other backend sees the zeroed page before the caller has had a chance
-	 * to initialize it.
-	 *
-	 * Since no-one else can be looking at the page contents yet, there is no
-	 * difference between an exclusive lock and a cleanup-strength lock. (Note
-	 * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
-	 * they assert that the buffer is already valid.)
-	 */
-	if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
-		!isLocalBuf)
+static inline bool
+WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
+{
+	if (BufferIsLocal(buffer))
 	{
-		LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
+		BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+
+		return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
 	}
+	else
+		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
+}
+
+void
+WaitReadBuffers(ReadBuffersOperation *operation)
+{
+	BufferManagerRelation bmr;
+	Buffer	   *buffers;
+	int			nblocks;
+	BlockNumber blocknum;
+	ForkNumber	forknum;
+	bool		isLocalBuf;
+	IOContext	io_context;
+	IOObject	io_object;
+
+	/*
+	 * Currently operations are only allowed to include a read of some range,
+	 * with an optional extra buffer that is already pinned at the end.  So
+	 * nblocks can be at most one more than io_buffers_len.
+	 */
+	Assert((operation->nblocks == operation->io_buffers_len) ||
+		   (operation->nblocks == operation->io_buffers_len + 1));
 
+	/* Find the range of the physical read we need to perform. */
+	nblocks = operation->io_buffers_len;
+	if (nblocks == 0)
+		return;					/* nothing to do */
+
+	buffers = &operation->buffers[0];
+	blocknum = operation->blocknum;
+	forknum = operation->forknum;
+	bmr = operation->bmr;
+
+	isLocalBuf = SmgrIsTemp(bmr.smgr);
 	if (isLocalBuf)
 	{
-		/* Only need to adjust flags */
-		uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
-
-		buf_state |= BM_VALID;
-		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
 	}
 	else
 	{
-		/* Set BM_VALID, terminate IO, and wake up any waiters */
-		TerminateBufferIO(bufHdr, false, BM_VALID, true);
+		io_context = IOContextForStrategy(operation->strategy);
+		io_object = IOOBJECT_RELATION;
 	}
 
-	VacuumPageMiss++;
-	if (VacuumCostActive)
-		VacuumCostBalance += VacuumCostPageMiss;
+	/*
+	 * We count all these blocks as read by this backend.  This is traditional
+	 * behavior, but might turn out not to be true if we find that someone
+	 * else has beaten us and completed the read of some of these blocks.  In
+	 * that case the system globally double-counts, but we traditionally don't
+	 * count this as a "hit", and we don't have a separate counter for "miss,
+	 * but another backend completed the read".
+	 */
+	if (isLocalBuf)
+		pgBufferUsage.local_blks_read += nblocks;
+	else
+		pgBufferUsage.shared_blks_read += nblocks;
 
-	TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-									  smgr->smgr_rlocator.locator.spcOid,
-									  smgr->smgr_rlocator.locator.dbOid,
-									  smgr->smgr_rlocator.locator.relNumber,
-									  smgr->smgr_rlocator.backend,
-									  found);
+	for (int i = 0; i < nblocks; ++i)
+	{
+		int			io_buffers_len;
+		Buffer		io_buffers[MAX_BUFFERS_PER_TRANSFER];
+		void	   *io_pages[MAX_BUFFERS_PER_TRANSFER];
+		instr_time	io_start;
+		BlockNumber io_first_block;
 
-	return BufferDescriptorGetBuffer(bufHdr);
+		/*
+		 * Skip this block if someone else has already completed it.  If an
+		 * I/O is already in progress in another backend, this will wait for
+		 * the outcome: either done, or something went wrong and we will
+		 * retry.
+		 */
+		if (!WaitReadBuffersCanStartIO(buffers[i], false))
+		{
+			/*
+			 * Report this as a 'hit' for this backend, even though it must
+			 * have started out as a miss in PrepareReadBuffer().
+			 */
+			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
+											  bmr.smgr->smgr_rlocator.locator.spcOid,
+											  bmr.smgr->smgr_rlocator.locator.dbOid,
+											  bmr.smgr->smgr_rlocator.locator.relNumber,
+											  bmr.smgr->smgr_rlocator.backend,
+											  true);
+			continue;
+		}
+
+		/* We found a buffer that we need to read in. */
+		io_buffers[0] = buffers[i];
+		io_pages[0] = BufferGetBlock(buffers[i]);
+		io_first_block = blocknum + i;
+		io_buffers_len = 1;
+
+		/*
+		 * How many neighboring-on-disk blocks can we scatter-read into
+		 * other buffers at the same time?  In this case we don't wait if we
+		 * see an I/O already in progress.  We already hold BM_IO_IN_PROGRESS
+		 * for the head block, so we should get on with that I/O as soon as
+		 * possible.  We'll come back to any skipped block in the outer loop.
+		 */
+		while ((i + 1) < nblocks &&
+			   WaitReadBuffersCanStartIO(buffers[i + 1], true))
+		{
+			/* Must be consecutive block numbers. */
+			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
+				   BufferGetBlockNumber(buffers[i]) + 1);
+
+			io_buffers[io_buffers_len] = buffers[++i];
+			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
+		}
+
+		io_start = pgstat_prepare_io_time(track_io_timing);
+		smgrreadv(bmr.smgr, forknum, io_first_block, io_pages, io_buffers_len);
+		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
+								io_buffers_len);
+
+		/* Verify each block we read, and terminate the I/O. */
+		for (int j = 0; j < io_buffers_len; ++j)
+		{
+			BufferDesc *bufHdr;
+			Block		bufBlock;
+
+			if (isLocalBuf)
+			{
+				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
+				bufBlock = LocalBufHdrGetBlock(bufHdr);
+			}
+			else
+			{
+				bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
+				bufBlock = BufHdrGetBlock(bufHdr);
+			}
+
+			/* check for garbage data */
+			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
+										PIV_LOG_WARNING | PIV_REPORT_STAT))
+			{
+				if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
+				{
+					ereport(WARNING,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg("invalid page in block %u of relation %s; zeroing out page",
+									io_first_block + j,
+									relpath(bmr.smgr->smgr_rlocator, forknum))));
+					memset(bufBlock, 0, BLCKSZ);
+				}
+				else
+					ereport(ERROR,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg("invalid page in block %u of relation %s",
+									io_first_block + j,
+									relpath(bmr.smgr->smgr_rlocator, forknum))));
+			}
+
+			/* Terminate I/O and set BM_VALID. */
+			if (isLocalBuf)
+			{
+				uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+				buf_state |= BM_VALID;
+				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+			}
+			else
+			{
+				/* Set BM_VALID, terminate IO, and wake up any waiters */
+				TerminateBufferIO(bufHdr, false, BM_VALID, true);
+			}
+
+			/* Report I/Os as completing individually. */
+			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
+											  bmr.smgr->smgr_rlocator.locator.spcOid,
+											  bmr.smgr->smgr_rlocator.locator.dbOid,
+											  bmr.smgr->smgr_rlocator.locator.relNumber,
+											  bmr.smgr->smgr_rlocator.backend,
+											  false);
+		}
+
+		VacuumPageMiss += io_buffers_len;
+		if (VacuumCostActive)
+			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+	}
 }
 
 /*
- * BufferAlloc -- subroutine for ReadBuffer.  Handles lookup of a shared
- *		buffer.  If no buffer exists already, selects a replacement
- *		victim and evicts the old page, but does NOT read in new page.
+ * BufferAlloc -- subroutine for StartReadBuffers.  Handles lookup of a shared
+ *		buffer.  If no buffer exists already, selects a replacement victim and
+ *		evicts the old page, but does NOT read in new page.
  *
  * "strategy" can be a buffer replacement strategy object, or NULL for
  * the default strategy.  The selected buffer's usage_count is advanced when
@@ -1223,11 +1475,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
  *
  * The returned buffer is pinned and is already marked as holding the
  * desired page.  If it already did have the desired page, *foundPtr is
- * set true.  Otherwise, *foundPtr is set false and the buffer is marked
- * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
- *
- * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
- * we keep it for simplicity in ReadBuffer.
+ * set true.  Otherwise, *foundPtr is set false.
  *
  * io_context is passed as an output parameter to avoid calling
  * IOContextForStrategy() when there is a shared buffers hit and no IO
@@ -1286,19 +1534,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		{
 			/*
 			 * We can only get here if (a) someone else is still reading in
-			 * the page, or (b) a previous read attempt failed.  We have to
-			 * wait for any active read attempt to finish, and then set up our
-			 * own read attempt if the page is still not BM_VALID.
-			 * StartBufferIO does it all.
+			 * the page, (b) a previous read attempt failed, or (c) someone
+			 * called StartReadBuffers() but not yet WaitReadBuffers().
 			 */
-			if (StartBufferIO(buf, true))
-			{
-				/*
-				 * If we get here, previous attempts to read the buffer must
-				 * have failed ... but we shall bravely try again.
-				 */
-				*foundPtr = false;
-			}
+			*foundPtr = false;
 		}
 
 		return buf;
@@ -1363,19 +1602,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		{
 			/*
 			 * We can only get here if (a) someone else is still reading in
-			 * the page, or (b) a previous read attempt failed.  We have to
-			 * wait for any active read attempt to finish, and then set up our
-			 * own read attempt if the page is still not BM_VALID.
-			 * StartBufferIO does it all.
+			 * the page, (b) a previous read attempt failed, or (c) someone
+			 * called StartReadBuffers() but not yet WaitReadBuffers().
 			 */
-			if (StartBufferIO(existing_buf_hdr, true))
-			{
-				/*
-				 * If we get here, previous attempts to read the buffer must
-				 * have failed ... but we shall bravely try again.
-				 */
-				*foundPtr = false;
-			}
+			*foundPtr = false;
 		}
 
 		return existing_buf_hdr;
@@ -1407,15 +1637,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	LWLockRelease(newPartitionLock);
 
 	/*
-	 * Buffer contents are currently invalid.  Try to obtain the right to
-	 * start I/O.  If StartBufferIO returns false, then someone else managed
-	 * to read it before we did, so there's nothing left for BufferAlloc() to
-	 * do.
+	 * Buffer contents are currently invalid.
 	 */
-	if (StartBufferIO(victim_buf_hdr, true))
-		*foundPtr = false;
-	else
-		*foundPtr = true;
+	*foundPtr = false;
 
 	return victim_buf_hdr;
 }
@@ -1769,7 +1993,7 @@ again:
  * pessimistic, but outside of toy-sized shared_buffers it should allow
  * sufficient pins.
  */
-static void
+void
 LimitAdditionalPins(uint32 *additional_pins)
 {
 	uint32		max_backends;
@@ -2034,7 +2258,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 
 				buf_state &= ~BM_VALID;
 				UnlockBufHdr(existing_hdr, buf_state);
-			} while (!StartBufferIO(existing_hdr, true));
+			} while (!StartBufferIO(existing_hdr, true, false));
 		}
 		else
 		{
@@ -2057,7 +2281,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 			LWLockRelease(partition_lock);
 
 			/* XXX: could combine the locked operations in it with the above */
-			StartBufferIO(victim_buf_hdr, true);
+			StartBufferIO(victim_buf_hdr, true, false);
 		}
 	}
 
@@ -2372,7 +2596,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 	else
 	{
 		/*
-		 * If we previously pinned the buffer, it must surely be valid.
+		 * If we previously pinned the buffer, it is likely to be valid, but
+		 * it may not be if StartReadBuffers() was called and
+		 * WaitReadBuffers() hasn't been called yet.  We'll check by loading
+		 * the flags without locking.  This is racy, but it's OK to return
+		 * false spuriously: when WaitReadBuffers() calls StartBufferIO(),
+		 * it'll see that it's now valid.
 		 *
 		 * Note: We deliberately avoid a Valgrind client request here.
 		 * Individual access methods can optionally superimpose buffer page
@@ -2381,7 +2610,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 		 * that the buffer page is legitimately non-accessible here.  We
 		 * cannot meddle with that.
 		 */
-		result = true;
+		result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;
 	}
 
 	ref->refcount++;
@@ -3449,7 +3678,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * someone else flushed the buffer before we could, so we need not do
 	 * anything.
 	 */
-	if (!StartBufferIO(buf, false))
+	if (!StartBufferIO(buf, false, false))
 		return;
 
 	/* Setup error traceback support for ereport() */
@@ -5184,9 +5413,15 @@ WaitIO(BufferDesc *buf)
  *
  * Returns true if we successfully marked the buffer as I/O busy,
  * false if someone else already did the work.
+ *
+ * If nowait is true, then we don't wait for an I/O to be finished by another
+ * backend.  In that case, false indicates either that the I/O has already
+ * finished or that it is still in progress.  This is useful for callers that want to
+ * find out if they can perform the I/O as part of a larger operation, without
+ * waiting for the answer or distinguishing the reasons why not.
  */
 static bool
-StartBufferIO(BufferDesc *buf, bool forInput)
+StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 {
 	uint32		buf_state;
 
@@ -5199,6 +5434,8 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
 		UnlockBufHdr(buf, buf_state);
+		if (nowait)
+			return false;
 		WaitIO(buf);
 	}
 
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index fcfac335a57..985a2c7049c 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -108,10 +108,9 @@ PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
  * LocalBufferAlloc -
  *	  Find or create a local buffer for the given page of the given relation.
  *
- * API is similar to bufmgr.c's BufferAlloc, except that we do not need
- * to do any locking since this is all local.   Also, IO_IN_PROGRESS
- * does not get set.  Lastly, we support only default access strategy
- * (hence, usage_count is always advanced).
+ * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
+ * any locking since this is all local.  We support only default access
+ * strategy (hence, usage_count is always advanced).
  */
 BufferDesc *
 LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
@@ -287,7 +286,7 @@ GetLocalVictimBuffer(void)
 }
 
 /* see LimitAdditionalPins() */
-static void
+void
 LimitAdditionalLocalPins(uint32 *additional_pins)
 {
 	uint32		max_pins;
@@ -297,9 +296,10 @@ LimitAdditionalLocalPins(uint32 *additional_pins)
 
 	/*
 	 * In contrast to LimitAdditionalPins() other backends don't play a role
-	 * here. We can allow up to NLocBuffer pins in total.
+	 * here. We can allow up to NLocBuffer pins in total, but NLocBuffer might
+	 * not be initialized yet, so read num_temp_buffers instead.
 	 */
-	max_pins = (NLocBuffer - NLocalPinnedBuffers);
+	max_pins = (num_temp_buffers - NLocalPinnedBuffers);
 
 	if (*additional_pins >= max_pins)
 		*additional_pins = max_pins;
diff --git a/src/backend/storage/meson.build b/src/backend/storage/meson.build
index 40345bdca27..739d13293fb 100644
--- a/src/backend/storage/meson.build
+++ b/src/backend/storage/meson.build
@@ -1,5 +1,6 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
+subdir('aio')
 subdir('buffer')
 subdir('file')
 subdir('freespace')
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index d51d46d3353..b57f71f97e3 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -14,6 +14,7 @@
 #ifndef BUFMGR_H
 #define BUFMGR_H
 
+#include "port/pg_iovec.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -158,6 +159,11 @@ extern PGDLLIMPORT int32 *LocalRefCount;
 #define BUFFER_LOCK_SHARE		1
 #define BUFFER_LOCK_EXCLUSIVE	2
 
+/*
+ * Maximum number of buffers for multi-buffer I/O functions.  This is set to
+ * allow 128kB transfers, unless BLCKSZ and IOV_MAX imply a smaller maximum.
+ */
+#define MAX_BUFFERS_PER_TRANSFER Min(PG_IOV_MAX, (128 * 1024) / BLCKSZ)
 
 /*
  * prototypes for functions in bufmgr.c
@@ -177,6 +183,42 @@ extern Buffer ReadBufferWithoutRelcache(RelFileLocator rlocator,
 										ForkNumber forkNum, BlockNumber blockNum,
 										ReadBufferMode mode, BufferAccessStrategy strategy,
 										bool permanent);
+
+#define READ_BUFFERS_ZERO_ON_ERROR 0x01
+#define READ_BUFFERS_ISSUE_ADVICE 0x02
+
+/*
+ * Private state used by StartReadBuffers() and WaitReadBuffers().  Declared
+ * in public header only to allow inclusion in other structs, but contents
+ * should not be accessed.
+ */
+struct ReadBuffersOperation
+{
+	/* Parameters passed in to StartReadBuffers(). */
+	BufferManagerRelation bmr;
+	Buffer	   *buffers;
+	ForkNumber	forknum;
+	BlockNumber blocknum;
+	int			nblocks;
+	BufferAccessStrategy strategy;
+	int			flags;
+
+	/* Range of buffers, if we need to perform a read. */
+	int			io_buffers_len;
+};
+
+typedef struct ReadBuffersOperation ReadBuffersOperation;
+
+extern bool StartReadBuffers(BufferManagerRelation bmr,
+							 Buffer *buffers,
+							 ForkNumber forknum,
+							 BlockNumber blocknum,
+							 int *nblocks,
+							 BufferAccessStrategy strategy,
+							 int flags,
+							 ReadBuffersOperation *operation);
+extern void WaitReadBuffers(ReadBuffersOperation *operation);
+
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
 extern bool BufferIsExclusiveLocked(Buffer buffer);
@@ -250,6 +292,9 @@ extern bool HoldingBufferPinThatDelaysRecovery(void);
 
 extern bool BgBufferSync(struct WritebackContext *wb_context);
 
+extern void LimitAdditionalPins(uint32 *additional_pins);
+extern void LimitAdditionalLocalPins(uint32 *additional_pins);
+
 /* in buf_init.c */
 extern void InitBufferPool(void);
 extern Size BufferShmemSize(void);
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
new file mode 100644
index 00000000000..c4d3892bb26
--- /dev/null
+++ b/src/include/storage/streaming_read.h
@@ -0,0 +1,52 @@
+#ifndef STREAMING_READ_H
+#define STREAMING_READ_H
+
+#include "storage/bufmgr.h"
+#include "storage/fd.h"
+#include "storage/smgr.h"
+
+/* Default tuning, reasonable for many users. */
+#define PGSR_FLAG_DEFAULT 0x00
+
+/*
+ * I/O streams that are performing maintenance work on behalf of potentially
+ * many users.
+ */
+#define PGSR_FLAG_MAINTENANCE 0x01
+
+/*
+ * We usually avoid issuing prefetch advice automatically when sequential
+ * access is detected, but this flag explicitly disables it, for cases that
+ * might not be correctly detected.  Explicit advice is known to perform worse
+ * than letting the kernel (at least Linux) detect sequential access.
+ */
+#define PGSR_FLAG_SEQUENTIAL 0x02
+
+/*
+ * We usually ramp up from smaller reads to larger ones, to support users who
+ * don't know if it's worth reading lots of buffers yet.  This flag disables
+ * that, declaring ahead of time that we'll be reading all available buffers.
+ */
+#define PGSR_FLAG_FULL 0x04
+
+struct PgStreamingRead;
+typedef struct PgStreamingRead PgStreamingRead;
+
+/* Callback that returns the next block number to read. */
+typedef BlockNumber (*PgStreamingReadBufferCB) (PgStreamingRead *pgsr,
+												void *pgsr_private,
+												void *per_buffer_private);
+
+extern PgStreamingRead *pg_streaming_read_buffer_alloc(int flags,
+													   void *pgsr_private,
+													   size_t per_buffer_private_size,
+													   BufferAccessStrategy strategy,
+													   BufferManagerRelation bmr,
+													   ForkNumber forknum,
+													   PgStreamingReadBufferCB next_block_cb);
+
+extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr);
+extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private);
+extern void pg_streaming_read_free(PgStreamingRead *pgsr);
+
+#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 95ae7845d86..aea8babd71a 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2097,6 +2097,8 @@ PgStat_TableCounts
 PgStat_TableStatus
 PgStat_TableXactStatus
 PgStat_WalStats
+PgStreamingRead
+PgStreamingReadRange
 PgXmlErrorContext
 PgXmlStrictness
 Pg_finfo_record
@@ -2267,6 +2269,7 @@ ReInitializeDSMForeignScan_function
 ReScanForeignScan_function
 ReadBufPtrType
 ReadBufferMode
+ReadBuffersOperation
 ReadBytePtrType
 ReadExtraTocPtrType
 ReadFunc
-- 
2.40.1
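
Since this is where the streaming read API first appears in the series, here is a
minimal sketch of what a consumer looks like, based only on the declarations in
streaming_read.h above.  The ExampleScanState struct and both function names are
made up for illustration; only the pg_streaming_read_* calls, the callback shape,
and BMR_REL() come from the patch set, and per-buffer data is unused here.

/*
 * Hypothetical example, not part of the patch set: stream every block of a
 * relation in physical order and release each buffer as it is consumed.
 */
#include "postgres.h"

#include "storage/bufmgr.h"
#include "storage/streaming_read.h"
#include "utils/rel.h"

typedef struct ExampleScanState
{
	BlockNumber next_block;
	BlockNumber nblocks;
} ExampleScanState;

/* Callback: return the next block to read, or InvalidBlockNumber at the end. */
static BlockNumber
example_scan_next_block(PgStreamingRead *pgsr, void *pgsr_private,
						void *per_buffer_data)
{
	ExampleScanState *scan = (ExampleScanState *) pgsr_private;

	if (scan->next_block >= scan->nblocks)
		return InvalidBlockNumber;

	return scan->next_block++;
}

/* Consumer loop: buffers come back pinned, in the order the callback gave. */
static void
example_scan_rel(Relation rel, BufferAccessStrategy strategy)
{
	ExampleScanState scan;
	PgStreamingRead *pgsr;
	Buffer		buf;

	scan.next_block = 0;
	scan.nblocks = RelationGetNumberOfBlocks(rel);

	pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT, &scan,
										  0,	/* no per-buffer data */
										  strategy, BMR_REL(rel),
										  MAIN_FORKNUM,
										  example_scan_next_block);

	while ((buf = pg_streaming_read_buffer_get_next(pgsr, NULL)) != InvalidBuffer)
	{
		/* ... examine the pinned buffer here ... */
		ReleaseBuffer(buf);
	}

	pg_streaming_read_free(pgsr);
}

The point of the shape above is that look-ahead distance, advice, and pin limits
are all handled inside the stream, rather than in per-caller prefetching code.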

>From 2e5ed537b9053ff3212177c1732d6afa2100fa0f Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Sun, 31 Dec 2023 11:29:02 -0500
Subject: [PATCH v5a 6/7] Vacuum first pass uses Streaming Read interface

Now vacuum's first pass, which HOT prunes and records the TIDs of
non-removable dead tuples, uses the streaming read API by implementing a
streaming read callback which invokes heap_vac_scan_get_next_block().
---
 src/backend/access/heap/vacuumlazy.c | 79 +++++++++++++++++++++-------
 1 file changed, 59 insertions(+), 20 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 65d257aab83..fbbc87938e4 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -54,6 +54,7 @@
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
+#include "storage/streaming_read.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_rusage.h"
@@ -168,7 +169,12 @@ typedef struct LVRelState
 	char	   *relnamespace;
 	char	   *relname;
 	char	   *indname;		/* Current index name */
-	BlockNumber blkno;			/* used only for heap operations */
+
+	/*
+	 * The current block being processed by vacuum.  Used only for heap
+	 * operations, primarily for error reporting and logging.
+	 */
+	BlockNumber blkno;
 	OffsetNumber offnum;		/* used only for heap operations */
 	VacErrPhase phase;
 	bool		verbose;		/* VACUUM VERBOSE? */
@@ -189,6 +195,12 @@ typedef struct LVRelState
 	BlockNumber missed_dead_pages;	/* # pages with missed dead tuples */
 	BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
 
+	/*
+	 * The most recent block returned by the streaming read callback during
+	 * vacuum's first pass.
+	 */
+	BlockNumber blkno_prefetch;
+
 	/* Statistics output by us, for table */
 	double		new_rel_tuples; /* new estimated total # of tuples */
 	double		new_live_tuples;	/* new estimated total # of live tuples */
@@ -232,7 +244,7 @@ typedef struct LVSavedErrInfo
 
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
-static bool heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
+static void heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
 										 BlockNumber *blkno,
 										 bool *all_visible_according_to_vm);
 static bool lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf,
@@ -416,6 +428,9 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	vacrel->nonempty_pages = 0;
 	/* dead_items_alloc allocates vacrel->dead_items later on */
 
+	/* relies on InvalidBlockNumber overflowing to 0 */
+	vacrel->blkno_prefetch = InvalidBlockNumber;
+
 	/* Allocate/initialize output statistics state */
 	vacrel->new_rel_tuples = 0;
 	vacrel->new_live_tuples = 0;
@@ -776,6 +791,27 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 	}
 }
 
+/*
+ * Streaming read callback for vacuum's first pass: return the next block for
+ * the stream to read, or InvalidBlockNumber at the end of the relation, and
+ * record the block's all-visible status in the per-buffer data.
+ */
+static BlockNumber
+vacuum_scan_pgsr_next(PgStreamingRead *pgsr,
+					  void *pgsr_private, void *per_buffer_data)
+{
+	LVRelState *vacrel = pgsr_private;
+	bool	   *all_visible_according_to_vm = per_buffer_data;
+
+	vacrel->blkno_prefetch++;
+
+	heap_vac_scan_get_next_block(vacrel,
+								 vacrel->blkno_prefetch, &vacrel->blkno_prefetch,
+								 all_visible_according_to_vm);
+
+	return vacrel->blkno_prefetch;
+}
+
 /*
  *	lazy_scan_heap() -- workhorse function for VACUUM
  *
@@ -815,12 +846,11 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 static void
 lazy_scan_heap(LVRelState *vacrel)
 {
+	Buffer		buf;
 	BlockNumber rel_pages = vacrel->rel_pages,
 				next_fsm_block_to_vacuum = 0;
-	bool		all_visible_according_to_vm;
+	bool	   *all_visible_according_to_vm;
 
-	/* relies on InvalidBlockNumber overflowing to 0 */
-	BlockNumber blkno = InvalidBlockNumber;
 	VacDeadItems *dead_items = vacrel->dead_items;
 	const int	initprog_index[] = {
 		PROGRESS_VACUUM_PHASE,
@@ -828,6 +858,11 @@ lazy_scan_heap(LVRelState *vacrel)
 		PROGRESS_VACUUM_MAX_DEAD_TUPLES
 	};
 	int64		initprog_val[3];
+	PgStreamingRead *pgsr;
+
+	pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_MAINTENANCE, vacrel,
+										  sizeof(bool), vacrel->bstrategy, BMR_REL(vacrel->rel),
+										  MAIN_FORKNUM, vacuum_scan_pgsr_next);
 
 	/* Report that we're scanning the heap, advertising total # of blocks */
 	initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
@@ -838,13 +873,19 @@ lazy_scan_heap(LVRelState *vacrel)
 	vacrel->skip.next_unskippable_block = InvalidBlockNumber;
 	vacrel->skip.vmbuffer = InvalidBuffer;
 
-	while (heap_vac_scan_get_next_block(vacrel, blkno + 1,
-										&blkno, &all_visible_according_to_vm))
+	while (BufferIsValid(buf =
+						 pg_streaming_read_buffer_get_next(pgsr, (void **) &all_visible_according_to_vm)))
 	{
-		Buffer		buf;
 		Page		page;
 		bool		has_lpdead_items;
 		bool		got_cleanup_lock = false;
+		BlockNumber blkno;
+
+		vacrel->blkno = blkno = BufferGetBlockNumber(buf);
+
+		CheckBufferIsPinnedOnce(buf);
+
+		page = BufferGetPage(buf);
 
 		vacrel->scanned_pages++;
 
@@ -912,9 +953,6 @@ lazy_scan_heap(LVRelState *vacrel)
 		 */
 		visibilitymap_pin(vacrel->rel, blkno, &vacrel->skip.vmbuffer);
 
-		buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-								 vacrel->bstrategy);
-		page = BufferGetPage(buf);
 
 		/*
 		 * We need a buffer cleanup lock to prune HOT chains and defragment
@@ -970,7 +1008,7 @@ lazy_scan_heap(LVRelState *vacrel)
 		 */
 		if (got_cleanup_lock)
 			lazy_scan_prune(vacrel, buf, blkno, page,
-							all_visible_according_to_vm,
+							*all_visible_according_to_vm,
 							&has_lpdead_items);
 
 		/*
@@ -1027,7 +1065,7 @@ lazy_scan_heap(LVRelState *vacrel)
 	}
 
 	/* report that everything is now scanned */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, vacrel->rel_pages);
 
 	/* now we can compute the new value for pg_class.reltuples */
 	vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
@@ -1042,6 +1080,8 @@ lazy_scan_heap(LVRelState *vacrel)
 		Max(vacrel->new_live_tuples, 0) + vacrel->recently_dead_tuples +
 		vacrel->missed_dead_tuples;
 
+	pg_streaming_read_free(pgsr);
+
 	/*
 	 * Do index vacuuming (call each index's ambulkdelete routine), then do
 	 * related heap vacuuming
@@ -1053,11 +1093,11 @@ lazy_scan_heap(LVRelState *vacrel)
 	 * Vacuum the remainder of the Free Space Map.  We must do this whether or
 	 * not there were indexes, and whether or not we bypassed index vacuuming.
 	 */
-	if (blkno > next_fsm_block_to_vacuum)
-		FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, blkno);
+	if (vacrel->rel_pages > next_fsm_block_to_vacuum)
+		FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, vacrel->rel_pages);
 
 	/* report all blocks vacuumed */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, vacrel->rel_pages);
 
 	/* Do final index cleanup (call each index's amvacuumcleanup routine) */
 	if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
@@ -1090,7 +1130,7 @@ lazy_scan_heap(LVRelState *vacrel)
  *
  * The block number and visibility status of the next block to process are set
  * in blkno and all_visible_according_to_vm. heap_vac_scan_get_next_block()
- * returns false if there are no further blocks to process.
+ * sets blkno to InvalidBlockNumber if there are no further blocks to process.
  *
  * vacrel is an in/out parameter here; vacuum options and information about the
  * relation are read and vacrel->skippedallvis is set to ensure we don't
@@ -1110,7 +1150,7 @@ lazy_scan_heap(LVRelState *vacrel)
  * older XIDs/MXIDs.  The vacrel->skippedallvis flag will be set here when the
  * choice to skip such a range is actually made, making everything safe.)
  */
-static bool
+static void
 heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
 							 BlockNumber *blkno, bool *all_visible_according_to_vm)
 {
@@ -1119,7 +1159,7 @@ heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
 	if (next_block >= vacrel->rel_pages)
 	{
 		*blkno = InvalidBlockNumber;
-		return false;
+		return;
 	}
 
 	if (vacrel->skip.next_unskippable_block == InvalidBlockNumber ||
@@ -1214,7 +1254,6 @@ heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
 		*all_visible_according_to_vm = true;
 
 	*blkno = next_block;
-	return true;
 }
 
 /*
-- 
2.40.1
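
One note on the "relies on InvalidBlockNumber overflowing to 0" comment in the
patch above: blkno_prefetch starts out as InvalidBlockNumber and the callback
increments it before use, so the very first block handed to
heap_vac_scan_get_next_block() is expected to be 0.  A tiny standalone
illustration of that assumption (the function name is made up; BlockNumber is
uint32, so the wrap is well-defined unsigned arithmetic):

#include "postgres.h"

#include "storage/block.h"

/* Not part of the patch: demonstrate the wraparound the comment relies on. */
static void
example_first_block_wraps_to_zero(void)
{
	BlockNumber blkno = InvalidBlockNumber; /* 0xFFFFFFFF */

	blkno++;					/* uint32 wraparound */
	Assert(blkno == 0);
}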

>From 84a5907b486d1f6c2ffe029a2e28fc557065739f Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Tue, 27 Feb 2024 14:35:36 -0500
Subject: [PATCH v5a 7/7] Vacuum second pass uses Streaming Read interface

Now vacuum's second pass, which removes dead items referring to dead
tuples catalogued in the first pass, uses the streaming read API by
implementing a streaming read callback which returns the next block
containing previously catalogued dead items. A new struct,
VacReapBlkState, is introduced to provide the caller with the starting
and ending indexes of dead items to vacuum.

ci-os-only:
---
 src/backend/access/heap/vacuumlazy.c | 110 ++++++++++++++++++++-------
 src/tools/pgindent/typedefs.list     |   1 +
 2 files changed, 85 insertions(+), 26 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index fbbc87938e4..68c146984b1 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -201,6 +201,12 @@ typedef struct LVRelState
 	 */
 	BlockNumber blkno_prefetch;
 
+	/*
+	 * The index of the next TID in dead_items to reap during the second
+	 * vacuum pass.
+	 */
+	int			idx_prefetch;
+
 	/* Statistics output by us, for table */
 	double		new_rel_tuples; /* new estimated total # of tuples */
 	double		new_live_tuples;	/* new estimated total # of live tuples */
@@ -242,6 +248,21 @@ typedef struct LVSavedErrInfo
 	VacErrPhase phase;
 } LVSavedErrInfo;
 
+/*
+ * State set up by the streaming read callback during vacuum's second pass
+ * (removal of dead items cataloged in the first pass).
+ */
+typedef struct VacReapBlkState
+{
+	/*
+	 * start_idx is the index in dead_items of the first dead item in this
+	 * block of the relation; end_idx is one past the last.  The callback sets
+	 * these up prior to adding this block to the stream.
+	 */
+	int			start_idx;
+	int			end_idx;
+} VacReapBlkState;
+
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
 static void heap_vac_scan_get_next_block(LVRelState *vacrel, BlockNumber next_block,
@@ -260,8 +281,9 @@ static bool lazy_scan_noprune(LVRelState *vacrel, Buffer buf,
 static void lazy_vacuum(LVRelState *vacrel);
 static bool lazy_vacuum_all_indexes(LVRelState *vacrel);
 static void lazy_vacuum_heap_rel(LVRelState *vacrel);
-static int	lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
-								  Buffer buffer, int index, Buffer vmbuffer);
+static void lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
+								  Buffer buffer, Buffer vmbuffer,
+								  VacReapBlkState *rbstate);
 static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
 static void lazy_cleanup_all_indexes(LVRelState *vacrel);
 static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
@@ -2401,6 +2423,42 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	return allindexes;
 }
 
+/*
+ * Streaming read callback for vacuum's second pass: return the next block
+ * containing dead items (or InvalidBlockNumber when none remain), recording
+ * the range of dead_items indexes for that block in the per-buffer data.
+ */
+static BlockNumber
+vacuum_reap_lp_pgsr_next(PgStreamingRead *pgsr,
+						 void *pgsr_private,
+						 void *per_buffer_data)
+{
+	BlockNumber blkno;
+	LVRelState *vacrel = pgsr_private;
+	VacReapBlkState *rbstate = per_buffer_data;
+
+	VacDeadItems *dead_items = vacrel->dead_items;
+
+	if (vacrel->idx_prefetch == dead_items->num_items)
+		return InvalidBlockNumber;
+
+	blkno = ItemPointerGetBlockNumber(&dead_items->items[vacrel->idx_prefetch]);
+	rbstate->start_idx = vacrel->idx_prefetch;
+
+	for (; vacrel->idx_prefetch < dead_items->num_items; vacrel->idx_prefetch++)
+	{
+		BlockNumber curblkno =
+			ItemPointerGetBlockNumber(&dead_items->items[vacrel->idx_prefetch]);
+
+		if (blkno != curblkno)
+			break;				/* past end of tuples for this block */
+	}
+
+	rbstate->end_idx = vacrel->idx_prefetch;
+
+	return blkno;
+}
+
 /*
  *	lazy_vacuum_heap_rel() -- second pass over the heap for two pass strategy
  *
@@ -2422,7 +2475,9 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 static void
 lazy_vacuum_heap_rel(LVRelState *vacrel)
 {
-	int			index = 0;
+	Buffer		buf;
+	PgStreamingRead *pgsr;
+	VacReapBlkState *rbstate;
 	BlockNumber vacuumed_pages = 0;
 	Buffer		vmbuffer = InvalidBuffer;
 	LVSavedErrInfo saved_err_info;
@@ -2440,17 +2495,21 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 							 VACUUM_ERRCB_PHASE_VACUUM_HEAP,
 							 InvalidBlockNumber, InvalidOffsetNumber);
 
-	while (index < vacrel->dead_items->num_items)
+	pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_MAINTENANCE, vacrel,
+										  sizeof(VacReapBlkState), vacrel->bstrategy, BMR_REL(vacrel->rel),
+										  MAIN_FORKNUM, vacuum_reap_lp_pgsr_next);
+
+	while (BufferIsValid(buf =
+						 pg_streaming_read_buffer_get_next(pgsr,
+														   (void **) &rbstate)))
 	{
 		BlockNumber blkno;
-		Buffer		buf;
 		Page		page;
 		Size		freespace;
 
 		vacuum_delay_point();
 
-		blkno = ItemPointerGetBlockNumber(&vacrel->dead_items->items[index]);
-		vacrel->blkno = blkno;
+		vacrel->blkno = blkno = BufferGetBlockNumber(buf);
 
 		/*
 		 * Pin the visibility map page in case we need to mark the page
@@ -2460,10 +2519,8 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 		visibilitymap_pin(vacrel->rel, blkno, &vmbuffer);
 
 		/* We need a non-cleanup exclusive lock to mark dead_items unused */
-		buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-								 vacrel->bstrategy);
 		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-		index = lazy_vacuum_heap_page(vacrel, blkno, buf, index, vmbuffer);
+		lazy_vacuum_heap_page(vacrel, blkno, buf, vmbuffer, rbstate);
 
 		/* Now that we've vacuumed the page, record its available space */
 		page = BufferGetPage(buf);
@@ -2482,14 +2539,16 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 	 * We set all LP_DEAD items from the first heap pass to LP_UNUSED during
 	 * the second heap pass.  No more, no less.
 	 */
-	Assert(index > 0);
+	Assert(rbstate->end_idx > 0);
 	Assert(vacrel->num_index_scans > 1 ||
-		   (index == vacrel->lpdead_items &&
+		   (rbstate->end_idx == vacrel->lpdead_items &&
 			vacuumed_pages == vacrel->lpdead_item_pages));
 
+	pg_streaming_read_free(pgsr);
+
 	ereport(DEBUG2,
 			(errmsg("table \"%s\": removed %lld dead item identifiers in %u pages",
-					vacrel->relname, (long long) index, vacuumed_pages)));
+					vacrel->relname, (long long) rbstate->end_idx, vacuumed_pages)));
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
@@ -2503,13 +2562,12 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
  * cleanup lock is also acceptable).  vmbuffer must be valid and already have
  * a pin on blkno's visibility map page.
  *
- * index is an offset into the vacrel->dead_items array for the first listed
- * LP_DEAD item on the page.  The return value is the first index immediately
- * after all LP_DEAD items for the same page in the array.
+ * Given a block and the dead items recorded for it during the first pass,
+ * set those items unused and truncate the line pointer array.  Update the
+ * VM as appropriate.
  */
-static int
-lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
-					  int index, Buffer vmbuffer)
+static void
+lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
+					  Buffer buffer, Buffer vmbuffer, VacReapBlkState *rbstate)
 {
 	VacDeadItems *dead_items = vacrel->dead_items;
 	Page		page = BufferGetPage(buffer);
@@ -2530,16 +2588,17 @@ lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
 
 	START_CRIT_SECTION();
 
-	for (; index < dead_items->num_items; index++)
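+	/* Process only the dead items that the callback recorded for this block */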
+	for (int i = rbstate->start_idx; i < rbstate->end_idx; i++)
 	{
-		BlockNumber tblk;
 		OffsetNumber toff;
+		ItemPointer dead_item;
 		ItemId		itemid;
 
-		tblk = ItemPointerGetBlockNumber(&dead_items->items[index]);
-		if (tblk != blkno)
-			break;				/* past end of tuples for this block */
-		toff = ItemPointerGetOffsetNumber(&dead_items->items[index]);
+		dead_item = &dead_items->items[i];
+
+		Assert(ItemPointerGetBlockNumber(dead_item) == blkno);
+
+		toff = ItemPointerGetOffsetNumber(dead_item);
 		itemid = PageGetItemId(page, toff);
 
 		Assert(ItemIdIsDead(itemid) && !ItemIdHasStorage(itemid));
@@ -2609,7 +2668,6 @@ lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
 
 	/* Revert to the previous phase information for error traceback */
 	restore_vacuum_error_info(vacrel, &saved_err_info);
-	return index;
 }
 
 /*
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index aea8babd71a..a8f0b5f091d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2972,6 +2972,7 @@ VacOptValue
 VacuumParams
 VacuumRelation
 VacuumStmt
+VacReapBlkState
 ValidIOData
 ValidateIndexState
 ValuesScan
-- 
2.40.1
