Here is a new straw-man patch set.  I'd already shown the basic
techniques for vectored writes from the buffer pool (FlushBuffers(),
note the "s"), but that was sort of kludged into place while I was
hacking on the lower level bits and pieces, and now I'm building
layers further up.  The main idea is: you can clean buffers with a
"WriteStream", and here are a bunch of example users to show that
working.

A WriteStream is approximately the opposite of a ReadStream (committed
in v17).  You push pinned dirty buffers into it (well they don't have
to be dirty, and it's OK if someone else cleans the buffer
concurrently, the point is that you recently dirtied them).  It
combines buffers up to io_combine_limit, but defers writing as long as
possible within some limits to avoid flushing the WAL, and tries to
coordinate with the WAL writer.  The WAL writer interaction is a very
tricky problem, and that aspect is only a toy for now, but it's at
least partially successful (see problems at end).
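
To make the combining part concrete, here is a toy, self-contained
model.  It is not code from the patch set: ToyStream, toy_push() and
toy_flush() are made-up names, COMBINE_LIMIT merely stands in for
io_combine_limit, and the WAL/LSN side is ignored completely.  It only
shows how pushing consecutive block numbers one at a time can turn into
a few large writes.

```c
#include <assert.h>
#include <stdio.h>

#define COMBINE_LIMIT 16		/* stand-in for io_combine_limit, in blocks */

/* Hypothetical toy stream: a single pending run of consecutive blocks. */
typedef struct ToyStream
{
	unsigned	first_block;	/* first block of the pending run */
	int			nblocks;		/* length of the pending run, 0 if none */
	int			nwrites;		/* simulated write calls issued so far */
} ToyStream;

/* Simulate issuing one combined write for the pending run, if any. */
static void
toy_flush(ToyStream *s)
{
	if (s->nblocks == 0)
		return;
	printf("write blocks %u..%u (%d blocks)\n",
		   s->first_block, s->first_block + s->nblocks - 1, s->nblocks);
	s->nwrites++;
	s->nblocks = 0;
}

/* Push one block: extend the run if it is consecutive and under the limit. */
static void
toy_push(ToyStream *s, unsigned block)
{
	assert(s->nblocks <= COMBINE_LIMIT);

	/* A gap or a full run forces the pending run out first. */
	if (s->nblocks > 0 &&
		(block != s->first_block + s->nblocks ||
		 s->nblocks == COMBINE_LIMIT))
		toy_flush(s);

	if (s->nblocks == 0)
		s->first_block = block;
	s->nblocks++;
}
```

Pushing blocks 0..19 into a zero-initialised ToyStream and then calling
toy_flush() issues two simulated writes (16 blocks, then 4) rather than
20 single-block ones.  The real stream additionally has to decide when
it is cheap to write with respect to WAL, which this toy leaves out.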

The CHECKPOINT code uses the WriteStream API directly.  It creates one
stream per tablespace, so that the existing load balancing algorithm
doesn't defeat the I/O combining algorithm.  Unsurprisingly, it looks
like this:

postgres=# checkpoint;

...
pwritev(18,...,2,0x1499e000) = 131072 (0x20000)
pwrite(18,...,131072,0x149be000) = 131072 (0x20000)
pwrite(18,...,131072,0x149de000) = 131072 (0x20000)
...

Sometimes you'll see it signalling the WAL writer.  It builds up a
queue of writes that it doesn't want to perform yet, in the hope of
getting a free ride WRT WAL.

Other places can benefit from a more centrally placed write stream,
indirectly.  Our BAS_BULKWRITE and BAS_VACUUM buffer access strategies
already perform "write-behind".  That's a name I borrowed from some OS
stuff, where the kernel has clues that bulk data (for example a big
file copy) will not likely be needed again soon so you want to get it
out of the way soon before it trashes your whole buffer pool (AKA
"scan resistance"), but you want to defer just a little bit to perform
I/O combining.  That applies directly here, but we have the additional
concern of delaying the referenced WAL write in the hope that someone
else will do it for us.

In this experiment, I am trying to give that pre-existing behaviour an
explicit name (better names welcome!), and optimise it.  If you're
dirtying buffers in a ring, you'll soon crash into your own tail and
have to write it out, and it is very often sequential blocks due to
the scan-like nature of many bulk I/O jobs, so I/O combining is very
effective.  The main problem is that you'll often have to flush WAL
first, which this patch set tries to address to some extent.  In the
strategy write-behind case you don't really need an LSN reordering
queue, just a plain FIFO queue would do, but hopefully that doesn't
cost much.  (Cf CHECKPOINT, which sorts blocks by buffer tag, but
expects LSNs in random order, so it does seem to need reordering.)
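
As an illustration of what "LSN reordering" means here, the following
toy sketch keeps queued writes in a min-heap ordered by LSN and only
hands one back when the WAL is already known to be flushed that far.
Everything in it is hypothetical (ToyWrite, ToyLsnQueue, the toy_*
functions, the fixed-size array); the patch itself uses a pairing heap
and real XLogRecPtr values, and its policy is more involved than this.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define TOY_QUEUE_MAX 64

/* Hypothetical queued write: a block range plus its highest page LSN. */
typedef struct ToyWrite
{
	uint64_t	lsn;
	unsigned	first_block;
	int			nblocks;
} ToyWrite;

/* Binary min-heap of writes, ordered by lsn. */
typedef struct ToyLsnQueue
{
	ToyWrite	heap[TOY_QUEUE_MAX];
	int			n;
} ToyLsnQueue;

static void
toy_swap(ToyWrite *a, ToyWrite *b)
{
	ToyWrite	tmp = *a;

	*a = *b;
	*b = tmp;
}

/* Queue a write; returns false if the queue is full. */
static bool
toy_queue_push(ToyLsnQueue *q, ToyWrite w)
{
	int			i;

	assert(w.nblocks > 0);
	if (q->n == TOY_QUEUE_MAX)
		return false;
	i = q->n++;
	q->heap[i] = w;
	/* Sift up until the parent's LSN is not greater than ours. */
	while (i > 0 && q->heap[(i - 1) / 2].lsn > q->heap[i].lsn)
	{
		toy_swap(&q->heap[(i - 1) / 2], &q->heap[i]);
		i = (i - 1) / 2;
	}
	return true;
}

/*
 * Pop the lowest-LSN write, but only if the WAL is already flushed up to
 * its LSN, so that performing the write needs no WAL flush of its own.
 */
static bool
toy_queue_pop_ready(ToyLsnQueue *q, uint64_t flushed_lsn, ToyWrite *out)
{
	int			i = 0;

	if (q->n == 0 || q->heap[0].lsn > flushed_lsn)
		return false;
	*out = q->heap[0];
	q->heap[0] = q->heap[--q->n];
	/* Sift down to restore heap order. */
	for (;;)
	{
		int			l = 2 * i + 1;
		int			r = 2 * i + 2;
		int			m = i;

		if (l < q->n && q->heap[l].lsn < q->heap[m].lsn)
			m = l;
		if (r < q->n && q->heap[r].lsn < q->heap[m].lsn)
			m = r;
		if (m == i)
			break;
		toy_swap(&q->heap[i], &q->heap[m]);
		i = m;
	}
	return true;
}
```

With writes queued at LSNs 300, 100 and 200 and a flushed LSN of 150,
only the LSN 100 write comes back; the other two wait until the flushed
LSN has moved past them.  A plain FIFO would behave the same way when
LSNs arrive in increasing order, which is the strategy write-behind
case described above.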

With this patch set, instead of calling ReleaseBuffer() after you've
dirtied a buffer in one of those bulk writing code paths, you can use
StrategyReleaseBuffer(), and the strategy will fire it into the stream
to get I/O combining and LSN reordering; it'll be unpinned later, and
certainly before you get the same buffer back for a new block.  So
those write-behind user patches are very short, they just do
s/ReleaseBuffer/StrategyReleaseBuffer/ plus minor details.
Unsurprisingly, it looks like this:

postgres=# copy t from program 'seq -f %1.0f 1 10000000';

...
pwrite(44,...,131072,0x2f986000) = 131072 (0x20000) <-- streaming write-behind!
pwrite(44,...,131072,0x2f966000) = 131072 (0x20000)
pwrite(44,...,131072,0x2f946000) = 131072 (0x20000)
...

postgres=# vacuum t;

...
pwrite(35,...,131072,0x3fb3e000) = 131072 (0x20000) <-- streaming write-behind!
preadv(35,...,122880}],2,0x3fb7a000) = 131072 (0x20000) <-- from Melanie's patch
pwritev(35,...,2,0x3fb5e000) = 131072 (0x20000)
pread(35,...,131072,0x3fb9a000) = 131072 (0x20000)
...

Next I considered how to get INSERT, UPDATE, DELETE to participate.
The problem is that they use BAS_BULKREAD, even though they might
dirty buffers.  In master, BAS_BULKREAD doesn't do write-behind,
instead it uses the "reject" mechanism: as soon as it smells a dirty
buffer, it escapes the ring and abandons all hope of scan resistance.
As buffer/README says in parentheses:

  Bulk writes work similarly to VACUUM.  Currently this applies only to
  COPY IN and CREATE TABLE AS SELECT.  (Might it be interesting to make
  seqscan UPDATE and DELETE use the bulkwrite strategy?)  For bulk writes
  we use a ring size of 16MB (but not more than 1/8th of shared_buffers).

Hmm... what I'm now thinking is that the distinction might be a little
bogus.  Who knows how much scanned data will finish up being dirtied?
I wonder if it would make more sense to abandon
BAS_BULKREAD/BAS_BULKWRITE, and instead make an adaptive strategy.  A
ring that starts small, and grows/shrinks in response to dirty data
(instead of "rejecting").  That would have at least superficial
similarities to the ARC algorithm, the "adaptive" bit that controls
ring size (it's interested in recency vs frequency, but here it's more
like "we're willing to waste more memory on dirty data, because we
need to keep it around longer, to avoid flushing the WAL, but not
longer than that" which may be a different dimension to value cached
data on, I'm not sure).

Of course there must be some workloads/machines where using a strategy
(instead of BAS_BULKREAD when it degrades to BAS_NORMAL behaviour)
will be slower because of WAL flushes, but that's not a fair fight:
the flip side of that coin is that you've trashed the buffer pool,
which is an external cost paid by someone else, ie it's anti-social,
BufferAccessStrategy's very raison d'être.

Anyway, in the meantime, I hacked heapam.c to use BAS_BULKWRITE just
to see how it would work with this patch set.  (This causes an
assertion to fail in some test, something about the stats for
different IO contexts that was upset by IOCONTEXT_BULKWRITE, which I
didn't bother to debug, it's only a demo hack.)  Unsurprisingly, it
looks like this:

postgres=# delete from t;

...
pread(25,...,131072,0xc89e000) = 131072 (0x20000)   <-- already committed
pread(25,...,131072,0xc8be000) = 131072 (0x20000)       read-stream behaviour
kill(75954,SIGURG)             = 0 (0x0)            <-- hey WAL writer!
pread(25,...,131072,0xc8de000) = 131072 (0x20000)
pread(25,...,131072,0xc8fe000) = 131072 (0x20000)
...
pwrite(25,...,131072,0x15200000) = 131072 (0x20000) <-- write-behind!
pwrite(25,...,131072,0x151e0000) = 131072 (0x20000)
pwrite(25,...,131072,0x151c0000) = 131072 (0x20000)
...

UPDATE and INSERT conceptually work too, but they suffer from other
stupid page-at-a-time problems around extension so it's more fun to
look at DELETE first.

The whole write-behind notion, and the realisation that we already
have it and should just make it into a "real thing", jumped out at me
while studying Melanie's VACUUM pass 1 and VACUUM pass 2 patches for
adding read streams.  Rebased and attached here.  That required
hacking on the new tidstore.c stuff a bit.  (We failed to get the
VACUUM read stream bits into v17, but the study of that led to the
default BAS_VACUUM size being cranked up to reflect modern realities,
and generally sent me down this rabbit hole for a while.)

Some problems:
* If you wake the WAL writer more often, throughput might actually go
down on high latency storage due to serialisation of WAL flushes.  So
far I have declined to try to write an adaptive algorithm to figure
out whether to do it, and where the threshold should be.  I suspect it
might involve measuring time and hill-climbing...  One option is to
abandon this part (ie just do no worse than master at WAL flushing),
or at least consider that a separate project.
* This might hold too many pins!  It does respect the limit mechanism,
but that can let you have a lot of pins (it's a bit TOCTOU-racy too,
we might need something smarter).  One idea would be to release pins
while writes are in the LSN queue, and reacquire them with
ReadRecentBuffer() as required, since we don't really care if someone
else evicts them in the meantime.
* It seems a bit weird that we *also* have the WritebackContext
machinery.  I could probably subsume that whole mechanism into
write_stream.c.  If you squint, sync_file_range() is a sort of dual of
POSIX_FADV_WILLNEED, which the read counterpart looks after.
* I would like to merge Heikki's bulk write stuff into this somehow,
not yet thought about it much.

The patches are POC-quality only and certainly have bugs/missed edge
cases/etc.  Thoughts, better ideas, references to writing about this
problem space, etc, welcome.
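
For readers skimming the first patch below, the range merging it adds
to IssuePendingWritebacks() can be modelled in isolation.  This is a
standalone toy, not the patch code: ToyRange and toy_merge_ranges() are
invented names, and it assumes the input is already sorted by start
block within a single relation fork.

```c
#include <assert.h>

/* Hypothetical block range: nblocks blocks starting at start. */
typedef struct ToyRange
{
	unsigned	start;
	unsigned	nblocks;
} ToyRange;

/*
 * Merge sorted ranges in place and return the new count.  Ranges are
 * merged when they overlap or touch, i.e. unless the next one starts
 * beyond the end of the current one.
 */
static int
toy_merge_ranges(ToyRange *r, int n)
{
	int			out = 0;

	assert(n >= 0);
	for (int i = 0; i < n; i++)
	{
		if (out > 0 && r[i].start <= r[out - 1].start + r[out - 1].nblocks)
		{
			/* Find the length that covers the end of both ranges. */
			unsigned	this_end = r[out - 1].start + r[out - 1].nblocks;
			unsigned	next_end = r[i].start + r[i].nblocks;
			unsigned	end = this_end > next_end ? this_end : next_end;

			r[out - 1].nblocks = end - r[out - 1].start;
		}
		else
			r[out++] = r[i];
	}
	return out;
}
```

Given the ranges (10,4), (12,4), (16,2), (20,1) this leaves (10,8) and
(20,1): the first three overlap or touch and become one writeback
request, while the last one is separated by a gap.
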
From 63cb8f88fd65ef34536c7d4360b964ca5e6cf62d Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Thu, 25 Apr 2024 23:45:48 +1200
Subject: [PATCH v2 01/11] Teach WritebackContext to work with block ranges.

Instead of having to feed it one block at a time, allow writeback of
ranges to be scheduled, in preparation for I/O combining.
---
 src/backend/storage/buffer/bufmgr.c | 28 ++++++++++++++++------------
 src/include/storage/buf_internals.h |  4 +++-
 2 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 49637284f91..d7e434daaf1 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1992,7 +1992,7 @@ again:
 		LWLockRelease(content_lock);
 
 		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-									  &buf_hdr->tag);
+									  &buf_hdr->tag, 1);
 	}
 
 
@@ -3486,7 +3486,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	 * SyncOneBuffer() is only called by checkpointer and bgwriter, so
 	 * IOContext will always be IOCONTEXT_NORMAL.
 	 */
-	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
+	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag, 1);
 
 	return result | BUF_WRITTEN;
 }
@@ -5836,11 +5836,11 @@ WritebackContextInit(WritebackContext *context, int *max_pending)
 }
 
 /*
- * Add buffer to list of pending writeback requests.
+ * Add buffer tag range to list of pending writeback requests.
  */
 void
 ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context,
-							  BufferTag *tag)
+							  BufferTag *tag, int nblocks)
 {
 	PendingWriteback *pending;
 
@@ -5858,6 +5858,7 @@ ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context
 		pending = &wb_context->pending_writebacks[wb_context->nr_pending++];
 
 		pending->tag = *tag;
+		pending->nblocks = nblocks;
 	}
 
 	/*
@@ -5914,10 +5915,11 @@ IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context)
 		int			ahead;
 		BufferTag	tag;
 		RelFileLocator currlocator;
-		Size		nblocks = 1;
+		Size		nblocks;
 
 		cur = &wb_context->pending_writebacks[i];
 		tag = cur->tag;
+		nblocks = cur->nblocks;
 		currlocator = BufTagGetRelFileLocator(&tag);
 
 		/*
@@ -5926,6 +5928,8 @@ IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context)
 		 */
 		for (ahead = 0; i + ahead + 1 < wb_context->nr_pending; ahead++)
 		{
+			BlockNumber this_end;
+			BlockNumber next_end;
 
 			next = &wb_context->pending_writebacks[i + ahead + 1];
 
@@ -5935,15 +5939,15 @@ IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context)
 				BufTagGetForkNum(&cur->tag) != BufTagGetForkNum(&next->tag))
 				break;
 
-			/* ok, block queued twice, skip */
-			if (cur->tag.blockNum == next->tag.blockNum)
-				continue;
-
-			/* only merge consecutive writes */
-			if (cur->tag.blockNum + 1 != next->tag.blockNum)
+			/* only merge consecutive or overlapping writes */
+			if (next->tag.blockNum > tag.blockNum + nblocks)
 				break;
 
-			nblocks++;
+			/* find the nblocks value that covers the end of both */
+			this_end = tag.blockNum + nblocks;
+			next_end = next->tag.blockNum + next->nblocks;
+			nblocks = Max(this_end, next_end) - tag.blockNum;
+
 			cur = next;
 		}
 
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index f190e6e5e46..9cc22893499 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -291,6 +291,7 @@ typedef struct PendingWriteback
 {
 	/* could store different types of pending flushes here */
 	BufferTag	tag;
+	int			nblocks;
 } PendingWriteback;
 
 /* struct forward declared in bufmgr.h */
@@ -417,7 +418,8 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
 extern void WritebackContextInit(WritebackContext *context, int *max_pending);
 extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context);
 extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
-										  IOContext io_context, BufferTag *tag);
+										  IOContext io_context, BufferTag *tag,
+										  int nblocks);
 
 /* freelist.c */
 extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
-- 
2.44.0

From 514f17a2713f135dd513d8ff16cc2604c5005799 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Wed, 13 Mar 2024 17:02:42 +1300
Subject: [PATCH v2 02/11] Provide vectored variant of FlushOneBuffer().

FlushPinnedBuffers() is just like FlushOneBuffer() except that it tries
to write out multiple consecutive disk blocks at a time with
smgrwritev().
---
 src/backend/storage/buffer/bufmgr.c | 339 ++++++++++++++++++++++------
 src/include/storage/buf_internals.h |  10 +
 src/include/storage/bufmgr.h        |   3 +
 3 files changed, 287 insertions(+), 65 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index d7e434daaf1..f7ada9dcc7e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -57,6 +57,7 @@
 #include "storage/smgr.h"
 #include "storage/standby.h"
 #include "utils/memdebug.h"
+#include "utils/memutils.h"
 #include "utils/ps_status.h"
 #include "utils/rel.h"
 #include "utils/resowner.h"
@@ -526,6 +527,9 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
+static int	FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
+						 IOObject io_object, IOContext io_context,
+						 int *first_written_index, int *written);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
 									   ForkNumber forkNum,
 									   BlockNumber nForkBlock,
@@ -2841,6 +2845,72 @@ UnpinBufferNoOwner(BufferDesc *buf)
 #define ST_DEFINE
 #include <lib/sort_template.h>
 
+/*
+ * Flush a range of already pinned buffers that hold consecutive blocks of a
+ * relation fork.  They are not pinned on return.  Returns the number that
+ * were written out (if this is less than nbuffers, it is because another
+ * backend already wrote some out).
+ */
+static int
+SyncBuffers(BufferDesc **bufs, int nbuffers,
+			WritebackContext *wb_context)
+{
+	int			total_written = 0;
+
+	while (nbuffers > 0)
+	{
+		int			nlocked;
+
+		/* Lock first buffer. */
+		LWLockAcquire(BufferDescriptorGetContentLock(bufs[0]), LW_SHARED);
+		nlocked = 1;
+
+		/* Lock as many more as we can without waiting, to avoid deadlocks. */
+		while (nlocked < nbuffers &&
+			   LWLockConditionalAcquire(BufferDescriptorGetContentLock(bufs[nlocked]),
+										LW_SHARED))
+			nlocked++;
+
+		while (nlocked > 0)
+		{
+			int			flushed;
+			int			written;
+			int			first_written_index;
+
+			/*
+			 * Flush as many as we can with a single write, which may be fewer
+			 * than requested if buffers in this range turn out to have been
+			 * flushed already, creating gaps between flushable block ranges.
+			 */
+			flushed = FlushBuffers(bufs, nlocked, NULL, IOOBJECT_RELATION,
+								   IOCONTEXT_NORMAL,
+								   &first_written_index, &written);
+			total_written += written;
+
+			/* Unlock in reverse order (currently more efficient). */
+			for (int i = flushed - 1; i >= 0; --i)
+				LWLockRelease(BufferDescriptorGetContentLock(bufs[i]));
+
+			/* Queue writeback control. */
+			if (written > 0 && wb_context)
+				ScheduleBufferTagForWriteback(wb_context,
+											  IOCONTEXT_NORMAL,
+											  &bufs[first_written_index]->tag,
+											  written);
+
+			/* Unpin. */
+			for (int i = 0; i < flushed; ++i)
+				UnpinBuffer(bufs[i]);
+
+			bufs += flushed;
+			nlocked -= flushed;
+			nbuffers -= flushed;
+		}
+	}
+
+	return total_written;
+}
+
 /*
  * BufferSync -- Write out all dirty buffers in the pool.
  *
@@ -3704,9 +3774,19 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
 	*blknum = bufHdr->tag.blockNum;
 }
 
+struct shared_buffer_write_error_info
+{
+	BufferDesc *buf;
+	int			nblocks;
+};
+
 /*
- * FlushBuffer
- *		Physically write out a shared buffer.
+ * FlushBuffers
+ *		Physically write out shared buffers.
+ *
+ * The buffers do not have to be consecutive in memory but must refer to
+ * consecutive blocks of the same relation fork in increasing order of block
+ * number.
  *
  * NOTE: this actually just passes the buffer contents to the kernel; the
  * real write to disk won't happen until the kernel feels like it.  This
@@ -3714,66 +3794,156 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
  * However, we will need to force the changes to disk via fsync before
  * we can checkpoint WAL.
  *
- * The caller must hold a pin on the buffer and have share-locked the
+ * The caller must hold a pin on the buffers and have share-locked the
  * buffer contents.  (Note: a share-lock does not prevent updates of
  * hint bits in the buffer, so the page could change while the write
  * is in progress, but we assume that that will not invalidate the data
  * written.)
  *
  * If the caller has an smgr reference for the buffer's relation, pass it
- * as the second parameter.  If not, pass NULL.
+ * as the third parameter.  If not, pass NULL.
  */
-static void
-FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
-			IOContext io_context)
+static int
+FlushBuffers(BufferDesc **bufs, int nbuffers, SMgrRelation reln,
+			 IOObject io_object, IOContext io_context,
+			 int *first_written_index, int *written)
 {
-	XLogRecPtr	recptr;
+	XLogRecPtr	max_recptr;
+	struct shared_buffer_write_error_info errinfo;
 	ErrorContextCallback errcallback;
 	instr_time	io_start;
-	Block		bufBlock;
-	char	   *bufToWrite;
-	uint32		buf_state;
+	int			first_start_io_index;
+	void	   *bufBlocks[MAX_IO_COMBINE_LIMIT];
+	bool		need_checksums = DataChecksumsEnabled();
+	static PGIOAlignedBlock *copies;
+
+	Assert(nbuffers > 0);
+	nbuffers = Min(nbuffers, io_combine_limit);
 
 	/*
-	 * Try to start an I/O operation.  If StartBufferIO returns false, then
-	 * someone else flushed the buffer before we could, so we need not do
-	 * anything.
+	 * Update page checksums if desired.  Since we have only shared lock on
+	 * the buffer, other processes might be updating hint bits in it, so we
+	 * must copy the page to private storage if we do checksumming.
+	 *
+	 * XXX:TODO kill static local memory, or better yet, kill unlocked hint
+	 * bit modifications so we don't need this
 	 */
-	if (!StartBufferIO(buf, false, false))
-		return;
+	if (need_checksums)
+	{
+		if (!copies)
+			copies = MemoryContextAllocAligned(TopMemoryContext,
+											   BLCKSZ * io_combine_limit,
+											   PG_IO_ALIGN_SIZE,
+											   0);
+		for (int i = 0; i < nbuffers; ++i)
+		{
+			memcpy(&copies[i], BufHdrGetBlock(bufs[i]), BLCKSZ);
+			PageSetChecksumInplace((Page) &copies[i],
+								   bufs[0]->tag.blockNum + i);
+		}
+	}
+
+	/*
+	 * Try to start an I/O operation on as many buffers as we can.  If
+	 * StartBufferIO returns false, then someone else flushed the buffer
+	 * before we could, so we need not do anything and we give up on any
+	 * remaining buffers, as they are not consecutive with the blocks we've
+	 * collected.
+	 */
+	first_start_io_index = -1;
+	for (int i = 0; i < nbuffers; ++i)
+	{
+		/* Must be consecutive blocks. */
+		if (i > 0)
+			Assert(BufferTagsConsecutive(&bufs[i - 1]->tag, &bufs[i]->tag));
+
+		/* Only wait for the first one, so we can guarantee progress. */
+		if (!StartBufferIO(bufs[i], false, i > 0))
+		{
+			if (first_start_io_index >= 0)
+			{
+				/*
+				 * We can't go any further because later blocks after this gap
+				 * would not be consecutive.  Cap nbuffers here.
+				 */
+				nbuffers = i;
+				break;
+			}
+			else
+			{
+				/*
+				 * Still searching for the first block we can win the right to
+				 * start I/O on, so it doesn't matter that we couldn't get
+				 * this one.
+				 */
+			}
+		}
+		else
+		{
+			/* Keep track of the first block we started I/O on. */
+			if (first_start_io_index < 0)
+				first_start_io_index = i;
+		}
+		/* Collect the source buffers (copies or shared buffers). */
+		bufBlocks[i] = need_checksums ? &copies[i] : BufHdrGetBlock(bufs[i]);
+	}
+
+	/* If we can't write even one buffer, then we're done. */
+	if (first_start_io_index < 0)
+	{
+		if (first_written_index)
+			*first_written_index = 0;
+		if (written)
+			*written = 0;
+		return nbuffers;
+	}
 
 	/* Setup error traceback support for ereport() */
+	errinfo.buf = bufs[first_start_io_index];
+	errinfo.nblocks = nbuffers - first_start_io_index;
 	errcallback.callback = shared_buffer_write_error_callback;
-	errcallback.arg = (void *) buf;
+	errcallback.arg = &errinfo;
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
 	/* Find smgr relation for buffer */
 	if (reln == NULL)
-		reln = smgropen(BufTagGetRelFileLocator(&buf->tag), INVALID_PROC_NUMBER);
+		reln = smgropen(BufTagGetRelFileLocator(&bufs[first_start_io_index]->tag),
+						INVALID_PROC_NUMBER);
 
-	TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&buf->tag),
-										buf->tag.blockNum,
+	TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&bufs[first_start_io_index]->tag),
+										bufs[first_start_io_index]->tag.blockNum,
 										reln->smgr_rlocator.locator.spcOid,
 										reln->smgr_rlocator.locator.dbOid,
 										reln->smgr_rlocator.locator.relNumber);
 
-	buf_state = LockBufHdr(buf);
+	/* Find the highest LSN across all the I/O-started buffers. */
+	max_recptr = 0;
+	for (int i = first_start_io_index; i < nbuffers; ++i)
+	{
+		XLogRecPtr	page_recptr;
+		uint32		buf_state;
 
-	/*
-	 * Run PageGetLSN while holding header lock, since we don't have the
-	 * buffer locked exclusively in all cases.
-	 */
-	recptr = BufferGetLSN(buf);
+		buf_state = LockBufHdr(bufs[i]);
 
-	/* To check if block content changes while flushing. - vadim 01/17/97 */
-	buf_state &= ~BM_JUST_DIRTIED;
-	UnlockBufHdr(buf, buf_state);
+		/*
+		 * Run PageGetLSN while holding header lock, since we don't have the
+		 * buffer locked exclusively in all cases.
+		 */
+		page_recptr = BufferGetLSN(bufs[i]);
+
+		/* To check if block content changes while flushing. - vadim 01/17/97 */
+		buf_state &= ~BM_JUST_DIRTIED;
+		UnlockBufHdr(bufs[i], buf_state);
+
+		if (buf_state & BM_PERMANENT)
+			max_recptr = Max(page_recptr, max_recptr);
+	}
 
 	/*
-	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
-	 * rule that log updates must hit disk before any of the data-file changes
-	 * they describe do.
+	 * Force XLOG flush up to buffers' greatest LSN.  This implements the
+	 * basic WAL rule that log updates must hit disk before any of the
+	 * data-file changes they describe do.
 	 *
 	 * However, this rule does not apply to unlogged relations, which will be
 	 * lost after a crash anyway.  Most unlogged relation pages do not bear
@@ -3787,33 +3957,23 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * disastrous system-wide consequences.  To make sure that can't happen,
 	 * skip the flush if the buffer isn't permanent.
 	 */
-	if (buf_state & BM_PERMANENT)
-		XLogFlush(recptr);
+	if (max_recptr > 0)
+		XLogFlush(max_recptr);
 
 	/*
-	 * Now it's safe to write buffer to disk. Note that no one else should
+	 * Now it's safe to write buffers to disk. Note that no one else should
 	 * have been able to write it while we were busy with log flushing because
-	 * only one process at a time can set the BM_IO_IN_PROGRESS bit.
+	 * only one process at a time can set the BM_IO_IN_PROGRESS bits.
 	 */
-	bufBlock = BufHdrGetBlock(buf);
-
-	/*
-	 * Update page checksum if desired.  Since we have only shared lock on the
-	 * buffer, other processes might be updating hint bits in it, so we must
-	 * copy the page to private storage if we do checksumming.
-	 */
-	bufToWrite = PageSetChecksumCopy((Page) bufBlock, buf->tag.blockNum);
 
 	io_start = pgstat_prepare_io_time(track_io_timing);
 
-	/*
-	 * bufToWrite is either the shared buffer or a copy, as appropriate.
-	 */
-	smgrwrite(reln,
-			  BufTagGetForkNum(&buf->tag),
-			  buf->tag.blockNum,
-			  bufToWrite,
-			  false);
+	smgrwritev(reln,
+			   BufTagGetForkNum(&bufs[first_start_io_index]->tag),
+			   bufs[first_start_io_index]->tag.blockNum,
+			   (const void **) &bufBlocks[first_start_io_index],
+			   nbuffers - first_start_io_index,
+			   false);
 
 	/*
 	 * When a strategy is in use, only flushes of dirty buffers already in the
@@ -3834,24 +3994,47 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * of a dirty shared buffer (IOCONTEXT_NORMAL IOOP_WRITE).
 	 */
 	pgstat_count_io_op_time(IOOBJECT_RELATION, io_context,
-							IOOP_WRITE, io_start, 1);
+							IOOP_WRITE, io_start,
+							nbuffers - first_start_io_index);
 
-	pgBufferUsage.shared_blks_written++;
+	pgBufferUsage.shared_blks_written += nbuffers - first_start_io_index;
 
 	/*
-	 * Mark the buffer as clean (unless BM_JUST_DIRTIED has become set) and
+	 * Mark the buffers as clean (unless BM_JUST_DIRTIED has become set) and
 	 * end the BM_IO_IN_PROGRESS state.
 	 */
-	TerminateBufferIO(buf, true, 0, true);
+	for (int i = first_start_io_index; i < nbuffers; ++i)
+		TerminateBufferIO(bufs[i], true, 0, true);
 
-	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&buf->tag),
-									   buf->tag.blockNum,
+	TRACE_POSTGRESQL_BUFFER_FLUSH_DONE(BufTagGetForkNum(&bufs[first_start_io_index]->tag),
+									   bufs[first_start_io_index]->tag.blockNum,
 									   reln->smgr_rlocator.locator.spcOid,
 									   reln->smgr_rlocator.locator.dbOid,
 									   reln->smgr_rlocator.locator.relNumber);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	/* Report the range of buffers that we actually wrote, if any. */
+	if (first_written_index)
+		*first_written_index = first_start_io_index;
+	if (written)
+		*written = nbuffers - first_start_io_index;
+
+	return nbuffers;
+}
+
+/*
+ * FlushBuffer
+ *		Physically write out just one shared buffer.
+ *
+ * Single-block variant of FlushBuffers().
+ */
+static void
+FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
+			IOContext io_context)
+{
+	FlushBuffers(&buf, 1, reln, io_object, io_context, NULL, NULL);
 }
 
 /*
@@ -4843,6 +5026,25 @@ FlushOneBuffer(Buffer buffer)
 	FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
 }
 
+/*
+ * Flush a range of already pinned buffers to the OS.
+ */
+int
+FlushPinnedBuffers(Buffer *buffers, int nblocks, WritebackContext *wbcontext)
+{
+	BufferDesc *bufHdrs[MAX_IO_COMBINE_LIMIT];
+
+	for (int i = 0; i < nblocks; ++i)
+	{
+		Assert(BufferIsValid(buffers[i]));
+		Assert(!BufferIsLocal(buffers[i]));
+		Assert(BufferIsPinned(buffers[i]));
+		bufHdrs[i] = GetBufferDescriptor(buffers[i] - 1);
+	}
+
+	return SyncBuffers(bufHdrs, nblocks, wbcontext);
+}
+
 /*
  * ReleaseBuffer -- release the pin on a buffer
  */
@@ -5620,16 +5822,23 @@ AbortBufferIO(Buffer buffer)
 static void
 shared_buffer_write_error_callback(void *arg)
 {
-	BufferDesc *bufHdr = (BufferDesc *) arg;
+	struct shared_buffer_write_error_info *info = arg;
 
 	/* Buffer is pinned, so we can read the tag without locking the spinlock */
-	if (bufHdr != NULL)
+	if (info != NULL)
 	{
-		char	   *path = relpathperm(BufTagGetRelFileLocator(&bufHdr->tag),
-									   BufTagGetForkNum(&bufHdr->tag));
-
-		errcontext("writing block %u of relation %s",
-				   bufHdr->tag.blockNum, path);
+		char	   *path = relpathperm(BufTagGetRelFileLocator(&info->buf->tag),
+									   BufTagGetForkNum(&info->buf->tag));
+
+		if (info->nblocks > 1)
+			errcontext("writing blocks %u..%u of relation %s",
+					   info->buf->tag.blockNum,
+					   info->buf->tag.blockNum + info->nblocks - 1,
+					   path);
+		else
+			errcontext("writing block %u of relation %s",
+					   info->buf->tag.blockNum,
+					   path);
 		pfree(path);
 	}
 }
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 9cc22893499..69b9dd131a2 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -160,6 +160,16 @@ BufferTagsEqual(const BufferTag *tag1, const BufferTag *tag2)
 		(tag1->forkNum == tag2->forkNum);
 }
 
+static inline bool
+BufferTagsConsecutive(const BufferTag *tag1, const BufferTag *tag2)
+{
+	return (tag1->blockNum + 1 == tag2->blockNum) &&
+		(tag1->spcOid == tag2->spcOid) &&
+		(tag1->dbOid == tag2->dbOid) &&
+		(tag1->relNumber == tag2->relNumber) &&
+		(tag1->forkNum == tag2->forkNum);
+}
+
 static inline bool
 BufTagMatchesRelFileLocator(const BufferTag *tag,
 							const RelFileLocator *rlocator)
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 42211bfec4f..5db927bc497 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -265,6 +265,9 @@ extern BlockNumber BufferGetBlockNumber(Buffer buffer);
 extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation,
 												   ForkNumber forkNum);
 extern void FlushOneBuffer(Buffer buffer);
+extern int	FlushPinnedBuffers(Buffer *buffers,
+							   int nblocks,
+							   struct WritebackContext *wb_context);
 extern void FlushRelationBuffers(Relation rel);
 extern void FlushRelationsAllBuffers(struct SMgrRelationData **smgrs, int nrels);
 extern void CreateAndCopyRelationData(RelFileLocator src_rlocator,
-- 
2.44.0

From edd44f46a011951b6b9b27351cb004193eebed6a Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Thu, 25 Apr 2024 22:40:31 +1200
Subject: [PATCH v2 03/11] Provide stream API for cleaning the buffer pool.

User code pushes individual pinned buffers into the stream, and it
builds larger writes up to io_combine_limit, and also re-orders writes
to try to avoid having to flush the WAL.
---
 src/backend/storage/aio/Makefile       |   3 +-
 src/backend/storage/aio/meson.build    |   1 +
 src/backend/storage/aio/write_stream.c | 633 +++++++++++++++++++++++++
 src/include/storage/write_stream.h     |  60 +++
 src/tools/pgindent/typedefs.list       |   3 +
 5 files changed, 699 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/storage/aio/write_stream.c
 create mode 100644 src/include/storage/write_stream.h

diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
index 2f29a9ec4d1..2da929869b6 100644
--- a/src/backend/storage/aio/Makefile
+++ b/src/backend/storage/aio/Makefile
@@ -9,6 +9,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
-	read_stream.o
+	read_stream.o \
+	write_stream.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
index 10e1aa3b20b..95148b55073 100644
--- a/src/backend/storage/aio/meson.build
+++ b/src/backend/storage/aio/meson.build
@@ -2,4 +2,5 @@
 
 backend_sources += files(
   'read_stream.c',
+  'write_stream.c',
 )
diff --git a/src/backend/storage/aio/write_stream.c b/src/backend/storage/aio/write_stream.c
new file mode 100644
index 00000000000..b37e5e331c4
--- /dev/null
+++ b/src/backend/storage/aio/write_stream.c
@@ -0,0 +1,633 @@
+/*-------------------------------------------------------------------------
+ *
+ * write_stream.c
+ *	  Mechanism for flushing buffered data efficiently
+ *
+ * Portions Copyright (c) 2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/aio/write_stream.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "lib/ilist.h"
+#include "lib/pairingheap.h"
+#include "storage/buf_internals.h"
+#include "storage/proc.h"
+#include "storage/write_stream.h"
+
+
+typedef struct WriteStreamWrite
+{
+	/*
+	 * A WriteStreamWrite can be on the freelist, or in the queue, or the
+	 * current pending_write.
+	 */
+	union
+	{
+		pairingheap_node queue_node;
+		dlist_node	freelist_node;
+	}			u;
+
+	/*
+	 * A generation counter for writes, so we can check if a write has
+	 * finished even though it might have been recycled.
+	 */
+	uint64		counter;
+
+	/*
+	 * The last known high LSN for the buffers of this write.  This may be out
+	 * of date. XXX
+	 */
+	XLogRecPtr	flush_lsn;
+
+	/* Pinned buffers. */
+	Buffer		buffers[MAX_IO_COMBINE_LIMIT];
+	int			nblocks;
+
+	/* Cross-check for membership in different data structures. */
+#ifdef USE_ASSERT_CHECKING
+	enum
+	{
+		WSW_FREE,
+		WSW_PENDING,
+		WSW_QUEUED
+	}			location;
+#endif
+} WriteStreamWrite;
+
+struct WriteStream
+{
+	int16		pinned_buffers;
+	int16		max_pinned_buffers;
+
+	int16		queued_writes;
+	int16		max_queued_writes;
+
+	WritebackContext *wb_context;
+
+	/* The write we are currently trying to grow. */
+	WriteStreamWrite *pending_write;
+
+	/*
+	 * Known flushed threshold.  In recovery this is set higher than any
+	 * LSN, because recovery currently always logs before replaying and
+	 * doesn't allow GetFlushRecPtr() to be called.
+	 */
+	XLogRecPtr	flush_lsn_limit;
+
+	/* Writes prioritized by LSN. */
+	pairingheap lsn_reorder_queue;
+
+	/* Spare writes. */
+	dlist_head	write_freelist;
+
+	size_t		writes_array_size;
+	WriteStreamWrite writes[FLEXIBLE_ARRAY_MEMBER];
+};
+
+static inline bool
+pinned_buffers_are_consecutive(Buffer buf1, Buffer buf2)
+{
+	BufferDesc *hdr1;
+	BufferDesc *hdr2;
+
+	hdr1 = GetBufferDescriptor(buf1 - 1);
+	hdr2 = GetBufferDescriptor(buf2 - 1);
+
+	return BufferTagsConsecutive(&hdr1->tag, &hdr2->tag);
+}
+
+/*
+ * Wake the WAL writer.  Used when writes deferred due to the need to flush
+ * the WAL first will soon have to be written, in the hope that it can help
+ * with that.
+ */
+static void
+write_stream_wake_wal_writer(void)
+{
+	if (ProcGlobal->walwriterLatch)
+		SetLatch(ProcGlobal->walwriterLatch);
+}
+
+/*
+ * Perform one write.  The write object is released to the freelist.
+ * If the write fails and raises an error, the stream is invalid and can't be
+ * used anymore.
+ */
+static void
+write_stream_do_write(WriteStream *stream, WriteStreamWrite *write)
+{
+	/* Sanity checks. */
+	Assert(write->nblocks > 0);
+	Assert(write->location == WSW_QUEUED || write->location == WSW_PENDING);
+
+	/*
+	 * For now this is synchronous, and returns only when it has processed all
+	 * writes or discovered they are no longer needed.  In an asynchronous
+	 * design, here we would move it to a second queue for I/Os in progress.
+	 */
+	FlushPinnedBuffers(write->buffers, write->nblocks, stream->wb_context);
+
+#ifdef USE_ASSERT_CHECKING
+	for (int i = 0; i < write->nblocks; ++i)
+		write->buffers[i] = InvalidBuffer;
+#endif
+
+	Assert(stream->pinned_buffers >= write->nblocks);
+	stream->pinned_buffers -= write->nblocks;
+
+	/*
+	 * Allow write_stream_wait() to see that all the buffers in this write are
+	 * finished.
+	 */
+	write->counter++;
+
+	/* This write object is now available for re-use. */
+	dlist_push_head(&stream->write_freelist, &write->u.freelist_node);
+#ifdef USE_ASSERT_CHECKING
+	write->location = WSW_FREE;
+#endif
+}
+
+/*
+ * Check if a given LSN is certainly flushed already.
+ */
+static inline bool
+write_stream_check_flush_limit(WriteStream *stream, XLogRecPtr lsn)
+{
+	/* This is always true when in recovery. */
+	if (likely(lsn <= stream->flush_lsn_limit))
+		return true;
+
+	/* Update our copy from shared memory. */
+	stream->flush_lsn_limit = GetFlushRecPtr(NULL);
+	return lsn <= stream->flush_lsn_limit;
+}
+
+/*
+ * Make the write queue bigger.
+ */
+static void
+write_stream_expand_queue(WriteStream *stream, int16 max_queued_writes)
+{
+	size_t		initialized_size;
+	size_t		new_initialized_size;
+
+	/*
+	 * We need one extra element for the current pending write.  (If we had
+	 * asynchronous I/O, we'd want more for the writes in progress.)
+	 */
+	initialized_size = stream->max_queued_writes + 1;
+	new_initialized_size = max_queued_writes + 1;
+	Assert(new_initialized_size <= stream->writes_array_size);
+
+	/* Initialize more writes, and put them on the freelist. */
+	while (initialized_size < new_initialized_size)
+	{
+		WriteStreamWrite *write = &stream->writes[initialized_size++];
+
+#ifdef USE_ASSERT_CHECKING
+		write->location = WSW_FREE;
+		for (int j = 0; j < lengthof(write->buffers); ++j)
+			write->buffers[j] = InvalidBuffer;
+#endif
+		dlist_push_head(&stream->write_freelist, &write->u.freelist_node);
+	}
+	stream->max_queued_writes = initialized_size - 1;
+	Assert(stream->max_queued_writes > 0);
+}
+
+/*
+ * Perform at least the write with the highest priority (lowest flush LSN).
+ * Also do any more writes that can be done without flushing.
+ *
+ * If try_avoid_flush is set, then consider expanding the queue size instead
+ * if a flush would be necessary.  Either way, when this function returns,
+ * there is space for a new write to be inserted into the queue.
+ */
+static void
+write_stream_do_highest_priority_write(WriteStream *stream, bool try_avoid_flush)
+{
+	WriteStreamWrite *write;
+
+	/* We should only be called if there is at least one queued. */
+	Assert(stream->queued_writes > 0);
+	Assert(!pairingheap_is_empty(&stream->lsn_reorder_queue));
+
+	write = pairingheap_container(WriteStreamWrite,
+								  u.queue_node,
+								  pairingheap_first(&stream->lsn_reorder_queue));
+
+	/*
+	 * Consider making the queue bigger instead, if we'd have to flush to
+	 * perform this write.  That might give the WAL writer a better chance of
+	 * keeping up.
+	 */
+	if (try_avoid_flush &&
+		stream->max_queued_writes < stream->writes_array_size - 1 &&
+		!write_stream_check_flush_limit(stream, write->flush_lsn))
+	{
+		write_stream_expand_queue(stream, stream->max_queued_writes + 1);
+		return;
+	}
+
+	/* Perform the highest priority write. */
+	pairingheap_remove_first(&stream->lsn_reorder_queue);
+	Assert(stream->queued_writes > 0);
+	stream->queued_writes--;
+	write_stream_do_write(stream, write);
+
+	/*
+	 * How many more writes can we do without flushing the WAL?  Usually all
+	 * of them, if we had to flush above, because of XLogFlush()'s policy of
+	 * flushing as much as possible, even past the requested LSN.  If we
+	 * didn't have to flush, then someone else must have, and in that case the
+	 * following loop might find only some.
+	 */
+	while (!pairingheap_is_empty(&stream->lsn_reorder_queue))
+	{
+		write = pairingheap_container(WriteStreamWrite,
+									  u.queue_node,
+									  pairingheap_first(&stream->lsn_reorder_queue));
+		if (!write_stream_check_flush_limit(stream, write->flush_lsn))
+			break;
+
+		/* Perform the highest priority write. */
+		pairingheap_remove_first(&stream->lsn_reorder_queue);
+		Assert(stream->queued_writes > 0);
+		stream->queued_writes--;
+		write_stream_do_write(stream, write);
+	}
+
+	/*
+	 * If that didn't clear more than half of the queue, then try waking the
+	 * WAL writer.
+	 */
+	if (stream->queued_writes >= stream->max_queued_writes / 2)
+		write_stream_wake_wal_writer();
+}
+
+/*
+ * Push the current pending write onto the priority queue.  It will wait there
+ * until the queue is full and it has the lowest LSN.  Hopefully this means
+ * that we'll have to flush the WAL less often.  If the queue is full, perform
+ * the highest priority write first.
+ *
+ * If we can already see that no WAL flush is required, then skip the queue and
+ * do the write immediately.
+ */
+static void
+write_stream_enqueue_pending_write(WriteStream *stream)
+{
+	WriteStreamWrite *write = stream->pending_write;
+
+	Assert(write != NULL);
+	Assert(write->location == WSW_PENDING);
+
+	/*
+	 * If no WAL flushing is required to write this data, we can skip the LSN
+	 * queue and proceed directly to I/O.
+	 */
+	if (likely(write_stream_check_flush_limit(stream, write->flush_lsn)))
+	{
+		stream->pending_write = NULL;
+		write_stream_do_write(stream, write);
+		return;
+	}
+
+	/* Have we hit the queue limit? */
+	if (stream->queued_writes == stream->max_queued_writes)
+		write_stream_do_highest_priority_write(stream, true /* try_avoid_flush */ );
+	Assert(stream->queued_writes < stream->max_queued_writes);
+
+	pairingheap_add(&stream->lsn_reorder_queue, &write->u.queue_node);
+	stream->queued_writes++;
+#ifdef USE_ASSERT_CHECKING
+	write->location = WSW_QUEUED;
+#endif
+
+	/*
+	 * As we cross the halfway full mark, kick the WAL writer.  We won't be
+	 * forced to write any data out until the queue is full, but perhaps by
+	 * then we'll be able to perform some writes without flushing the WAL...
+	 *
+	 * XXX Could this hurt?  Maybe, if it causes twice as many WAL flushes in
+	 * total between WAL writer and this process, given that PostgreSQL always
+	 * serializes WAL writes.
+	 */
+	if (stream->queued_writes == stream->max_queued_writes / 2)
+		write_stream_wake_wal_writer();
+
+	stream->pending_write = NULL;
+}
+
+static int
+write_stream_lsn_compare(const pairingheap_node *a,
+						 const pairingheap_node *b,
+						 void *arg)
+{
+	XLogRecPtr	a_lsn;
+	XLogRecPtr	b_lsn;
+
+	a_lsn = pairingheap_const_container(WriteStreamWrite, u.queue_node, a)->flush_lsn;
+	b_lsn = pairingheap_const_container(WriteStreamWrite, u.queue_node, b)->flush_lsn;
+	return a_lsn < b_lsn ? -1 : a_lsn > b_lsn ? 1 : 0;
+}
+
+/*
+ * Begin a new write stream.
+ *
+ * Caller can specify the maximum number of written buffers that can be
+ * deferred by internal queuing, or -1 for a reasonable default.  The queue is
+ * initially small, but expands as required to try to avoid having to flush
+ * the WAL.
+ */
+WriteStream *
+write_stream_begin(int flags, WritebackContext *wb_context, int max_distance)
+{
+	WriteStream *stream;
+	uint32		max_pinned_buffers;
+	int16		max_queued_writes;
+	size_t		writes_array_size;
+
+	/* An arbitrary default. */
+	if (max_distance <= 0)
+		max_distance = 16 * io_combine_limit;
+
+	/*
+	 * How many buffer pins are we actually allowed to hold at once?
+	 *
+	 * XXX We could break the link between distance and pins, by unpinning the
+	 * buffers held by writes while they are sitting in the re-order queue.
+	 * When we're ready to perform the write, we could re-pin them with
+	 * ReadRecentBuffer(), which should be pretty fast, and if it fails we are
+	 * lucky: someone else cleaned the buffer for us already, and we have
+	 * nothing to do (except consider the other buffers in the same write).
+	 */
+	max_pinned_buffers = max_distance;
+	LimitAdditionalPins(&max_pinned_buffers);
+	Assert(max_pinned_buffers > 0);
+
+	/*
+	 * Maximum possible number of deferred writes, when expanding the queue.
+	 * We don't know how successful we'll be at write combining, so allow for
+	 * average write size of half of io_combine_limit.  Round up to avoid
+	 * zero.
+	 */
+	writes_array_size = max_pinned_buffers;
+	if (io_combine_limit > 1)
+		writes_array_size /= io_combine_limit / 2;
+	writes_array_size++;
+
+	/* Add one for the current pending write, and cap for data type. */
+	writes_array_size++;
+	writes_array_size = Min(PG_INT16_MAX, writes_array_size);
+
+	/* Initial size of the reorder queue. */
+	max_queued_writes = 1;
+
+	/* Allocate and initialize. */
+	stream = palloc(offsetof(WriteStream, writes) +
+					writes_array_size * sizeof(WriteStreamWrite));
+	stream->max_pinned_buffers = max_pinned_buffers;
+	stream->max_queued_writes = -1; /* expanded below */
+	stream->writes_array_size = writes_array_size;
+	stream->queued_writes = 0;
+	stream->pinned_buffers = 0;
+	stream->pending_write = NULL;
+	stream->wb_context = wb_context;
+	dlist_init(&stream->write_freelist);
+	stream->lsn_reorder_queue.ph_root = NULL;
+	stream->lsn_reorder_queue.ph_arg = NULL;
+	stream->lsn_reorder_queue.ph_compare = write_stream_lsn_compare;
+
+	/* What LSN should we initially consider to be flushed? */
+	if (RecoveryInProgress())
+		stream->flush_lsn_limit = PG_UINT64_MAX;
+	else
+		stream->flush_lsn_limit = 0;
+
+	write_stream_expand_queue(stream, max_queued_writes);
+
+	return stream;
+}
+
+
+/*
+ * Start writing out one buffer.  The write is finished at the latest when
+ * the stream is eventually flushed with write_stream_end(), but if the
+ * caller is interested in waiting for individual buffers to be written out,
+ * the returned handle can be used to wait.
+ */
+WriteStreamWriteHandle
+write_stream_write_buffer(WriteStream *stream, Buffer buffer)
+{
+	WriteStreamWriteHandle handle;
+	WriteStreamWrite *pending_write;
+
+	/*
+	 * If we've hit the limit on pinned buffers already, we'll need to perform
+	 * one write.  This should only happen with small pin limits, e.g. a tiny
+	 * buffer pool.
+	 */
+	if (unlikely(stream->pinned_buffers == stream->max_pinned_buffers))
+	{
+		if (stream->queued_writes == 0)
+		{
+			/* It must be the pending write holding pins, so move it along... */
+			Assert(stream->pending_write);
+			write_stream_enqueue_pending_write(stream);
+		}
+
+		/*
+		 * Either that was enough to free up some pins, or the queue now has
+		 * something in it, and we'll force the write with the lowest LSN to
+		 * be performed.
+		 */
+		Assert(stream->pinned_buffers < stream->max_pinned_buffers ||
+			   stream->queued_writes > 0);
+		if (stream->pinned_buffers == stream->max_pinned_buffers)
+		{
+			Assert(stream->queued_writes > 0);
+			write_stream_do_highest_priority_write(stream, false);
+		}
+	}
+
+	/* Caller transfers ownership of the buffer pin to this stream. */
+	Assert(stream->pinned_buffers < stream->max_pinned_buffers);
+	stream->pinned_buffers++;
+
+	/* Can we combine this buffer with the current pending write, if any? */
+	pending_write = stream->pending_write;
+	if (likely(pending_write))
+	{
+		if (likely(pending_write->nblocks < io_combine_limit &&
+				   pinned_buffers_are_consecutive(pending_write->buffers[pending_write->nblocks - 1],
+												  buffer)))
+		{
+			/*
+			 * Yes!  Append, and adjust LSN high water mark for the combined
+			 * write. It doesn't matter if the LSN changes later, we're only
+			 * using it to guide prioritization, not for correctness.
+			 */
+			pending_write->buffers[pending_write->nblocks++] = buffer;
+			pending_write->flush_lsn = Max(pending_write->flush_lsn,
+										   BufferGetLSNAtomic(buffer));
+			Assert(pending_write->location == WSW_PENDING);
+
+			handle.p = &pending_write->counter;
+			handle.c = pending_write->counter;
+			return handle;
+		}
+
+		/* No.  Queue it up, to get it out of the way. */
+		write_stream_enqueue_pending_write(stream);
+	}
+
+	/*
+	 * There must be at least one write object in the freelist, because there
+	 * isn't a pending write.
+	 */
+	Assert(stream->pending_write == NULL);
+	Assert(!dlist_is_empty(&stream->write_freelist));
+
+	/* Start forming a new write, with this buffer as the head. */
+	pending_write = dlist_container(WriteStreamWrite,
+									u.freelist_node,
+									dlist_pop_head_node(&stream->write_freelist));
+	Assert(pending_write->location == WSW_FREE);
+	pending_write->buffers[0] = buffer;
+	pending_write->nblocks = 1;
+	pending_write->flush_lsn = BufferGetLSNAtomic(buffer);
+#ifdef USE_ASSERT_CHECKING
+	pending_write->location = WSW_PENDING;
+#endif
+	stream->pending_write = pending_write;
+
+	handle.p = &pending_write->counter;
+	handle.c = pending_write->counter;
+	return handle;
+}
+
+/*
+ * Slow path for write_stream_wait().  See header.
+ */
+void
+write_stream_wait_slow(WriteStream *stream, WriteStreamWriteHandle handle)
+{
+	WriteStreamWrite *write;
+
+	/* Convert handle back to a write object. */
+	write = (WriteStreamWrite *)
+		((char *) handle.p - offsetof(WriteStreamWrite, counter));
+
+	/* The inline fast path shouldn't have sent us here if done already. */
+	Assert(handle.c == write->counter);
+
+	/* If it's the current pending write, enqueue it. */
+	if (write == stream->pending_write)
+	{
+		Assert(write->location == WSW_PENDING);
+		write_stream_enqueue_pending_write(stream);
+
+		/* Finished yet? */
+		if (handle.c != write->counter)
+		{
+			Assert(write->location == WSW_FREE);
+			return;
+		}
+	}
+
+	/* Keep writing out queued writes until the handle is satisfied. */
+	Assert(write->location == WSW_QUEUED);
+	while (handle.c != write->counter)
+	{
+		/* There must be one! */
+		Assert(stream->queued_writes > 0);
+		write_stream_do_highest_priority_write(stream, false);
+	}
+}
+
+/*
+ * Release all pins and forget all queued writes.
+ */
+void
+write_stream_reset(WriteStream *stream)
+{
+	WriteStreamWrite *write;
+
+	if ((write = stream->pending_write))
+	{
+		stream->pending_write = NULL;
+		write->counter++;
+		for (int i = 0; i < write->nblocks; ++i)
+		{
+			ReleaseBuffer(write->buffers[i]);
+			stream->pinned_buffers--;
+		}
+		dlist_push_head(&stream->write_freelist, &write->u.freelist_node);
+#ifdef USE_ASSERT_CHECKING
+		write->location = WSW_FREE;
+#endif
+	}
+
+	while (!pairingheap_is_empty(&stream->lsn_reorder_queue))
+	{
+		write = pairingheap_container(WriteStreamWrite,
+									  u.queue_node,
+									  pairingheap_remove_first(&stream->lsn_reorder_queue));
+		write->counter++;
+		for (int i = 0; i < write->nblocks; ++i)
+		{
+			ReleaseBuffer(write->buffers[i]);
+			stream->pinned_buffers--;
+		}
+		dlist_push_head(&stream->write_freelist, &write->u.freelist_node);
+#ifdef USE_ASSERT_CHECKING
+		write->location = WSW_FREE;
+#endif
+		stream->queued_writes--;
+	}
+
+	Assert(stream->pending_write == NULL);
+	Assert(stream->queued_writes == 0);
+	Assert(stream->pinned_buffers == 0);
+}
+
+/*
+ * Finish all queued writes.  Nothing is pinned on return.
+ */
+void
+write_stream_wait_all(WriteStream *stream)
+{
+	/* Complete any queued up writes. */
+	while (stream->queued_writes > 0 ||
+		   stream->pending_write)
+	{
+		if (stream->pending_write &&
+			stream->queued_writes < stream->max_queued_writes)
+			write_stream_enqueue_pending_write(stream);
+		if (stream->queued_writes > 0)
+			write_stream_do_highest_priority_write(stream, false);
+	}
+
+	Assert(stream->pending_write == NULL);
+	Assert(stream->queued_writes == 0);
+	Assert(stream->pinned_buffers == 0);
+}
+
+/*
+ * Finish all writes and release all resources.
+ */
+void
+write_stream_end(WriteStream *stream)
+{
+	write_stream_wait_all(stream);
+	pfree(stream);
+}
diff --git a/src/include/storage/write_stream.h b/src/include/storage/write_stream.h
new file mode 100644
index 00000000000..0877124153f
--- /dev/null
+++ b/src/include/storage/write_stream.h
@@ -0,0 +1,60 @@
+/*-------------------------------------------------------------------------
+ *
+ * write_stream.h
+ *	  Mechanism for writing out buffered data efficiently
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/write_stream.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WRITE_STREAM_H
+#define WRITE_STREAM_H
+
+#include "storage/bufmgr.h"
+
+/*
+ * An opaque handle type returned by write_stream_write_buffer.  Callers can
+ * optionally wait for individual buffers to be written using this handle.
+ */
+typedef struct WriteStreamWriteHandle
+{
+	uint64	   *p;
+	uint64		c;
+} WriteStreamWriteHandle;
+
+struct WriteStream;
+typedef struct WriteStream WriteStream;
+
+extern WriteStream *write_stream_begin(int flags,
+									   struct WritebackContext *wb_context,
+									   int max_deferred_writes);
+extern WriteStreamWriteHandle write_stream_write_buffer(WriteStream *stream,
+														Buffer buffer);
+extern void write_stream_wait_slow(WriteStream *stream,
+								   WriteStreamWriteHandle handle);
+extern void write_stream_wait_all(WriteStream *stream);
+extern void write_stream_reset(WriteStream *stream);
+extern void write_stream_end(WriteStream *stream);
+
+/*
+ * Wait for an individual write_stream_write_buffer() operation to be
+ * finished.  While write_stream_end() finishes all writes, some callers might
+ * need to wait for an individual buffer write to complete without waiting for
+ * all of them.
+ */
+static inline void
+write_stream_wait(WriteStream *stream, WriteStreamWriteHandle handle)
+{
+	/*
+	 * Only bother to enter the slow path if the completion counter hasn't
+	 * moved.  Tolerate zero-initialized object.
+	 */
+	if (handle.p && *handle.p == handle.c)
+		write_stream_wait_slow(stream, handle);
+}
+
+#endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d551ada3255..78bcf69e226 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3148,6 +3148,9 @@ WriteDataPtrType
 WriteExtraTocPtrType
 WriteFunc
 WriteManifestState
+WriteStream
+WriteStreamWrite
+WriteStreamWriteHandle
 WriteTarState
 WritebackContext
 X509
-- 
2.44.0
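
Reviewer note: the handle returned by write_stream_write_buffer() is a
(pointer to counter, counter snapshot) pair, so a caller can tell that
its write has finished even after the write object has been recycled.
Here is a minimal standalone sketch of that generation-counter idea.
ToySlot, ToyHandle and the toy_* functions are hypothetical names for
illustration and are not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* A recyclable object with a generation counter. */
typedef struct ToySlot
{
	uint64_t	counter;
} ToySlot;

/* A handle: where the counter lives, and its value when we started. */
typedef struct ToyHandle
{
	const uint64_t *p;
	uint64_t	c;
} ToyHandle;

/* Take a handle for the operation currently using this slot. */
static ToyHandle
toy_handle_for(const ToySlot *slot)
{
	ToyHandle	handle;

	handle.p = &slot->counter;
	handle.c = slot->counter;
	return handle;
}

/* Mark the current operation as finished; the slot may now be reused. */
static void
toy_complete(ToySlot *slot)
{
	slot->counter++;
}

/*
 * Has the operation behind this handle finished?  A zero-initialized handle
 * counts as finished, mirroring the "tolerate zero-initialized object" check
 * in write_stream_wait().
 */
static bool
toy_is_done(ToyHandle handle)
{
	return handle.p == NULL || *handle.p != handle.c;
}
```

Once the counter has moved past the snapshot, an old handle stays
"done" no matter how many times the slot is reused afterwards, while a
handle taken for the new use of the slot reports "not done" until
toy_complete() is called again.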

From eefd9b1ca2a15ab3a361c6a20ba8d247ac40b3e3 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 26 Apr 2024 00:10:22 +1200
Subject: [PATCH v2 04/11] Use streaming I/O in CHECKPOINT.

While writing data out for a checkpoint, use the new write stream API so
we can build up large writes up to io_combine_limit.  This also means
that writes can be reordered sometimes to try to get the WAL writer to
help.
---
 src/backend/storage/buffer/bufmgr.c | 33 ++++++++++++++++++++++++++---
 1 file changed, 30 insertions(+), 3 deletions(-)
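
Reviewer note: the reordering mentioned above defers writes whose page
LSN is not yet known to be flushed, and performs the lowest-LSN write
only when the queue is full.  The toy model below tries to show why
that can save WAL flushes.  It is a sketch under loud assumptions: the
names are hypothetical, a small array with a linear search replaces the
patch's pairing heap, the queue never grows, and a "WAL flush" is
modelled as advancing flushed_upto to the highest LSN seen so far.

```c
#include <assert.h>
#include <stdint.h>

#define TOY_QUEUE_SIZE 4

typedef struct ToyQueue
{
	uint64_t	lsns[TOY_QUEUE_SIZE];	/* flush LSN of each deferred write */
	int			nqueued;
	uint64_t	flushed_upto;	/* WAL assumed durable up to here */
	uint64_t	inserted_upto;	/* highest LSN handed to us so far */
	int			wal_flushes;	/* times we were forced to flush WAL */
	int			writes_done;
} ToyQueue;

/* Index of the queued write with the lowest LSN (highest priority). */
static int
toy_min_index(const ToyQueue *q)
{
	int			best = 0;

	assert(q->nqueued > 0);
	for (int i = 1; i < q->nqueued; i++)
		if (q->lsns[i] < q->lsns[best])
			best = i;
	return best;
}

/* "Perform" the write at index i and remove it from the queue. */
static void
toy_do_write_at(ToyQueue *q, int i)
{
	q->lsns[i] = q->lsns[--q->nqueued];
	q->writes_done++;
}

/* Do the lowest-LSN write, then any others that need no further flush. */
static void
toy_do_lowest(ToyQueue *q)
{
	int			i = toy_min_index(q);

	if (q->lsns[i] > q->flushed_upto)
	{
		/* Assumption: a forced flush covers all WAL inserted so far. */
		q->flushed_upto = q->inserted_upto;
		q->wal_flushes++;
	}
	toy_do_write_at(q, i);

	while (q->nqueued > 0)
	{
		i = toy_min_index(q);
		if (q->lsns[i] > q->flushed_upto)
			break;
		toy_do_write_at(q, i);
	}
}

/* Submit a write: do it now if free, otherwise defer it in the queue. */
static void
toy_defer_write(ToyQueue *q, uint64_t lsn)
{
	if (lsn > q->inserted_upto)
		q->inserted_upto = lsn;
	if (lsn <= q->flushed_upto)
	{
		q->writes_done++;		/* no WAL flush needed, skip the queue */
		return;
	}
	if (q->nqueued == TOY_QUEUE_SIZE)
		toy_do_lowest(q);
	q->lsns[q->nqueued++] = lsn;
}

/* Drain the queue, as at the end of a stream. */
static void
toy_finish(ToyQueue *q)
{
	while (q->nqueued > 0)
		toy_do_lowest(q);
}
```

In this model, submitting writes with LSNs 10, 20, 30, 40 and 50 and
then finishing performs all five writes with a single forced flush,
because the flush triggered on behalf of LSN 10 also covers the writes
queued behind it.  The real patch additionally wakes the WAL writer and
expands the queue, neither of which is modelled here.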

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f7ada9dcc7e..5f5a48ba6ca 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -56,6 +56,7 @@
 #include "storage/proc.h"
 #include "storage/smgr.h"
 #include "storage/standby.h"
+#include "storage/write_stream.h"
 #include "utils/memdebug.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -121,6 +122,9 @@ typedef struct CkptTsStatus
 
 	/* current offset in CkptBufferIds for this tablespace */
 	int			index;
+
+	/* I/O stream for writing out to this tablespace */
+	WriteStream *stream;
 } CkptTsStatus;
 
 /*
@@ -3062,6 +3066,13 @@ BufferSync(int flags)
 			 */
 
 			last_tsid = cur_tsid;
+
+			/*
+			 * Create a separate WriteStream for each tablespace.  If we only
+			 * had one stream, we'd lose some write-combining opportunities at
+			 * arbitrary boundaries when switching between tablespaces.
+			 */
+			s->stream = write_stream_begin(0, &wb_context, -1);
 		}
 		else
 		{
@@ -3132,11 +3143,23 @@ BufferSync(int flags)
 		 */
 		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
 		{
-			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+			ResourceOwnerEnlarge(CurrentResourceOwner);
+			ReservePrivateRefCountEntry();
+
+			buf_state = LockBufHdr(bufHdr);
+			if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
 			{
-				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
-				PendingCheckpointerStats.buffers_written++;
+				/* It's clean, so nothing to do */
+				UnlockBufHdr(bufHdr, buf_state);
+			}
+			else
+			{
+				PinBuffer_Locked(bufHdr);
+
+				write_stream_write_buffer(ts_stat->stream,
+										  BufferDescriptorGetBuffer(bufHdr));
 				num_written++;
+				PendingCheckpointerStats.buffers_written++;
 			}
 		}
 
@@ -3167,6 +3190,10 @@ BufferSync(int flags)
 		CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
 	}
 
+	/* Finish all I/O streams, waiting for outstanding writes to complete. */
+	for (i = 0; i < num_spaces; i++)
+		write_stream_end(per_ts_stat[i].stream);
+
 	/*
 	 * Issue all pending flushes. Only checkpointer calls BufferSync(), so
 	 * IOContext will always be IOCONTEXT_NORMAL.
-- 
2.44.0

From 8ff649aea11bc6f09c2ee157b5f173869e62fa42 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Mon, 11 Mar 2024 16:19:56 -0400
Subject: [PATCH v2 05/11] Use streaming I/O in VACUUM first pass.

Now vacuum's first pass, which HOT-prunes and records the TIDs of
non-removable dead tuples, uses the streaming read API by converting
heap_vac_scan_next_block() to a read stream callback.

Author: Melanie Plageman <melanieplage...@gmail.com>
---
 src/backend/access/heap/vacuumlazy.c | 80 +++++++++++++++++-----------
 1 file changed, 49 insertions(+), 31 deletions(-)
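
Reviewer note: the shape of the conversion is that the "which block
next?" logic moves into a callback that returns a block number (or an
invalid-block sentinel at the end) and fills in a per-buffer data slot,
while the consumer loop just pulls results.  The sketch below imitates
that shape only.  It is not the read_stream.h API: every name is
hypothetical, there is no look-ahead or I/O, and the "stream" hands
back block numbers rather than buffers.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define TOY_INVALID_BLOCK UINT32_MAX	/* stand-in for InvalidBlockNumber */

/* Callback: return the next block, filling in its per-buffer data. */
typedef uint32_t (*ToyNextBlockCB) (void *private_data,
									void *per_buffer_data);

typedef struct ToyStream
{
	ToyNextBlockCB callback;
	void	   *private_data;
	bool		per_buffer_data;	/* a single slot: no look-ahead here */
} ToyStream;

/* Scan state owned by the caller, passed through as private data. */
typedef struct ToyScanState
{
	uint32_t	nblocks;
	uint32_t	next_block;
	const bool *skippable;		/* blocks the scan chooses to skip */
	const bool *all_visible;	/* visibility status per block */
} ToyScanState;

/* The callback: skip what can be skipped, report visibility alongside. */
static uint32_t
toy_scan_next_block(void *private_data, void *per_buffer_data)
{
	ToyScanState *scan = private_data;
	bool	   *all_visible = per_buffer_data;

	while (scan->next_block < scan->nblocks &&
		   scan->skippable[scan->next_block])
		scan->next_block++;

	if (scan->next_block >= scan->nblocks)
		return TOY_INVALID_BLOCK;

	*all_visible = scan->all_visible[scan->next_block];
	return scan->next_block++;
}

/* Consumer side: get the next block and a pointer to its per-buffer data. */
static uint32_t
toy_stream_next(ToyStream *stream, void **per_buffer_data)
{
	*per_buffer_data = &stream->per_buffer_data;
	return stream->callback(stream->private_data, &stream->per_buffer_data);
}
```

A consumer loops on toy_stream_next() until it returns
TOY_INVALID_BLOCK, reading the bool through the returned pointer, much
as lazy_scan_heap() now dereferences all_visible_according_to_vm.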

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index de109acc89a..e30b6fc37c5 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -55,6 +55,7 @@
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
+#include "storage/read_stream.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_rusage.h"
@@ -229,8 +230,9 @@ typedef struct LVSavedErrInfo
 
 /* non-export function prototypes */
 static void lazy_scan_heap(LVRelState *vacrel);
-static bool heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno,
-									 bool *all_visible_according_to_vm);
+static BlockNumber heap_vac_scan_next_block(ReadStream *stream,
+											void *callback_private_data,
+											void *per_buffer_data);
 static void find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis);
 static bool lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf,
 								   BlockNumber blkno, Page page,
@@ -817,10 +819,11 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
 static void
 lazy_scan_heap(LVRelState *vacrel)
 {
+	Buffer		buf;
+	ReadStream *stream;
 	BlockNumber rel_pages = vacrel->rel_pages,
-				blkno,
 				next_fsm_block_to_vacuum = 0;
-	bool		all_visible_according_to_vm;
+	bool	   *all_visible_according_to_vm;
 
 	TidStore   *dead_items = vacrel->dead_items;
 	VacDeadItemsInfo *dead_items_info = vacrel->dead_items_info;
@@ -838,19 +841,33 @@ lazy_scan_heap(LVRelState *vacrel)
 	initprog_val[2] = dead_items_info->max_bytes;
 	pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
 
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
+										vacrel->bstrategy,
+										vacrel->rel,
+										MAIN_FORKNUM,
+										heap_vac_scan_next_block,
+										vacrel,
+										sizeof(bool));
+
 	/* Initialize for the first heap_vac_scan_next_block() call */
 	vacrel->current_block = InvalidBlockNumber;
 	vacrel->next_unskippable_block = InvalidBlockNumber;
 	vacrel->next_unskippable_allvis = false;
 	vacrel->next_unskippable_vmbuffer = InvalidBuffer;
 
-	while (heap_vac_scan_next_block(vacrel, &blkno, &all_visible_according_to_vm))
+	while (BufferIsValid(buf = read_stream_next_buffer(stream,
+													   (void **) &all_visible_according_to_vm)))
 	{
-		Buffer		buf;
+		BlockNumber blkno;
 		Page		page;
 		bool		has_lpdead_items;
 		bool		got_cleanup_lock = false;
 
+		vacrel->blkno = blkno = BufferGetBlockNumber(buf);
+
+		CheckBufferIsPinnedOnce(buf);
+		page = BufferGetPage(buf);
+
 		vacrel->scanned_pages++;
 
 		/* Report as block scanned, update error traceback information */
@@ -916,10 +933,6 @@ lazy_scan_heap(LVRelState *vacrel)
 		 */
 		visibilitymap_pin(vacrel->rel, blkno, &vmbuffer);
 
-		buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-								 vacrel->bstrategy);
-		page = BufferGetPage(buf);
-
 		/*
 		 * We need a buffer cleanup lock to prune HOT chains and defragment
 		 * the page in lazy_scan_prune.  But when it's not possible to acquire
@@ -975,7 +988,7 @@ lazy_scan_heap(LVRelState *vacrel)
 		 */
 		if (got_cleanup_lock)
 			lazy_scan_prune(vacrel, buf, blkno, page,
-							vmbuffer, all_visible_according_to_vm,
+							vmbuffer, *all_visible_according_to_vm,
 							&has_lpdead_items);
 
 		/*
@@ -1029,7 +1042,7 @@ lazy_scan_heap(LVRelState *vacrel)
 		ReleaseBuffer(vmbuffer);
 
 	/* report that everything is now scanned */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, rel_pages);
 
 	/* now we can compute the new value for pg_class.reltuples */
 	vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
@@ -1044,6 +1057,8 @@ lazy_scan_heap(LVRelState *vacrel)
 		Max(vacrel->new_live_tuples, 0) + vacrel->recently_dead_tuples +
 		vacrel->missed_dead_tuples;
 
+	read_stream_end(stream);
+
 	/*
 	 * Do index vacuuming (call each index's ambulkdelete routine), then do
 	 * related heap vacuuming
@@ -1055,11 +1070,11 @@ lazy_scan_heap(LVRelState *vacrel)
 	 * Vacuum the remainder of the Free Space Map.  We must do this whether or
 	 * not there were indexes, and whether or not we bypassed index vacuuming.
 	 */
-	if (blkno > next_fsm_block_to_vacuum)
-		FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, blkno);
+	if (rel_pages > next_fsm_block_to_vacuum)
+		FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, rel_pages);
 
 	/* report all blocks vacuumed */
-	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
+	pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages);
 
 	/* Do final index cleanup (call each index's amvacuumcleanup routine) */
 	if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
@@ -1069,14 +1084,14 @@ lazy_scan_heap(LVRelState *vacrel)
 /*
  *	heap_vac_scan_next_block() -- get next block for vacuum to process
  *
- * lazy_scan_heap() calls here every time it needs to get the next block to
- * prune and vacuum.  The function uses the visibility map, vacuum options,
- * and various thresholds to skip blocks which do not need to be processed and
- * sets blkno to the next block to process.
+ * The streaming read callback invokes heap_vac_scan_next_block() every time
+ * lazy_scan_heap() needs the next block to prune and vacuum.  The function
+ * uses the visibility map, vacuum options, and various thresholds to skip
+ * blocks which do not need to be processed and returns the next block to
+ * process or InvalidBlockNumber if there are no remaining blocks.
  *
- * The block number and visibility status of the next block to process are set
- * in *blkno and *all_visible_according_to_vm.  The return value is false if
- * there are no further blocks to process.
+ * The visibility status of the next block to process is set in the
+ * per_buffer_data.
  *
  * vacrel is an in/out parameter here.  Vacuum options and information about
  * the relation are read.  vacrel->skippedallvis is set if we skip a block
@@ -1084,11 +1099,14 @@ lazy_scan_heap(LVRelState *vacrel)
  * relfrozenxid in that case.  vacrel also holds information about the next
  * unskippable block, as bookkeeping for this function.
  */
-static bool
-heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno,
-						 bool *all_visible_according_to_vm)
+static BlockNumber
+heap_vac_scan_next_block(ReadStream *stream,
+						 void *callback_private_data,
+						 void *per_buffer_data)
 {
 	BlockNumber next_block;
+	LVRelState *vacrel = callback_private_data;
+	bool	   *all_visible_according_to_vm = per_buffer_data;
 
 	/* relies on InvalidBlockNumber + 1 overflowing to 0 on first call */
 	next_block = vacrel->current_block + 1;
@@ -1101,8 +1119,8 @@ heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno,
 			ReleaseBuffer(vacrel->next_unskippable_vmbuffer);
 			vacrel->next_unskippable_vmbuffer = InvalidBuffer;
 		}
-		*blkno = vacrel->rel_pages;
-		return false;
+		vacrel->current_block = vacrel->rel_pages;
+		return InvalidBlockNumber;
 	}
 
 	/*
@@ -1151,9 +1169,9 @@ heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno,
 		 * but chose not to.  We know that they are all-visible in the VM,
 		 * otherwise they would've been unskippable.
 		 */
-		*blkno = vacrel->current_block = next_block;
+		vacrel->current_block = next_block;
 		*all_visible_according_to_vm = true;
-		return true;
+		return vacrel->current_block;
 	}
 	else
 	{
@@ -1163,9 +1181,9 @@ heap_vac_scan_next_block(LVRelState *vacrel, BlockNumber *blkno,
 		 */
 		Assert(next_block == vacrel->next_unskippable_block);
 
-		*blkno = vacrel->current_block = next_block;
+		vacrel->current_block = next_block;
 		*all_visible_according_to_vm = vacrel->next_unskippable_allvis;
-		return true;
+		return vacrel->current_block;
 	}
 }
 
-- 
2.44.0
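
A small aside on the "relies on InvalidBlockNumber + 1 overflowing to 0"
comment in heap_vac_scan_next_block() above: that is just unsigned
wraparound.  Here is a minimal standalone sketch.  The typedef and macro
mirror the definitions in PostgreSQL's block.h, but next_block_after() is
a hypothetical helper made up for illustration, not something in the
patch.

```c
#include <assert.h>
#include <stdint.h>

/* Mirrors block.h: block numbers are unsigned 32-bit values. */
typedef uint32_t BlockNumber;
#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

/* Hypothetical helper: the block that follows "current". */
static BlockNumber
next_block_after(BlockNumber current)
{
	/* Unsigned arithmetic wraps, so InvalidBlockNumber + 1 is block 0. */
	return (BlockNumber) (current + 1);
}
```

So initializing current_block to InvalidBlockNumber makes the first call
start at block 0 without a special case.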

From 2cdf825ab244eed63cbf03853f078c7fbedafabb Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 26 Apr 2024 08:32:44 +1200
Subject: [PATCH v2 06/11] Refactor tidstore.c memory management.

Previously, TidStoreIterateNext() would expand the set of offsets for
each block into a buffer that it overwrote each time.  In order to be
able to collect the offsets for multiple blocks before working with
them, change the contract.  Now, the offsets are obtained by a separate
call to TidStoreGetBlockOffsets(), which can be called at a later time,
and TidStoreIterResult objects are safe to copy and store in a
queue.

This will be used by a later patch, to avoid the need for expensive
extra copies of the offset array and associated memory management.
---
 src/backend/access/common/tidstore.c          | 68 +++++++++----------
 src/backend/access/heap/vacuumlazy.c          |  9 ++-
 src/include/access/tidstore.h                 | 12 ++--
 .../modules/test_tidstore/test_tidstore.c     |  9 ++-
 4 files changed, 53 insertions(+), 45 deletions(-)

diff --git a/src/backend/access/common/tidstore.c b/src/backend/access/common/tidstore.c
index fb3949d69f6..c3c1987204b 100644
--- a/src/backend/access/common/tidstore.c
+++ b/src/backend/access/common/tidstore.c
@@ -147,9 +147,6 @@ struct TidStoreIter
 	TidStoreIterResult output;
 };
 
-static void tidstore_iter_extract_tids(TidStoreIter *iter, BlockNumber blkno,
-									   BlocktableEntry *page);
-
 /*
  * Create a TidStore. The TidStore will live in the memory context that is
  * CurrentMemoryContext at the time of this call. The TID storage, backed
@@ -486,13 +483,6 @@ TidStoreBeginIterate(TidStore *ts)
 	iter = palloc0(sizeof(TidStoreIter));
 	iter->ts = ts;
 
-	/*
-	 * We start with an array large enough to contain at least the offsets
-	 * from one completely full bitmap element.
-	 */
-	iter->output.max_offset = 2 * BITS_PER_BITMAPWORD;
-	iter->output.offsets = palloc(sizeof(OffsetNumber) * iter->output.max_offset);
-
 	if (TidStoreIsShared(ts))
 		iter->tree_iter.shared = shared_ts_begin_iterate(ts->tree.shared);
 	else
@@ -503,9 +493,9 @@ TidStoreBeginIterate(TidStore *ts)
 
 
 /*
- * Scan the TidStore and return the TIDs of the next block. The offsets in
- * each iteration result are ordered, as are the block numbers over all
- * iterations.
+ * Return a result that contains the next block number and that can be used to
+ * obtain the set of offsets by calling TidStoreGetBlockOffsets().  The result
+ * is copyable.
  */
 TidStoreIterResult *
 TidStoreIterateNext(TidStoreIter *iter)
@@ -521,10 +511,10 @@ TidStoreIterateNext(TidStoreIter *iter)
 	if (page == NULL)
 		return NULL;
 
-	/* Collect TIDs from the key-value pair */
-	tidstore_iter_extract_tids(iter, (BlockNumber) key, page);
+	iter->output.blkno = key;
+	iter->output.internal_page = page;
 
-	return &(iter->output);
+	return &iter->output;
 }
 
 /*
@@ -540,7 +530,6 @@ TidStoreEndIterate(TidStoreIter *iter)
 	else
 		local_ts_end_iterate(iter->tree_iter.local);
 
-	pfree(iter->output.offsets);
 	pfree(iter);
 }
 
@@ -575,16 +564,19 @@ TidStoreGetHandle(TidStore *ts)
 	return (dsa_pointer) shared_ts_get_handle(ts->tree.shared);
 }
 
-/* Extract TIDs from the given key-value pair */
-static void
-tidstore_iter_extract_tids(TidStoreIter *iter, BlockNumber blkno,
-						   BlocktableEntry *page)
+/*
+ * Given a TidStoreIterResult returned by TidStoreIterateNext(), extract the
+ * offset numbers.  Returns the number of offsets filled in, if <=
+ * max_offsets.  Otherwise, fills in as much as it can in the given space, and
+ * returns the size of the buffer that would be needed.
+ */
+int
+TidStoreGetBlockOffsets(TidStoreIterResult *result,
+						OffsetNumber *offsets,
+						int max_offsets)
 {
-	TidStoreIterResult *result = (&iter->output);
-	int			wordnum;
-
-	result->num_offsets = 0;
-	result->blkno = blkno;
+	BlocktableEntry *page = result->internal_page;
+	int			num_offsets = 0;
 
 	if (page->header.nwords == 0)
 	{
@@ -592,31 +584,33 @@ tidstore_iter_extract_tids(TidStoreIter *iter, BlockNumber blkno,
 		for (int i = 0; i < NUM_FULL_OFFSETS; i++)
 		{
 			if (page->header.full_offsets[i] != InvalidOffsetNumber)
-				result->offsets[result->num_offsets++] = page->header.full_offsets[i];
+			{
+				if (num_offsets < max_offsets)
+					offsets[num_offsets] = page->header.full_offsets[i];
+				num_offsets++;
+			}
 		}
 	}
 	else
 	{
-		for (wordnum = 0; wordnum < page->header.nwords; wordnum++)
+		for (int wordnum = 0; wordnum < page->header.nwords; wordnum++)
 		{
 			bitmapword	w = page->words[wordnum];
 			int			off = wordnum * BITS_PER_BITMAPWORD;
 
-			/* Make sure there is enough space to add offsets */
-			if ((result->num_offsets + BITS_PER_BITMAPWORD) > result->max_offset)
-			{
-				result->max_offset *= 2;
-				result->offsets = repalloc(result->offsets,
-										   sizeof(OffsetNumber) * result->max_offset);
-			}
-
 			while (w != 0)
 			{
 				if (w & 1)
-					result->offsets[result->num_offsets++] = (OffsetNumber) off;
+				{
+					if (num_offsets < max_offsets)
+						offsets[num_offsets] = (OffsetNumber) off;
+					num_offsets++;
+				}
 				off++;
 				w >>= 1;
 			}
 		}
 	}
+
+	return num_offsets;
 }
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index e30b6fc37c5..e8ad79b41f9 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -2146,12 +2146,17 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 		Buffer		buf;
 		Page		page;
 		Size		freespace;
+		OffsetNumber offsets[MaxOffsetNumber];
+		int			num_offsets;
 
 		vacuum_delay_point();
 
 		blkno = iter_result->blkno;
 		vacrel->blkno = blkno;
 
+		num_offsets = TidStoreGetBlockOffsets(iter_result, offsets, lengthof(offsets));
+		Assert(num_offsets <= lengthof(offsets));
+
 		/*
 		 * Pin the visibility map page in case we need to mark the page
 		 * all-visible.  In most cases this will be very cheap, because we'll
@@ -2163,8 +2168,8 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 		buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
 								 vacrel->bstrategy);
 		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-		lazy_vacuum_heap_page(vacrel, blkno, buf, iter_result->offsets,
-							  iter_result->num_offsets, vmbuffer);
+		lazy_vacuum_heap_page(vacrel, blkno, buf, offsets,
+							  num_offsets, vmbuffer);
 
 		/* Now that we've vacuumed the page, record its available space */
 		page = BufferGetPage(buf);
diff --git a/src/include/access/tidstore.h b/src/include/access/tidstore.h
index 32aa9995193..d95cabd7b5e 100644
--- a/src/include/access/tidstore.h
+++ b/src/include/access/tidstore.h
@@ -20,13 +20,14 @@
 typedef struct TidStore TidStore;
 typedef struct TidStoreIter TidStoreIter;
 
-/* Result struct for TidStoreIterateNext */
+/*
+ * Result struct for TidStoreIterateNext.  This is copyable, but should be
+ * treated as opaque.  Call TidStoreGetBlockOffsets() to obtain the offsets.
+ */
 typedef struct TidStoreIterResult
 {
 	BlockNumber blkno;
-	int			max_offset;
-	int			num_offsets;
-	OffsetNumber *offsets;
+	void	   *internal_page;
 } TidStoreIterResult;
 
 extern TidStore *TidStoreCreateLocal(size_t max_bytes, bool insert_only);
@@ -42,6 +43,9 @@ extern void TidStoreSetBlockOffsets(TidStore *ts, BlockNumber blkno, OffsetNumbe
 extern bool TidStoreIsMember(TidStore *ts, ItemPointer tid);
 extern TidStoreIter *TidStoreBeginIterate(TidStore *ts);
 extern TidStoreIterResult *TidStoreIterateNext(TidStoreIter *iter);
+extern int	TidStoreGetBlockOffsets(TidStoreIterResult *result,
+									OffsetNumber *offsets,
+									int max_offsets);
 extern void TidStoreEndIterate(TidStoreIter *iter);
 extern size_t TidStoreMemoryUsage(TidStore *ts);
 extern dsa_pointer TidStoreGetHandle(TidStore *ts);
diff --git a/src/test/modules/test_tidstore/test_tidstore.c b/src/test/modules/test_tidstore/test_tidstore.c
index 0a3a58722de..f46a5cf4968 100644
--- a/src/test/modules/test_tidstore/test_tidstore.c
+++ b/src/test/modules/test_tidstore/test_tidstore.c
@@ -242,9 +242,14 @@ check_set_block_offsets(PG_FUNCTION_ARGS)
 	iter = TidStoreBeginIterate(tidstore);
 	while ((iter_result = TidStoreIterateNext(iter)) != NULL)
 	{
-		for (int i = 0; i < iter_result->num_offsets; i++)
+		OffsetNumber offsets[MaxOffsetNumber];
+		int			num_offsets;
+
+		num_offsets = TidStoreGetBlockOffsets(iter_result, offsets, lengthof(offsets));
+		Assert(num_offsets <= lengthof(offsets));
+		for (int i = 0; i < num_offsets; i++)
 			ItemPointerSet(&(items.iter_tids[num_iter_tids++]), iter_result->blkno,
-						   iter_result->offsets[i]);
+						   offsets[i]);
 	}
 	TidStoreEndIterate(iter);
 	TidStoreUnlock(tidstore);
-- 
2.44.0
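
To make the new return-value contract of TidStoreGetBlockOffsets()
concrete, here is a standalone toy version of its bitmap branch.  This is
only a sketch under assumptions: sketch_get_offsets() and its plain
uint64_t word array are made up for illustration and stand in for the
real BlocktableEntry, which is not reproduced here.

```c
#include <assert.h>
#include <stdint.h>

typedef uint16_t OffsetNumber;

/*
 * Toy stand-in for the bitmap branch: expand the set bits of "words" into
 * offsets[].  Never writes past max_offsets entries, but keeps counting,
 * so the return value is the number of entries the caller would need.
 */
static int
sketch_get_offsets(const uint64_t *words, int nwords,
				   OffsetNumber *offsets, int max_offsets)
{
	int			num_offsets = 0;

	for (int wordnum = 0; wordnum < nwords; wordnum++)
	{
		uint64_t	w = words[wordnum];
		int			off = wordnum * 64;

		while (w != 0)
		{
			if (w & 1)
			{
				/* Store only while there is room; always count. */
				if (num_offsets < max_offsets)
					offsets[num_offsets] = (OffsetNumber) off;
				num_offsets++;
			}
			off++;
			w >>= 1;
		}
	}

	return num_offsets;
}
```

A caller that sizes its array for the worst case, as the vacuum code above
does with MaxOffsetNumber, never sees truncation.  A caller with a smaller
array can compare the return value with its capacity and retry with a
bigger one.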

From abf3f5c8dec6a2dc1b43af6458d6d5ae25db6d6b Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Tue, 27 Feb 2024 14:35:36 -0500
Subject: [PATCH v2 07/11] Use streaming I/O in VACUUM second pass.

Now vacuum's second pass, which removes dead items referring to dead
tuples collected in the first pass, uses a read stream that looks ahead
in the TidStore.

Originally developed by Melanie, refactored to work with the new
TidStore by Thomas.

Author: Melanie Plageman <melanieplage...@gmail.com>
Author: Thomas Munro <thomas.mu...@gmail.com>
---
 src/backend/access/heap/vacuumlazy.c | 38 +++++++++++++++++++++++-----
 1 file changed, 32 insertions(+), 6 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index e8ad79b41f9..8e757c313ee 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -2100,6 +2100,24 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	return allindexes;
 }
 
+static BlockNumber
+vacuum_reap_lp_read_stream_next(ReadStream *stream,
+								void *callback_private_data,
+								void *per_buffer_data)
+{
+	TidStoreIter *iter = callback_private_data;
+	TidStoreIterResult *iter_result;
+
+	iter_result = TidStoreIterateNext(iter);
+	if (iter_result == NULL)
+		return InvalidBlockNumber;
+
+	/* Save the TidStoreIterResult for later, so we can extract the offsets. */
+	memcpy(per_buffer_data, iter_result, sizeof(*iter_result));
+
+	return iter_result->blkno;
+}
+
 /*
  *	lazy_vacuum_heap_rel() -- second pass over the heap for two pass strategy
  *
@@ -2120,6 +2138,8 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 static void
 lazy_vacuum_heap_rel(LVRelState *vacrel)
 {
+	Buffer		buf;
+	ReadStream *stream;
 	BlockNumber vacuumed_pages = 0;
 	Buffer		vmbuffer = InvalidBuffer;
 	LVSavedErrInfo saved_err_info;
@@ -2140,10 +2160,18 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 							 InvalidBlockNumber, InvalidOffsetNumber);
 
 	iter = TidStoreBeginIterate(vacrel->dead_items);
-	while ((iter_result = TidStoreIterateNext(iter)) != NULL)
+	stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE,
+										vacrel->bstrategy,
+										vacrel->rel,
+										MAIN_FORKNUM,
+										vacuum_reap_lp_read_stream_next,
+										iter,
+										sizeof(TidStoreIterResult));
+
+	while (BufferIsValid(buf = read_stream_next_buffer(stream,
+													   (void **) &iter_result)))
 	{
 		BlockNumber blkno;
-		Buffer		buf;
 		Page		page;
 		Size		freespace;
 		OffsetNumber offsets[MaxOffsetNumber];
@@ -2151,8 +2179,7 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 
 		vacuum_delay_point();
 
-		blkno = iter_result->blkno;
-		vacrel->blkno = blkno;
+		vacrel->blkno = blkno = BufferGetBlockNumber(buf);
 
 		num_offsets = TidStoreGetBlockOffsets(iter_result, offsets, lengthof(offsets));
 		Assert(num_offsets <= lengthof(offsets));
@@ -2165,8 +2192,6 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 		visibilitymap_pin(vacrel->rel, blkno, &vmbuffer);
 
 		/* We need a non-cleanup exclusive lock to mark dead_items unused */
-		buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
-								 vacrel->bstrategy);
 		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
 		lazy_vacuum_heap_page(vacrel, blkno, buf, offsets,
 							  num_offsets, vmbuffer);
@@ -2179,6 +2204,7 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 		RecordPageWithFreeSpace(vacrel->rel, blkno, freespace);
 		vacuumed_pages++;
 	}
+	read_stream_end(stream);
 	TidStoreEndIterate(iter);
 
 	vacrel->blkno = InvalidBlockNumber;
-- 
2.44.0
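
The reason the result struct has to be copyable is that the read stream
calls the callback several blocks ahead of the consumer, while the
iterator keeps overwriting its single output slot.  A standalone sketch of
that pattern follows.  All the Sketch* names are hypothetical stand-ins,
not the real TidStore or ReadStream types.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Small, copyable result: a block number plus an opaque pointer. */
typedef struct SketchIterResult
{
	uint32_t	blkno;
	const void *internal_page;
} SketchIterResult;

typedef struct SketchIter
{
	const uint32_t *blknos;
	int			nblocks;
	int			next;
	SketchIterResult output;	/* overwritten by every call */
} SketchIter;

/* Return the next result, or NULL at the end.  Reuses iter->output. */
static SketchIterResult *
sketch_iterate_next(SketchIter *iter)
{
	if (iter->next >= iter->nblocks)
		return NULL;
	iter->output.blkno = iter->blknos[iter->next];
	iter->output.internal_page = &iter->blknos[iter->next];
	iter->next++;
	return &iter->output;
}

/* Look ahead up to max_slots results, copying each into its own slot. */
static int
sketch_look_ahead(SketchIter *iter, SketchIterResult *slots, int max_slots)
{
	int			n = 0;
	SketchIterResult *r;

	while (n < max_slots && (r = sketch_iterate_next(iter)) != NULL)
		memcpy(&slots[n++], r, sizeof(*r));

	return n;
}
```

Each slot plays the role of per_buffer_data: it stays valid after the
iterator has moved on, so the offsets can be extracted later when the
buffer is actually processed.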

From 8be88f30c831bf22c7306c5db988dda7e57a5c4d Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 27 Apr 2024 12:20:07 +1200
Subject: [PATCH v2 08/11] Use streaming I/O in BAS to do write-behind.

freelist.c traditionally performed a sort of one-by-one write-behind, by
cleaning the oldest buffer in the ring at the last moment before
recycling it.

Introduce a more explicit kind of write-behind, that begins once the
consumer of buffers has dirtied a buffer and is ready to unpin it.
Instead of calling ReleaseBuffer(), a new call StrategyReleaseBuffer()
can be used to feed dirty buffers back to the strategy as soon as
possible, so that it can feed them into a write stream.  The write
stream will release the pin some time later, but an interface is
provided at the strategy level to order it to drop all pins (e.g. before
truncating a relation, when you don't want pins hiding in that
pipeline).

The main benefit today is that the write stream generates vectored I/O
up to io_combine_limit and makes some attempt to avoid WAL flush stalls.
The write stream also insulates code from future work to support
asynchronous I/O.

The new write stream is enabled only for BAS_BULKWRITE and BAS_VACUUM,
since those strategy types are expected to crash into their own dirty
data when the ring wraps around, but it won't actually do anything yet.
Later commits will add StrategyReleaseBuffer() calls to a few places
that can benefit from this optimization.
---
 src/backend/storage/buffer/freelist.c | 142 ++++++++++++++++++++++++++
 src/include/storage/bufmgr.h          |   4 +
 2 files changed, 146 insertions(+)

diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 19797de31a9..5b88bef6b71 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -20,6 +20,7 @@
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/proc.h"
+#include "storage/write_stream.h"
 
 #define INT_ACCESS_ONCE(var)	((int)(*((volatile int *)&(var))))
 
@@ -82,6 +83,20 @@ typedef struct BufferAccessStrategyData
 	 */
 	int			current;
 
+	/*
+	 * Index of the write-behind slot in the ring.  If the caller indicates
+	 * that this buffer is dirty by calling StrategyReleaseBuffer(), we'll
+	 * queue it up to be cleaned by write_stream.
+	 */
+	int			write_behind;
+
+	/*
+	 * For strategies with write-behind logic.  Handles correspond to buffer
+	 * position in buffers[] array.
+	 */
+	WriteStream *write_stream;
+	WriteStreamWriteHandle *write_stream_handles;
+
 	/*
 	 * Array of buffer numbers.  InvalidBuffer (that is, zero) indicates we
 	 * have not yet selected a buffer for this ring slot.  For allocation
@@ -610,6 +625,34 @@ GetAccessStrategyWithSize(BufferAccessStrategyType btype, int ring_size_kb)
 	strategy->btype = btype;
 	strategy->nbuffers = ring_buffers;
 
+	/*
+	 * Enable write-behind for the strategies expected to be used by code that
+	 * dirties buffers.  This will only actually have a useful effect if
+	 * callers use StrategyReleaseBuffer() instead of ReleaseBuffer() when
+	 * they are finished with each buffer, and if they read, dirty and release
+	 * them in the same order.
+	 */
+	if (strategy->btype == BAS_VACUUM || strategy->btype == BAS_BULKWRITE)
+	{
+		/*
+		 * We don't want the write stream to be constrained to flush the WAL
+		 * any more often than the worst case without the stream.  Allow it to
+		 * defer enough writes for the whole ring to be dirty and need to be
+		 * cleaned before recycling the oldest buffer.  The write stream will
+		 * try to clean buffers sooner than that, though, if concurrent WAL
+		 * activity allows earlier flushing.
+		 */
+		strategy->write_stream = write_stream_begin(0, NULL, ring_buffers);
+
+		/*
+		 * We need a way to make sure that write-behind has definitely
+		 * finished cleaning a buffer before we recycle it.  Record a handle
+		 * for each buffer, so we can wait for I/O to complete, if necessary.
+		 */
+		strategy->write_stream_handles =
+			palloc0(sizeof(WriteStreamWriteHandle) * ring_buffers);
+	}
+
 	return strategy;
 }
 
@@ -682,7 +725,22 @@ FreeAccessStrategy(BufferAccessStrategy strategy)
 {
 	/* don't crash if called on a "default" strategy */
 	if (strategy != NULL)
+	{
+		if (strategy->write_stream_handles)
+			pfree(strategy->write_stream_handles);
+		if (strategy->write_stream)
+		{
+			/*
+			 * If someone uses a temporary strategy and throws it away without
+			 * calling StrategyWaitAll(), we'll reset the write stream here so
+			 * that we don't generate extra writes.  That way we avoid
+			 * generating I/O when only a small amount of data is touched.
+			 */
+			write_stream_reset(strategy->write_stream);
+			write_stream_end(strategy->write_stream);
+		}
 		pfree(strategy);
+	}
 }
 
 /*
@@ -712,6 +770,15 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
 	if (bufnum == InvalidBuffer)
 		return NULL;
 
+	/*
+	 * If we were doing write-behind and the ring is very small, we might need
+	 * to force a write.  Usually we expect it to be done already, but we have
+	 * to make sure that it's no longer pinned by the write stream.
+	 */
+	if (strategy->write_stream)
+		write_stream_wait(strategy->write_stream,
+						  strategy->write_stream_handles[strategy->current]);
+
 	/*
 	 * If the buffer is pinned we cannot use it under any circumstances.
 	 *
@@ -814,3 +881,78 @@ StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_r
 
 	return true;
 }
+
+/*
+ * Return a buffer to a strategy while it is still pinned.  This gives the
+ * strategy an opportunity to implement write-behind before unpinning it.
+ */
+void
+StrategyReleaseBuffer(BufferAccessStrategy strategy, Buffer buffer)
+{
+	/*
+	 * If enabled for this strategy, and this isn't a repeated release of the
+	 * same buffer, then we can try to implement write-behind.
+	 */
+	if (strategy &&
+		strategy->write_stream &&
+		strategy->buffers[strategy->write_behind] != buffer)
+	{
+		while (strategy->write_behind != strategy->current)
+		{
+			/* Advance to next slot. */
+			strategy->write_behind++;
+			if (strategy->write_behind == strategy->nbuffers)
+				strategy->write_behind = 0;
+			/* Have we found the right buffer? */
+			if (strategy->buffers[strategy->write_behind] == buffer)
+			{
+				/*
+				 * Start writing this buffer out.  The actual writing will
+				 * happen some time later, after I/O combining and deferring
+				 * for a while to give the WAL time to be flushed first.
+				 * Record the handle, so we can make sure it is finished
+				 * before reusing the buffer again.  Ownership of the pin is
+				 * transferred to the stream.
+				 */
+				strategy->write_stream_handles[strategy->write_behind] =
+					write_stream_write_buffer(strategy->write_stream, buffer);
+				return;
+			}
+		}
+	}
+
+	/* Write-behind is not possible.  Just release the buffer immediately. */
+	ReleaseBuffer(buffer);
+}
+
+/*
+ * Unlock a buffer, then return it still pinned to a strategy.
+ */
+void
+StrategyUnlockReleaseBuffer(BufferAccessStrategy strategy, Buffer buffer)
+{
+	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+	StrategyReleaseBuffer(strategy, buffer);
+}
+
+/*
+ * Abandon any write-behind work, releasing any pins held by the strategy.
+ * This is useful before vacuum drops buffers to truncate a relation, throwing
+ * away dirty data.
+ */
+void
+StrategyReset(BufferAccessStrategy strategy)
+{
+	if (strategy && strategy->write_stream)
+		write_stream_reset(strategy->write_stream);
+}
+
+/*
+ * Finish any write-behind work, releasing any pins held by the strategy.
+ */
+void
+StrategyWaitAll(BufferAccessStrategy strategy)
+{
+	if (strategy && strategy->write_stream)
+		write_stream_wait_all(strategy->write_stream);
+}
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 5db927bc497..b745024d50e 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -324,6 +324,10 @@ extern BufferAccessStrategy GetAccessStrategyWithSize(BufferAccessStrategyType b
 													  int ring_size_kb);
 extern int	GetAccessStrategyBufferCount(BufferAccessStrategy strategy);
 extern int	GetAccessStrategyPinLimit(BufferAccessStrategy strategy);
+extern void StrategyReleaseBuffer(BufferAccessStrategy strategy, Buffer buffer);
+extern void StrategyUnlockReleaseBuffer(BufferAccessStrategy strategy, Buffer buffer);
+extern void StrategyReset(BufferAccessStrategy strategy);
+extern void StrategyWaitAll(BufferAccessStrategy strategy);
 
 extern void FreeAccessStrategy(BufferAccessStrategy strategy);
 
-- 
2.44.0
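
The slot-matching logic in StrategyReleaseBuffer() is easier to follow in
isolation.  Here is a standalone sketch of just the cursor movement,
assuming a toy ring of ints.  SketchRing and sketch_release() are
hypothetical names for illustration.  The real code queues the buffer in
the write stream where this returns a slot number, and calls
ReleaseBuffer() where this returns -1.

```c
#include <assert.h>

#define SKETCH_RING_SIZE 4

typedef struct SketchRing
{
	int			nbuffers;
	int			current;		/* slot most recently handed out */
	int			write_behind;	/* slot most recently queued for writing */
	int			buffers[SKETCH_RING_SIZE];	/* 0 means "no buffer" */
} SketchRing;

/*
 * Return the slot that "buffer" was queued from, or -1 if write-behind is
 * not possible and the buffer should just be released immediately.
 */
static int
sketch_release(SketchRing *ring, int buffer)
{
	/* A repeated release of the same buffer is not queued again. */
	if (ring->buffers[ring->write_behind] != buffer)
	{
		/* Chase the allocation cursor, but never pass it. */
		while (ring->write_behind != ring->current)
		{
			ring->write_behind++;
			if (ring->write_behind == ring->nbuffers)
				ring->write_behind = 0;
			if (ring->buffers[ring->write_behind] == buffer)
				return ring->write_behind;
		}
	}

	return -1;
}
```

Note that the write-behind cursor only moves forward.  That is why the
comment in GetAccessStrategyWithSize() says callers need to read, dirty
and release buffers in the same order: a buffer released out of order is
skipped over and falls back to a plain release.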

From 0305b337c7f86f92d571e90789bfb9d021941d5f Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 27 Apr 2024 11:41:45 +1200
Subject: [PATCH v2 09/11] Use streaming I/O in COPY via write-behind.

Since COPY uses BAS_BULKWRITE, in order to enable streaming write-behind
we just have to hand dirty buffers back to the BufferAccessStrategy.
---
 src/backend/access/heap/heapam.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 4a4cf76269d..a8055087dca 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2559,7 +2559,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 							  VISIBILITYMAP_ALL_VISIBLE | VISIBILITYMAP_ALL_FROZEN);
 		}
 
-		UnlockReleaseBuffer(buffer);
+		StrategyUnlockReleaseBuffer(bistate ? bistate->strategy : NULL, buffer);
 		ndone += nthispage;
 
 		/*
-- 
2.44.0

From 32da8fe04cf6f131c45a35378458190d6f7d032c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 27 Apr 2024 12:18:15 +1200
Subject: [PATCH v2 10/11] Use streaming I/O in VACUUM via write-behind.

The actual streaming I/O is managed by the BufferAccessStrategy, but to
enable that we have to give it dirty buffers and be careful to force it
to unpin anything we might truncate.
---
 src/backend/access/heap/vacuumlazy.c | 9 ++++++---
 src/backend/commands/vacuum.c        | 5 +++++
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 8e757c313ee..1abf0831206 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -1016,7 +1016,7 @@ lazy_scan_heap(LVRelState *vacrel)
 		{
 			Size		freespace = PageGetHeapFreeSpace(page);
 
-			UnlockReleaseBuffer(buf);
+			StrategyUnlockReleaseBuffer(vacrel->bstrategy, buf);
 			RecordPageWithFreeSpace(vacrel->rel, blkno, freespace);
 
 			/*
@@ -1034,7 +1034,7 @@ lazy_scan_heap(LVRelState *vacrel)
 			}
 		}
 		else
-			UnlockReleaseBuffer(buf);
+			StrategyUnlockReleaseBuffer(vacrel->bstrategy, buf);
 	}
 
 	vacrel->blkno = InvalidBlockNumber;
@@ -2200,7 +2200,7 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 		page = BufferGetPage(buf);
 		freespace = PageGetHeapFreeSpace(page);
 
-		UnlockReleaseBuffer(buf);
+		StrategyUnlockReleaseBuffer(vacrel->bstrategy, buf);
 		RecordPageWithFreeSpace(vacrel->rel, blkno, freespace);
 		vacuumed_pages++;
 	}
@@ -2687,6 +2687,9 @@ lazy_truncate_heap(LVRelState *vacrel)
 			return;
 		}
 
+		/* Throw away write-behind work, to get rid of held pins. */
+		StrategyReset(vacrel->bstrategy);
+
 		/*
 		 * Okay to truncate.
 		 */
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index b589279d49f..7a91f6f87e8 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -639,8 +639,10 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy,
 				analyze_rel(vrel->oid, vrel->relation, params,
 							vrel->va_cols, in_outer_xact, bstrategy);
 
+
 				if (use_own_xacts)
 				{
+					StrategyWaitAll(bstrategy);
 					PopActiveSnapshot();
 					CommitTransactionCommand();
 				}
@@ -2216,6 +2218,9 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params,
 	if (rel)
 		relation_close(rel, NoLock);
 
+	/* Make sure the strategy isn't holding any pins. */
+	StrategyWaitAll(bstrategy);
+
 	/*
 	 * Complete the transaction and free all temporary memory used.
 	 */
-- 
2.44.0

From f1ad3f4b80ad0f7bec0604fb7187880c367d0451 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 27 Apr 2024 13:57:55 +1200
Subject: [PATCH v2 11/11] Use streaming I/O in DELETE via write-behind. XXX

XXX This changes heapam.c to use BAS_BULKWRITE.  Perhaps we really want
an adaptive-size ring strategy, that starts as BAS_BULKREAD, but expands
towards BAS_BULKWRITE size when it hits dirty data?
---
 src/backend/access/heap/heapam.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a8055087dca..6e9720af7e1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -340,7 +340,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	{
 		/* During a rescan, keep the previous strategy object. */
 		if (scan->rs_strategy == NULL)
-			scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD);
+			scan->rs_strategy = GetAccessStrategy(BAS_BULKWRITE); /* XXX hack */
 	}
 	else
 	{
@@ -588,7 +588,7 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
 	/* release previous scan buffer, if any */
 	if (BufferIsValid(scan->rs_cbuf))
 	{
-		ReleaseBuffer(scan->rs_cbuf);
+		StrategyReleaseBuffer(scan->rs_strategy, scan->rs_cbuf);
 		scan->rs_cbuf = InvalidBuffer;
 	}
 
-- 
2.44.0
