From 19abe210241cf48142c2e1b2e3ae5a7ba34921e4 Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <byavuz81@gmail.com>
Date: Sat, 29 Mar 2025 20:17:42 +0300
Subject: [PATCH v7 1/2] Optimize autoprewarm with read streams

We've measured 10% performance improvement, and this arranges to benefit
automatically from future optimizations to the read_stream subsystem.
---
 contrib/pg_prewarm/autoprewarm.c | 192 ++++++++++++++++++++++---------
 1 file changed, 139 insertions(+), 53 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 73485a2323c..68c99664de5 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -41,6 +41,7 @@
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/procsignal.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
@@ -421,6 +422,93 @@ apw_load_buffers(void)
 				(errmsg("autoprewarm successfully prewarmed %d of %d previously-loaded blocks",
 						apw_state->prewarmed_blocks, num_elements)));
 }
+struct apw_read_stream_private
+{
+	BlockInfoRecord *block_info;
+	BlockNumber nblocks;
+	BlockNumber max_pos;
+	BlockNumber pos;
+	Oid			cur_database;
+	ForkNumber	cur_forknum;
+	RelFileNumber cur_filenumber;
+};
+
+static BlockNumber
+awp_read_stream_next_block(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	struct apw_read_stream_private *p = callback_private_data;
+	BlockInfoRecord cur_blk = p->block_info[p->pos];
+	BlockNumber blocknum = InvalidBlockNumber;
+
+	if (have_free_buffer() &&
+		p->pos < p->max_pos &&
+		cur_blk.blocknum < p->nblocks &&
+		cur_blk.database == p->cur_database &&
+		cur_blk.forknum == p->cur_forknum &&
+		cur_blk.filenumber == p->cur_filenumber)
+	{
+		blocknum = cur_blk.blocknum;
+	}
+
+	(p->pos)++;
+	return blocknum;
+}
+
+/*
+ * Helper function to prewarm buffers in a relation by using read streams.
+ */
+static unsigned int
+autoprewarm_prewarm_relation(Relation rel,
+							 BlockNumber pos,
+							 BlockNumber max_pos,
+							 BlockNumber nblocks_in_fork,
+							 BlockInfoRecord *block_info)
+{
+	struct apw_read_stream_private p;
+	ReadStream *stream;
+	unsigned int blocks_done = 0;
+	BlockInfoRecord first_block = block_info[pos];
+
+	p.pos = pos;
+	p.max_pos = max_pos;
+	p.block_info = block_info;
+	p.nblocks = nblocks_in_fork;
+	p.cur_database = first_block.database;
+	p.cur_forknum = first_block.forknum;
+	p.cur_filenumber = first_block.filenumber;
+
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										NULL,
+										rel,
+										first_block.forknum,
+										awp_read_stream_next_block,
+										&p,
+										0);
+
+	while (true)
+	{
+		Buffer		buf;
+
+		CHECK_FOR_INTERRUPTS();
+
+		buf = read_stream_next_buffer(stream, NULL);
+		if (BufferIsValid(buf))
+		{
+			ReleaseBuffer(buf);
+			++blocks_done;
+		}
+		else
+		{
+			Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+			read_stream_end(stream);
+			break;
+		}
+	}
+
+	return blocks_done;
+}
 
 /*
  * Prewarm all blocks for one database (and possibly also global objects, if
@@ -432,9 +520,10 @@ autoprewarm_database_main(Datum main_arg)
 	int			pos;
 	BlockInfoRecord *block_info;
 	Relation	rel = NULL;
-	BlockNumber nblocks = 0;
-	BlockInfoRecord *old_blk = NULL;
 	dsm_segment *seg;
+	Oid			cur_database;
+	Oid			cur_filenumber = InvalidOid;
+	BlockNumber nblocks_in_fork = InvalidBlockNumber;
 
 	/* Establish signal handlers; once that's done, unblock signals. */
 	pqsignal(SIGTERM, die);
@@ -451,32 +540,44 @@ autoprewarm_database_main(Datum main_arg)
 	block_info = (BlockInfoRecord *) dsm_segment_address(seg);
 	pos = apw_state->prewarm_start_idx;
 
-	/*
-	 * Loop until we run out of blocks to prewarm or until we run out of free
-	 * buffers.
-	 */
-	while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
+	cur_database = block_info[pos].database;
+
+	/* Loop until we run out of blocks to prewarm. */
+	while (pos < apw_state->prewarm_stop_idx)
 	{
-		BlockInfoRecord *blk = &block_info[pos++];
-		Buffer		buf;
+		BlockInfoRecord *blk = &block_info[pos];
 
-		CHECK_FOR_INTERRUPTS();
+		/* Loop until we run  we run out of free buffers. */
+		if (!have_free_buffer())
+			break;
 
 		/*
 		 * Quit if we've reached records for another database. If previous
 		 * blocks are of some global objects, then continue pre-warming.
 		 */
-		if (old_blk != NULL && old_blk->database != blk->database &&
-			old_blk->database != 0)
-			break;
+		if (cur_database != blk->database)
+		{
+			if (cur_database == 0)
+				cur_database = blk->database;
+			else
+				break;
+		}
+
+		/* Check whether blocknum is valid and within fork file size. */
+		if (cur_filenumber == blk->filenumber &&
+			blk->blocknum >= nblocks_in_fork)
+		{
+			/* Move to next forknum. */
+			pos++;
+			continue;
+		}
 
 		/*
 		 * As soon as we encounter a block of a new relation, close the old
 		 * relation. Note that rel will be NULL if try_relation_open failed
 		 * previously; in that case, there is nothing to close.
 		 */
-		if (old_blk != NULL && old_blk->filenumber != blk->filenumber &&
-			rel != NULL)
+		if (rel && cur_filenumber != blk->filenumber)
 		{
 			relation_close(rel, AccessShareLock);
 			rel = NULL;
@@ -487,60 +588,45 @@ autoprewarm_database_main(Datum main_arg)
 		 * Try to open each new relation, but only once, when we first
 		 * encounter it. If it's been dropped, skip the associated blocks.
 		 */
-		if (old_blk == NULL || old_blk->filenumber != blk->filenumber)
+		if (!rel && cur_filenumber != blk->filenumber)
 		{
 			Oid			reloid;
 
-			Assert(rel == NULL);
 			StartTransactionCommand();
 			reloid = RelidByRelfilenumber(blk->tablespace, blk->filenumber);
 			if (OidIsValid(reloid))
 				rel = try_relation_open(reloid, AccessShareLock);
 
-			if (!rel)
-				CommitTransactionCommand();
-		}
-		if (!rel)
-		{
-			old_blk = blk;
-			continue;
-		}
-
-		/* Once per fork, check for fork existence and size. */
-		if (old_blk == NULL ||
-			old_blk->filenumber != blk->filenumber ||
-			old_blk->forknum != blk->forknum)
-		{
 			/*
-			 * smgrexists is not safe for illegal forknum, hence check whether
-			 * the passed forknum is valid before using it in smgrexists.
+			 * Update cur_filenumber although relation may not be opened. If
+			 * not updated and if we can't open the relation when the file
+			 * number is changed; we will end up unnecessarily trying to open
+			 * relation for all the blocks that have the same file number.
 			 */
-			if (blk->forknum > InvalidForkNumber &&
-				blk->forknum <= MAX_FORKNUM &&
-				smgrexists(RelationGetSmgr(rel), blk->forknum))
-				nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
-			else
-				nblocks = 0;
-		}
+			cur_filenumber = blk->filenumber;
 
-		/* Check whether blocknum is valid and within fork file size. */
-		if (blk->blocknum >= nblocks)
-		{
-			/* Move to next forknum. */
-			old_blk = blk;
-			continue;
+			if (!rel)
+				CommitTransactionCommand();
 		}
 
-		/* Prewarm buffer. */
-		buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-								 NULL);
-		if (BufferIsValid(buf))
+		if (rel && smgrexists(RelationGetSmgr(rel), blk->forknum))
 		{
-			apw_state->prewarmed_blocks++;
-			ReleaseBuffer(buf);
+			unsigned int nblocks_processed;
+
+			nblocks_in_fork = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
+			nblocks_processed = autoprewarm_prewarm_relation(rel,
+															 pos,
+															 apw_state->prewarm_stop_idx,
+															 nblocks_in_fork,
+															 block_info);
+
+			apw_state->prewarmed_blocks += nblocks_processed;
+			/* Move pos forward by at least one */
+			pos += Max(nblocks_processed, 1);
+			continue;
 		}
 
-		old_blk = blk;
+		pos++;
 	}
 
 	dsm_detach(seg);
-- 
2.43.0

