From 7e2f610e53cfe097dbc041e09915b8fc5da98c4e Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <byavuz81@gmail.com>
Date: Tue, 5 Nov 2024 11:40:14 +0300
Subject: [PATCH v6 1/2] Optimize autoprewarm with read streams

We've measured 10% performance improvement, and this arranges to benefit
automatically from future optimizations to the read_stream subsystem.
---
 contrib/pg_prewarm/autoprewarm.c | 117 +++++++++++++++++++++++++++++--
 1 file changed, 112 insertions(+), 5 deletions(-)

diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c
index 73485a2323c..a21d9571dff 100644
--- a/contrib/pg_prewarm/autoprewarm.c
+++ b/contrib/pg_prewarm/autoprewarm.c
@@ -41,6 +41,7 @@
 #include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/procsignal.h"
+#include "storage/read_stream.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/guc.h"
@@ -422,6 +423,67 @@ apw_load_buffers(void)
 						apw_state->prewarmed_blocks, num_elements)));
 }
 
+struct apw_read_stream_private
+{
+	bool		first_block;
+	int			max_pos;
+	int			pos;
+	BlockInfoRecord *block_info;
+	BlockNumber nblocks_in_fork;
+
+};
+
+static BlockNumber
+apw_read_stream_next_block(ReadStream *stream,
+						   void *callback_private_data,
+						   void *per_buffer_data)
+{
+	struct apw_read_stream_private *p = callback_private_data;
+	BlockInfoRecord *old_blk;
+	BlockInfoRecord *cur_blk;
+
+	/*
+	 * There may still be queued blocks in the stream even when no free
+	 * buffers are available in the buffer pool. This can lead to unnecessary
+	 * I/O operations and buffer evictions. One possible solution is to
+	 * compare the number of free buffers in the buffer pool with the number
+	 * of queued blocks in the stream. However, this approach is considered a
+	 * workaround and would add complexity with minimal benefit, as only a few
+	 * unnecessary I/O operations and buffer evictions are expected.
+	 * Therefore, this solution has not been implemented.
+	 */
+	if (!have_free_buffer())
+		return InvalidBlockNumber;
+
+	if (p->pos == p->max_pos)
+		return InvalidBlockNumber;
+
+	cur_blk = &(p->block_info[p->pos]);
+
+	if (cur_blk->blocknum >= p->nblocks_in_fork)
+		return InvalidBlockNumber;
+
+	if (p->first_block)
+	{
+		p->first_block = false;
+		p->pos++;
+		return cur_blk->blocknum;
+	}
+
+	Assert(p->pos > 0 && p->pos < p->max_pos);
+	old_blk = &(p->block_info[p->pos - 1]);
+
+	if (old_blk->database == cur_blk->database &&
+		old_blk->forknum == cur_blk->forknum &&
+		old_blk->filenumber == cur_blk->filenumber)
+	{
+		p->pos++;
+		return cur_blk->blocknum;
+	}
+
+	return InvalidBlockNumber;
+}
+
 /*
  * Prewarm all blocks for one database (and possibly also global objects, if
  * those got grouped with this database).
@@ -435,6 +497,8 @@ autoprewarm_database_main(Datum main_arg)
 	BlockNumber nblocks = 0;
 	BlockInfoRecord *old_blk = NULL;
 	dsm_segment *seg;
+	ReadStream *stream = NULL;
+	struct apw_read_stream_private p;
 
 	/* Establish signal handlers; once that's done, unblock signals. */
 	pqsignal(SIGTERM, die);
@@ -451,13 +515,16 @@ autoprewarm_database_main(Datum main_arg)
 	block_info = (BlockInfoRecord *) dsm_segment_address(seg);
 	pos = apw_state->prewarm_start_idx;
 
+	p.block_info = block_info;
+	p.max_pos = apw_state->prewarm_stop_idx;
+
 	/*
 	 * Loop until we run out of blocks to prewarm or until we run out of free
 	 * buffers.
 	 */
-	while (pos < apw_state->prewarm_stop_idx && have_free_buffer())
+	for (; pos < apw_state->prewarm_stop_idx && have_free_buffer(); pos++)
 	{
-		BlockInfoRecord *blk = &block_info[pos++];
+		BlockInfoRecord *blk = &block_info[pos];
 		Buffer		buf;
 
 		CHECK_FOR_INTERRUPTS();
@@ -470,6 +537,18 @@ autoprewarm_database_main(Datum main_arg)
 			old_blk->database != 0)
 			break;
 
+		/*
+		 * If stream needs to be created again, end it before closing the old
+		 * relation.
+		 */
+		if (stream && (old_blk == NULL ||
+					   old_blk->filenumber != blk->filenumber ||
+					   old_blk->forknum != blk->forknum))
+		{
+			Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+			read_stream_end(stream);
+		}
+
 		/*
 		 * As soon as we encounter a block of a new relation, close the old
 		 * relation. Note that rel will be NULL if try_relation_open failed
@@ -506,7 +585,10 @@ autoprewarm_database_main(Datum main_arg)
 			continue;
 		}
 
-		/* Once per fork, check for fork existence and size. */
+		/*
+		 * Once per fork, check for fork existence and size. Then create read
+		 * stream if it is suitable.
+		 */
 		if (old_blk == NULL ||
 			old_blk->filenumber != blk->filenumber ||
 			old_blk->forknum != blk->forknum)
@@ -518,7 +600,27 @@ autoprewarm_database_main(Datum main_arg)
 			if (blk->forknum > InvalidForkNumber &&
 				blk->forknum <= MAX_FORKNUM &&
 				smgrexists(RelationGetSmgr(rel), blk->forknum))
+			{
 				nblocks = RelationGetNumberOfBlocksInFork(rel, blk->forknum);
+
+				/* Create read stream. */
+				p.nblocks_in_fork = nblocks;
+				p.pos = pos;
+
+				/*
+				 * There is a special case for the first block in the
+				 * relation. We can't compare it with the previous block as
+				 * there is no previous block yet.
+				 */
+				p.first_block = true;
+				stream = read_stream_begin_relation(READ_STREAM_FULL,
+													NULL,
+													rel,
+													blk->forknum,
+													apw_read_stream_next_block,
+													&p,
+													0);
+			}
 			else
 				nblocks = 0;
 		}
@@ -532,8 +634,7 @@ autoprewarm_database_main(Datum main_arg)
 		}
 
 		/* Prewarm buffer. */
-		buf = ReadBufferExtended(rel, blk->forknum, blk->blocknum, RBM_NORMAL,
-								 NULL);
+		buf = read_stream_next_buffer(stream, NULL);
 		if (BufferIsValid(buf))
 		{
 			apw_state->prewarmed_blocks++;
@@ -543,6 +644,12 @@ autoprewarm_database_main(Datum main_arg)
 		old_blk = blk;
 	}
 
+	if (stream)
+	{
+		Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+		read_stream_end(stream);
+	}
+
 	dsm_detach(seg);
 
 	/* Release lock on previous relation. */
-- 
2.43.0

