Several places bypass the buffer manager and use direct smgrextend() calls to populate a new relation: Index AM build methods, rewriteheap.c and RelationCopyStorage(). There's a fair amount of duplicated code to WAL-log the pages, calculate checksums, call smgrextend(), and finally call smgrimmedsync() if needed. The duplication is tedious and error-prone. For example, if we want to optimize by WAL-logging multiple pages in one record, that needs to be implemented in each AM separately. Currently, only the sorted GiST index build does that, but it would be equally beneficial in all of those places.

And I believe we got the smgrimmedsync() logic slightly wrong in a number of places [1]. It's also not great for latency; we could let the checkpointer do the fsyncing lazily, like Robert mentioned in the same thread.

The attached patch centralizes that pattern in a new bulk writing facility, and changes all those AMs to use it. The facility buffers 32 pages, WAL-logs them in a single record, and calculates checksums. You could imagine a lot of further optimizations, like writing those 32 pages in one vectored pwritev() call [2], and not skipping the buffer manager when the relation is small. But the scope of this initial version is mostly to refactor the existing code.

One new optimization included here is to let the checkpointer do the fsyncing if possible. That gives a big speedup when e.g. restoring a schema-only dump with lots of relations.

[1] https://www.postgresql.org/message-id/58effc10-c160-b4a6-4eb7-384e95e6f9e3%40iki.fi

[2] https://www.postgresql.org/message-id/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6ut5tum2g...@mail.gmail.com

--
Heikki Linnakangas
Neon (https://neon.tech)
From 33f5cafc1512e8a004df2d506da71dfbbef5c60d Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Tue, 19 Sep 2023 18:09:34 +0300
Subject: [PATCH v1 1/1] Introduce a new bulk loading facility.

The new facility makes it easier to optimize bulk loading, as the
logic for buffering, WAL-logging, and syncing the relation only needs
to be implemented once. It's also less error-prone: We have had a
number of bugs in how a relation is fsync'd - or not - at the end of a
bulk loading operation. By centralizing that logic to one place, we
only need to write it correctly once.

The new facility is faster for small relations: Instead of calling
smgrimmedsync(), we register the fsync to happen at next checkpoint,
which avoids the fsync latency. That can make a big difference if you
are e.g. restoring a schema-only dump with lots of relations.

It is also slightly more efficient with large relations, as the WAL
logging is performed multiple pages at a time. That avoids some WAL
header overhead. The sorted GiST index build did that already, this
moves the buffering to the new facility.

The changes to the pageinspect GiST test need an explanation: Before this
patch, the sorted GiST index build set the LSN on every page to the
special GistBuildLSN value, not the LSN of the WAL record, even though
they were WAL-logged. There was no particular need for it, it just
happened naturally when we wrote out the pages before WAL-logging
them. Now we WAL-log the pages first, like in B-tree build, so the
pages are stamped with the record's real LSN. When the build is not
WAL-logged, we still use GistBuildLSN. To make the test output
predictable, use an unlogged index.
---
 src/backend/access/gist/gistbuild.c   | 111 ++-------
 src/backend/access/heap/rewriteheap.c |  71 ++----
 src/backend/access/nbtree/nbtree.c    |  29 +--
 src/backend/access/nbtree/nbtsort.c   | 102 ++------
 src/backend/access/spgist/spginsert.c |  49 ++--
 src/backend/catalog/storage.c         |  38 +--
 src/backend/storage/smgr/Makefile     |   1 +
 src/backend/storage/smgr/bulk_write.c | 334 ++++++++++++++++++++++++++
 src/backend/storage/smgr/md.c         |  43 ++++
 src/backend/storage/smgr/meson.build  |   1 +
 src/backend/storage/smgr/smgr.c       |  31 +++
 src/include/storage/bulk_write.h      |  28 +++
 src/include/storage/md.h              |   1 +
 src/include/storage/smgr.h            |   1 +
 14 files changed, 535 insertions(+), 305 deletions(-)
 create mode 100644 src/backend/storage/smgr/bulk_write.c
 create mode 100644 src/include/storage/bulk_write.h

diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index 5e0c1447f92..950b6046ac5 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -43,7 +43,8 @@
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
 #include "storage/bufmgr.h"
-#include "storage/smgr.h"
+#include "storage/bulk_write.h"
+
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/tuplesort.h"
@@ -106,11 +107,8 @@ typedef struct
 	Tuplesortstate *sortstate;	/* state data for tuplesort.c */
 
 	BlockNumber pages_allocated;
-	BlockNumber pages_written;
 
-	int			ready_num_pages;
-	BlockNumber ready_blknos[XLR_MAX_BLOCK_ID];
-	Page		ready_pages[XLR_MAX_BLOCK_ID];
+	BulkWriteState *bulkw;
 } GISTBuildState;
 
 #define GIST_SORTED_BUILD_PAGE_NUM 4
@@ -142,7 +140,6 @@ static void gist_indexsortbuild_levelstate_add(GISTBuildState *state,
 											   IndexTuple itup);
 static void gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 												 GistSortedBuildLevelState *levelstate);
-static void gist_indexsortbuild_flush_ready_pages(GISTBuildState *state);
 
 static void gistInitBuffering(GISTBuildState *buildstate);
 static int	calculatePagesPerBuffer(GISTBuildState *buildstate, int levelStep);
@@ -407,21 +404,13 @@ gist_indexsortbuild(GISTBuildState *state)
 	GistSortedBuildLevelState *levelstate;
 	Page		page;
 
-	state->pages_allocated = 0;
-	state->pages_written = 0;
-	state->ready_num_pages = 0;
+	/* Reserve block 0 for the root page */
+	state->pages_allocated = 1;
 
-	/*
-	 * Write an empty page as a placeholder for the root page. It will be
-	 * replaced with the real root page at the end.
-	 */
-	page = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, MCXT_ALLOC_ZERO);
-	smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-			   page, true);
-	state->pages_allocated++;
-	state->pages_written++;
+	state->bulkw = bulkw_start_rel(state->indexrel, MAIN_FORKNUM);
 
 	/* Allocate a temporary buffer for the first leaf page batch. */
+	page = bulkw_alloc_buf(state->bulkw);
 	levelstate = palloc0(sizeof(GistSortedBuildLevelState));
 	levelstate->pages[0] = page;
 	levelstate->parent = NULL;
@@ -455,31 +444,13 @@ gist_indexsortbuild(GISTBuildState *state)
 		levelstate = parent;
 	}
 
-	gist_indexsortbuild_flush_ready_pages(state);
-
 	/* Write out the root */
 	PageSetLSN(levelstate->pages[0], GistBuildLSN);
-	PageSetChecksumInplace(levelstate->pages[0], GIST_ROOT_BLKNO);
-	smgrwrite(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-			  levelstate->pages[0], true);
-	if (RelationNeedsWAL(state->indexrel))
-		log_newpage(&state->indexrel->rd_locator, MAIN_FORKNUM, GIST_ROOT_BLKNO,
-					levelstate->pages[0], true);
-
-	pfree(levelstate->pages[0]);
+	bulkw_write(state->bulkw, GIST_ROOT_BLKNO, levelstate->pages[0], true);
+
 	pfree(levelstate);
 
-	/*
-	 * When we WAL-logged index pages, we must nonetheless fsync index files.
-	 * Since we're building outside shared buffers, a CHECKPOINT occurring
-	 * during the build has no way to flush the previously written data to
-	 * disk (indeed it won't know the index even exists).  A crash later on
-	 * would replay WAL from the checkpoint, therefore it wouldn't replay our
-	 * earlier WAL entries. If we do not fsync those pages here, they might
-	 * still not be on disk when the crash occurs.
-	 */
-	if (RelationNeedsWAL(state->indexrel))
-		smgrimmedsync(RelationGetSmgr(state->indexrel), MAIN_FORKNUM);
+	bulkw_finish(state->bulkw);
 }
 
 /*
@@ -510,7 +481,7 @@ gist_indexsortbuild_levelstate_add(GISTBuildState *state,
 
 		if (levelstate->pages[levelstate->current_page] == NULL)
 			levelstate->pages[levelstate->current_page] =
-				palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+				bulkw_alloc_buf(state->bulkw);
 
 		newPage = levelstate->pages[levelstate->current_page];
 		gistinitpage(newPage, old_page_flags);
@@ -580,7 +551,7 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 
 		/* Create page and copy data */
 		data = (char *) (dist->list);
-		target = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, MCXT_ALLOC_ZERO);
+		target = bulkw_alloc_buf(state->bulkw);
 		gistinitpage(target, isleaf ? F_LEAF : 0);
 		for (int i = 0; i < dist->block.num; i++)
 		{
@@ -593,20 +564,6 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 		}
 		union_tuple = dist->itup;
 
-		if (state->ready_num_pages == XLR_MAX_BLOCK_ID)
-			gist_indexsortbuild_flush_ready_pages(state);
-
-		/*
-		 * The page is now complete. Assign a block number to it, and add it
-		 * to the list of finished pages. (We don't write it out immediately,
-		 * because we want to WAL-log the pages in batches.)
-		 */
-		blkno = state->pages_allocated++;
-		state->ready_blknos[state->ready_num_pages] = blkno;
-		state->ready_pages[state->ready_num_pages] = target;
-		state->ready_num_pages++;
-		ItemPointerSetBlockNumber(&(union_tuple->t_tid), blkno);
-
 		/*
 		 * Set the right link to point to the previous page. This is just for
 		 * debugging purposes: GiST only follows the right link if a page is
@@ -621,6 +578,15 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 		 */
 		if (levelstate->last_blkno)
 			GistPageGetOpaque(target)->rightlink = levelstate->last_blkno;
+
+		/*
+		 * The page is now complete. Assign a block number to it, and pass it
+		 * to the bulk writer.
+		 */
+		blkno = state->pages_allocated++;
+		PageSetLSN(target, GistBuildLSN);
+		bulkw_write(state->bulkw, blkno, target, true);
+		ItemPointerSetBlockNumber(&(union_tuple->t_tid), blkno);
 		levelstate->last_blkno = blkno;
 
 		/*
@@ -631,7 +597,7 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 		if (parent == NULL)
 		{
 			parent = palloc0(sizeof(GistSortedBuildLevelState));
-			parent->pages[0] = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+			parent->pages[0] = bulkw_alloc_buf(state->bulkw);
 			parent->parent = NULL;
 			gistinitpage(parent->pages[0], 0);
 
@@ -641,39 +607,6 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
 	}
 }
 
-static void
-gist_indexsortbuild_flush_ready_pages(GISTBuildState *state)
-{
-	if (state->ready_num_pages == 0)
-		return;
-
-	for (int i = 0; i < state->ready_num_pages; i++)
-	{
-		Page		page = state->ready_pages[i];
-		BlockNumber blkno = state->ready_blknos[i];
-
-		/* Currently, the blocks must be buffered in order. */
-		if (blkno != state->pages_written)
-			elog(ERROR, "unexpected block number to flush GiST sorting build");
-
-		PageSetLSN(page, GistBuildLSN);
-		PageSetChecksumInplace(page, blkno);
-		smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, blkno, page,
-				   true);
-
-		state->pages_written++;
-	}
-
-	if (RelationNeedsWAL(state->indexrel))
-		log_newpages(&state->indexrel->rd_locator, MAIN_FORKNUM, state->ready_num_pages,
-					 state->ready_blknos, state->ready_pages, true);
-
-	for (int i = 0; i < state->ready_num_pages; i++)
-		pfree(state->ready_pages[i]);
-
-	state->ready_num_pages = 0;
-}
-
 
 /*-------------------------------------------------------------------------
  * Routines for non-sorted build
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 424958912c7..9d7c7042e68 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -87,8 +87,8 @@
  * is optimized for bulk inserting a lot of tuples, knowing that we have
  * exclusive access to the heap.  raw_heap_insert builds new pages in
  * local storage.  When a page is full, or at the end of the process,
- * we insert it to WAL as a single record and then write it to disk
- * directly through smgr.  Note, however, that any data sent to the new
+ * we insert it to WAL as a single record and then write it to disk with
+ * the bulk smgr writer.  Note, however, that any data sent to the new
  * heap's TOAST table will go through the normal bufmgr.
  *
  *
@@ -119,9 +119,9 @@
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
+#include "storage/bulk_write.h"
 #include "storage/fd.h"
 #include "storage/procarray.h"
-#include "storage/smgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -133,9 +133,11 @@ typedef struct RewriteStateData
 {
 	Relation	rs_old_rel;		/* source heap */
 	Relation	rs_new_rel;		/* destination heap */
+
+	BulkWriteState *rs_bulkw;
+
 	Page		rs_buffer;		/* page currently being built */
 	BlockNumber rs_blockno;		/* block where page will go */
-	bool		rs_buffer_valid;	/* T if any tuples in buffer */
 	bool		rs_logical_rewrite; /* do we need to do logical rewriting */
 	TransactionId rs_oldest_xmin;	/* oldest xmin used by caller to determine
 									 * tuple visibility */
@@ -255,15 +257,16 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 
 	state->rs_old_rel = old_heap;
 	state->rs_new_rel = new_heap;
-	state->rs_buffer = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+	state->rs_buffer = NULL;
 	/* new_heap needn't be empty, just locked */
 	state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
-	state->rs_buffer_valid = false;
 	state->rs_oldest_xmin = oldest_xmin;
 	state->rs_freeze_xid = freeze_xid;
 	state->rs_cutoff_multi = cutoff_multi;
 	state->rs_cxt = rw_cxt;
 
+	state->rs_bulkw = bulkw_start_rel(new_heap, MAIN_FORKNUM);
+
 	/* Initialize hash tables used to track update chains */
 	hash_ctl.keysize = sizeof(TidHashKey);
 	hash_ctl.entrysize = sizeof(UnresolvedTupData);
@@ -314,30 +317,13 @@ end_heap_rewrite(RewriteState state)
 	}
 
 	/* Write the last page, if any */
-	if (state->rs_buffer_valid)
+	if (state->rs_buffer)
 	{
-		if (RelationNeedsWAL(state->rs_new_rel))
-			log_newpage(&state->rs_new_rel->rd_locator,
-						MAIN_FORKNUM,
-						state->rs_blockno,
-						state->rs_buffer,
-						true);
-
-		PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
-
-		smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-				   state->rs_blockno, state->rs_buffer, true);
+		bulkw_write(state->rs_bulkw, state->rs_blockno, state->rs_buffer, true);
+		state->rs_buffer = NULL;
 	}
 
-	/*
-	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
-	 * reason is the same as in storage.c's RelationCopyStorage(): we're
-	 * writing data that's not in shared buffers, and so a CHECKPOINT
-	 * occurring during the rewriteheap operation won't have fsync'd data we
-	 * wrote before the checkpoint.
-	 */
-	if (RelationNeedsWAL(state->rs_new_rel))
-		smgrimmedsync(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM);
+	bulkw_finish(state->rs_bulkw);
 
 	logical_end_heap_rewrite(state);
 
@@ -611,7 +597,7 @@ rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
 static void
 raw_heap_insert(RewriteState state, HeapTuple tup)
 {
-	Page		page = state->rs_buffer;
+	Page		page;
 	Size		pageFreeSpace,
 				saveFreeSpace;
 	Size		len;
@@ -664,7 +650,8 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 												   HEAP_DEFAULT_FILLFACTOR);
 
 	/* Now we can check to see if there's enough free space already. */
-	if (state->rs_buffer_valid)
+	page = state->rs_buffer;
+	if (page)
 	{
 		pageFreeSpace = PageGetHeapFreeSpace(page);
 
@@ -675,35 +662,17 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 			 * contains a tuple.  Hence, unlike RelationGetBufferForTuple(),
 			 * enforce saveFreeSpace unconditionally.
 			 */
-
-			/* XLOG stuff */
-			if (RelationNeedsWAL(state->rs_new_rel))
-				log_newpage(&state->rs_new_rel->rd_locator,
-							MAIN_FORKNUM,
-							state->rs_blockno,
-							page,
-							true);
-
-			/*
-			 * Now write the page. We say skipFsync = true because there's no
-			 * need for smgr to schedule an fsync for this write; we'll do it
-			 * ourselves in end_heap_rewrite.
-			 */
-			PageSetChecksumInplace(page, state->rs_blockno);
-
-			smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-					   state->rs_blockno, page, true);
-
+			bulkw_write(state->rs_bulkw, state->rs_blockno, state->rs_buffer, true);
+			page = state->rs_buffer = NULL;
 			state->rs_blockno++;
-			state->rs_buffer_valid = false;
 		}
 	}
 
-	if (!state->rs_buffer_valid)
+	if (!page)
 	{
 		/* Initialize a new empty page */
+		page = state->rs_buffer = bulkw_alloc_buf(state->rs_bulkw);
 		PageInit(page, BLCKSZ, 0);
-		state->rs_buffer_valid = true;
 	}
 
 	/* And now we can insert the tuple into the page */
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 62bc9917f13..0d738df6f0e 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -29,11 +29,11 @@
 #include "nodes/execnodes.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
+#include "storage/bulk_write.h"
 #include "storage/condition_variable.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
-#include "storage/smgr.h"
 #include "utils/builtins.h"
 #include "utils/index_selfuncs.h"
 #include "utils/memutils.h"
@@ -152,32 +152,17 @@ void
 btbuildempty(Relation index)
 {
 	bool		allequalimage = _bt_allequalimage(index, false);
-	Buffer		metabuf;
 	Page		metapage;
+	BulkWriteState *bulkw;
 
-	/*
-	 * Initalize the metapage.
-	 *
-	 * Regular index build bypasses the buffer manager and uses smgr functions
-	 * directly, with an smgrimmedsync() call at the end.  That makes sense
-	 * when the index is large, but for an empty index, it's better to use the
-	 * buffer cache to avoid the smgrimmedsync().
-	 */
-	metabuf = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
-	Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
-	_bt_lockbuf(index, metabuf, BT_WRITE);
+	bulkw = bulkw_start_rel(index, INIT_FORKNUM);
 
-	START_CRIT_SECTION();
-
-	metapage = BufferGetPage(metabuf);
+	/* Construct metapage. */
+	metapage = bulkw_alloc_buf(bulkw);
 	_bt_initmetapage(metapage, P_NONE, 0, allequalimage);
-	MarkBufferDirty(metabuf);
-	log_newpage_buffer(metabuf, true);
-
-	END_CRIT_SECTION();
+	bulkw_write(bulkw, BTREE_METAPAGE, metapage, true);
 
-	_bt_unlockbuf(index, metabuf);
-	ReleaseBuffer(metabuf);
+	bulkw_finish(bulkw);
 }
 
 /*
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index c2665fce411..1b0591f551c 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -23,13 +23,8 @@
  * many upper pages if the keys are reasonable-size) without risking a lot of
  * cascading splits during early insertions.
  *
- * Formerly the index pages being built were kept in shared buffers, but
- * that is of no value (since other backends have no interest in them yet)
- * and it created locking problems for CHECKPOINT, because the upper-level
- * pages were held exclusive-locked for long periods.  Now we just build
- * the pages in local memory and smgrwrite or smgrextend them as we finish
- * them.  They will need to be re-read into shared buffers on first use after
- * the build finishes.
+ * We use the bulk smgr loading facility to bypass the buffer cache and
+ * WAL log the pages efficiently.
  *
  * This code isn't concerned about the FSM at all. The caller is responsible
  * for initializing that.
@@ -57,7 +52,7 @@
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pgstat.h"
-#include "storage/smgr.h"
+#include "storage/bulk_write.h"
 #include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/rel.h"
 #include "utils/sortsupport.h"
@@ -251,11 +246,9 @@ typedef struct BTWriteState
 {
 	Relation	heap;
 	Relation	index;
+	BulkWriteState *bulkw;
 	BTScanInsert inskey;		/* generic insertion scankey */
-	bool		btws_use_wal;	/* dump pages to WAL? */
 	BlockNumber btws_pages_alloced; /* # pages allocated */
-	BlockNumber btws_pages_written; /* # pages written out */
-	Page		btws_zeropage;	/* workspace for filling zeroes */
 } BTWriteState;
 
 
@@ -267,7 +260,7 @@ static void _bt_spool(BTSpool *btspool, ItemPointer self,
 static void _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2);
 static void _bt_build_callback(Relation index, ItemPointer tid, Datum *values,
 							   bool *isnull, bool tupleIsAlive, void *state);
-static Page _bt_blnewpage(uint32 level);
+static Page _bt_blnewpage(BTWriteState *wstate, uint32 level);
 static BTPageState *_bt_pagestate(BTWriteState *wstate, uint32 level);
 static void _bt_slideleft(Page rightmostpage);
 static void _bt_sortaddtup(Page page, Size itemsize,
@@ -569,16 +562,17 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
 	wstate.inskey = _bt_mkscankey(wstate.index, NULL);
 	/* _bt_mkscankey() won't set allequalimage without metapage */
 	wstate.inskey->allequalimage = _bt_allequalimage(wstate.index, true);
-	wstate.btws_use_wal = RelationNeedsWAL(wstate.index);
 
 	/* reserve the metapage */
 	wstate.btws_pages_alloced = BTREE_METAPAGE + 1;
-	wstate.btws_pages_written = 0;
-	wstate.btws_zeropage = NULL;	/* until needed */
+
+	wstate.bulkw = bulkw_start_rel(wstate.index, MAIN_FORKNUM);
 
 	pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
 								 PROGRESS_BTREE_PHASE_LEAF_LOAD);
 	_bt_load(&wstate, btspool, btspool2);
+
+	bulkw_finish(wstate.bulkw);
 }
 
 /*
@@ -614,12 +608,12 @@ _bt_build_callback(Relation index,
  * allocate workspace for a new, clean btree page, not linked to any siblings.
  */
 static Page
-_bt_blnewpage(uint32 level)
+_bt_blnewpage(BTWriteState *wstate, uint32 level)
 {
 	Page		page;
 	BTPageOpaque opaque;
 
-	page = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+	page = bulkw_alloc_buf(wstate->bulkw);
 
 	/* Zero the page and set up standard page header info */
 	_bt_pageinit(page, BLCKSZ);
@@ -643,54 +637,8 @@ _bt_blnewpage(uint32 level)
 static void
 _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
 {
-	/* XLOG stuff */
-	if (wstate->btws_use_wal)
-	{
-		/* We use the XLOG_FPI record type for this */
-		log_newpage(&wstate->index->rd_locator, MAIN_FORKNUM, blkno, page, true);
-	}
-
-	/*
-	 * If we have to write pages nonsequentially, fill in the space with
-	 * zeroes until we come back and overwrite.  This is not logically
-	 * necessary on standard Unix filesystems (unwritten space will read as
-	 * zeroes anyway), but it should help to avoid fragmentation. The dummy
-	 * pages aren't WAL-logged though.
-	 */
-	while (blkno > wstate->btws_pages_written)
-	{
-		if (!wstate->btws_zeropage)
-			wstate->btws_zeropage = (Page) palloc_aligned(BLCKSZ,
-														  PG_IO_ALIGN_SIZE,
-														  MCXT_ALLOC_ZERO);
-		/* don't set checksum for all-zero page */
-		smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM,
-				   wstate->btws_pages_written++,
-				   wstate->btws_zeropage,
-				   true);
-	}
-
-	PageSetChecksumInplace(page, blkno);
-
-	/*
-	 * Now write the page.  There's no need for smgr to schedule an fsync for
-	 * this write; we'll do it ourselves before ending the build.
-	 */
-	if (blkno == wstate->btws_pages_written)
-	{
-		/* extending the file... */
-		smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-				   page, true);
-		wstate->btws_pages_written++;
-	}
-	else
-	{
-		/* overwriting a block we zero-filled before */
-		smgrwrite(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-				  page, true);
-	}
-
-	pfree(page);
+	bulkw_write(wstate->bulkw, blkno, page, true);
+	/* bulkw_write took ownership of 'page' */
 }
 
 /*
@@ -703,7 +651,7 @@ _bt_pagestate(BTWriteState *wstate, uint32 level)
 	BTPageState *state = (BTPageState *) palloc0(sizeof(BTPageState));
 
 	/* create initial page for level */
-	state->btps_page = _bt_blnewpage(level);
+	state->btps_page = _bt_blnewpage(wstate, level);
 
 	/* and assign it a page position */
 	state->btps_blkno = wstate->btws_pages_alloced++;
@@ -916,7 +864,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
 		IndexTuple	oitup;
 
 		/* Create new page of same level */
-		npage = _bt_blnewpage(state->btps_level);
+		npage = _bt_blnewpage(wstate, state->btps_level);
 
 		/* and assign it a page position */
 		nblkno = wstate->btws_pages_alloced++;
@@ -1028,8 +976,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
 		}
 
 		/*
-		 * Write out the old page.  We never need to touch it again, so we can
-		 * free the opage workspace too.
+		 * Write out the old page. _bt_blwritepage takes ownership of the
+		 * 'opage' buffer.
 		 */
 		_bt_blwritepage(wstate, opage, oblkno);
 
@@ -1163,7 +1111,7 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
 		 */
 		_bt_slideleft(s->btps_page);
 		_bt_blwritepage(wstate, s->btps_page, s->btps_blkno);
-		s->btps_page = NULL;	/* writepage freed the workspace */
+		s->btps_page = NULL;	/* writepage took ownership of the buffer */
 	}
 
 	/*
@@ -1172,7 +1120,7 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
 	 * set to point to "P_NONE").  This changes the index to the "valid" state
 	 * by filling in a valid magic number in the metapage.
 	 */
-	metapage = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+	metapage = bulkw_alloc_buf(wstate->bulkw);
 	_bt_initmetapage(metapage, rootblkno, rootlevel,
 					 wstate->inskey->allequalimage);
 	_bt_blwritepage(wstate, metapage, BTREE_METAPAGE);
@@ -1422,18 +1370,6 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 
 	/* Close down final pages and write the metapage */
 	_bt_uppershutdown(wstate, state);
-
-	/*
-	 * When we WAL-logged index pages, we must nonetheless fsync index files.
-	 * Since we're building outside shared buffers, a CHECKPOINT occurring
-	 * during the build has no way to flush the previously written data to
-	 * disk (indeed it won't know the index even exists).  A crash later on
-	 * would replay WAL from the checkpoint, therefore it wouldn't replay our
-	 * earlier WAL entries. If we do not fsync those pages here, they might
-	 * still not be on disk when the crash occurs.
-	 */
-	if (wstate->btws_use_wal)
-		smgrimmedsync(RelationGetSmgr(wstate->index), MAIN_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index 4443f1918df..11fe932c26d 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -25,7 +25,7 @@
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
-#include "storage/smgr.h"
+#include "storage/bulk_write.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -155,42 +155,27 @@ spgbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 void
 spgbuildempty(Relation index)
 {
-	Buffer		metabuffer,
-				rootbuffer,
-				nullbuffer;
-
-	/*
-	 * Initialize the meta page and root pages
-	 */
-	metabuffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
-	LockBuffer(metabuffer, BUFFER_LOCK_EXCLUSIVE);
-	rootbuffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
-	LockBuffer(rootbuffer, BUFFER_LOCK_EXCLUSIVE);
-	nullbuffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
-	LockBuffer(nullbuffer, BUFFER_LOCK_EXCLUSIVE);
-
-	Assert(BufferGetBlockNumber(metabuffer) == SPGIST_METAPAGE_BLKNO);
-	Assert(BufferGetBlockNumber(rootbuffer) == SPGIST_ROOT_BLKNO);
-	Assert(BufferGetBlockNumber(nullbuffer) == SPGIST_NULL_BLKNO);
+	Page		page;
+	BulkWriteState *bulkw;
 
-	START_CRIT_SECTION();
+	bulkw = bulkw_start_rel(index, INIT_FORKNUM);
 
-	SpGistInitMetapage(BufferGetPage(metabuffer));
-	MarkBufferDirty(metabuffer);
-	SpGistInitBuffer(rootbuffer, SPGIST_LEAF);
-	MarkBufferDirty(rootbuffer);
-	SpGistInitBuffer(nullbuffer, SPGIST_LEAF | SPGIST_NULLS);
-	MarkBufferDirty(nullbuffer);
+	/* Construct metapage. */
+	page = (Page) bulkw_alloc_buf(bulkw);
+	SpGistInitMetapage(page);
+	bulkw_write(bulkw, SPGIST_METAPAGE_BLKNO, page, true);
 
-	log_newpage_buffer(metabuffer, true);
-	log_newpage_buffer(rootbuffer, true);
-	log_newpage_buffer(nullbuffer, true);
+	/* Likewise for the root page. */
+	page = (Page) bulkw_alloc_buf(bulkw);
+	SpGistInitPage(page, SPGIST_LEAF);
+	bulkw_write(bulkw, SPGIST_ROOT_BLKNO, page, true);
 
-	END_CRIT_SECTION();
+	/* Likewise for the null-tuples root page. */
+	page = (Page) bulkw_alloc_buf(bulkw);
+	SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS);
+	bulkw_write(bulkw, SPGIST_NULL_BLKNO, page, true);
 
-	UnlockReleaseBuffer(metabuffer);
-	UnlockReleaseBuffer(rootbuffer);
-	UnlockReleaseBuffer(nullbuffer);
+	bulkw_finish(bulkw);
 }
 
 /*
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 93f07e49b72..eb982b1a88d 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -28,6 +28,7 @@
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
 #include "miscadmin.h"
+#include "storage/bulk_write.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
 #include "utils/hsearch.h"
@@ -451,14 +452,11 @@ void
 RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 					ForkNumber forkNum, char relpersistence)
 {
-	PGIOAlignedBlock buf;
-	Page		page;
 	bool		use_wal;
 	bool		copying_initfork;
 	BlockNumber nblocks;
 	BlockNumber blkno;
-
-	page = (Page) buf.data;
+	BulkWriteState *bulkw;
 
 	/*
 	 * The init fork for an unlogged relation in many respects has to be
@@ -477,14 +475,19 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 	use_wal = XLogIsNeeded() &&
 		(relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork);
 
+	bulkw = bulkw_start_smgr(dst, forkNum, use_wal);
+
 	nblocks = smgrnblocks(src, forkNum);
 
 	for (blkno = 0; blkno < nblocks; blkno++)
 	{
+		Page		page;
+
 		/* If we got a cancel signal during the copy of the data, quit */
 		CHECK_FOR_INTERRUPTS();
 
-		smgrread(src, forkNum, blkno, buf.data);
+		page = bulkw_alloc_buf(bulkw);
+		smgrread(src, forkNum, blkno, page);
 
 		if (!PageIsVerifiedExtended(page, blkno,
 									PIV_LOG_WARNING | PIV_REPORT_STAT))
@@ -511,30 +514,9 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 		 * page this is, so we have to log the full page including any unused
 		 * space.
 		 */
-		if (use_wal)
-			log_newpage(&dst->smgr_rlocator.locator, forkNum, blkno, page, false);
-
-		PageSetChecksumInplace(page, blkno);
-
-		/*
-		 * Now write the page.  We say skipFsync = true because there's no
-		 * need for smgr to schedule an fsync for this write; we'll do it
-		 * ourselves below.
-		 */
-		smgrextend(dst, forkNum, blkno, buf.data, true);
+		bulkw_write(bulkw, blkno, page, false);
 	}
-
-	/*
-	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
-	 * reason is that since we're copying outside shared buffers, a CHECKPOINT
-	 * occurring during the copy has no way to flush the previously written
-	 * data to disk (indeed it won't know the new rel even exists).  A crash
-	 * later on would replay WAL from the checkpoint, therefore it wouldn't
-	 * replay our earlier WAL entries. If we do not fsync those pages here,
-	 * they might still not be on disk when the crash occurs.
-	 */
-	if (use_wal || copying_initfork)
-		smgrimmedsync(dst, forkNum);
+	bulkw_finish(bulkw);
 }
 
 /*
diff --git a/src/backend/storage/smgr/Makefile b/src/backend/storage/smgr/Makefile
index 596b564656f..1d0b98764f9 100644
--- a/src/backend/storage/smgr/Makefile
+++ b/src/backend/storage/smgr/Makefile
@@ -13,6 +13,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = \
+	bulk_write.o \
 	md.o \
 	smgr.o
 
diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
new file mode 100644
index 00000000000..d9090979d65
--- /dev/null
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -0,0 +1,334 @@
+/*-------------------------------------------------------------------------
+ *
+ * bulk_write.c
+ *	  Efficiently and reliably populate a new relation
+ *
+ * The assumption is that no other backends access the relation while we are
+ * loading it, so we can take some shortcuts.  Alternatively, you can use the
+ * buffer manager as usual, if performance is not critical, but you must not
+ * mix operations through the buffer manager and the bulk loading interface at
+ * the same time.
+ *
+ * We bypass the buffer manager to avoid the locking overhead, and call
+ * smgrextend() directly.  A downside is that the pages will need to be
+ * re-read into shared buffers on first use after the build finishes.  That's
+ * usually a good tradeoff for large relations, and for small relations, the
+ * overhead isn't very significant compared to creating the relation in the
+ * first place.
+ *
+ * The pages are WAL-logged if needed.  To save on WAL header overhead, we
+ * WAL-log several pages in one record.
+ *
+ * One tricky point is that because we bypass the buffer manager, we need to
+ * register the relation for fsyncing at the next checkpoint ourselves, and
+ * make sure that the relation is correctly fsync'd by us or by the
+ * checkpointer even if a checkpoint happens concurrently.
+ *
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/smgr/bulk_write.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/xloginsert.h"
+#include "access/xlogrecord.h"
+#include "storage/bufmgr.h"
+#include "storage/bufpage.h"
+#include "storage/bulk_write.h"
+#include "storage/proc.h"
+#include "storage/smgr.h"
+#include "utils/rel.h"
+
+#define MAX_BUFFERED_PAGES XLR_MAX_BLOCK_ID
+
+typedef struct BulkWriteBuffer	/* one page write queued by bulkw_write() */
+{
+	Page		page;			/* page image; pfree'd by bulkw_flush() */
+	BlockNumber blkno;			/* block number the page is written to */
+	bool		page_std;		/* caller's page_std; used when WAL-logging */
+	int16		order;			/* position in the batch; later write wins */
+} BulkWriteBuffer;
+
+/*
+ * Bulk writer state for one relation fork, set up by bulkw_start_smgr().
+ */
+typedef struct BulkWriteState
+{
+	/* Information about the target relation we're writing */
+	SMgrRelation smgr;
+	ForkNumber	forknum;
+	bool		use_wal;		/* WAL-log the pages in bulkw_flush()? */
+
+	/* We keep several pages buffered, and WAL-log them in batches */
+	int			nbuffered;		/* number of entries in use in buffers[] */
+	BulkWriteBuffer buffers[MAX_BUFFERED_PAGES];
+
+	/* Blocks written so far (starts at 0); blocks at or past this extend the fork */
+	BlockNumber pages_written;
+
+	/* RedoRecPtr when the bulk operation started; rechecked in bulkw_finish() */
+	XLogRecPtr	start_RedoRecPtr;
+
+	Page		zeropage;		/* all-zeroes page for gaps; NULL until needed */
+
+	MemoryContext memcxt;		/* context that bulkw_alloc_buf() allocates in */
+} BulkWriteState;
+
+static void bulkw_flush(BulkWriteState *bulkw);
+
+/*
+ * Begin a bulk write on one fork of 'rel'; init forks are always WAL-logged.
+ */
+BulkWriteState *
+bulkw_start_rel(Relation rel, ForkNumber forknum)
+{
+	bool		wal_needed = RelationNeedsWAL(rel) || forknum == INIT_FORKNUM;
+
+	return bulkw_start_smgr(RelationGetSmgr(rel), forknum, wal_needed);
+}
+
+/*
+ * Create the state for bulk-writing 'forknum' of the relation behind 'smgr'.
+ *
+ * Same as bulkw_start_rel, except that no relcache entry is needed: the
+ * caller supplies the SMgrRelation and decides itself, via 'use_wal',
+ * whether the pages get WAL-logged.
+ */
+BulkWriteState *
+bulkw_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
+{
+	BulkWriteState *state = palloc(sizeof(*state));
+
+	/* Page buffers are allocated in the current context later on. */
+	state->memcxt = CurrentMemoryContext;
+
+	/* Target of the writes */
+	state->smgr = smgr;
+	state->forknum = forknum;
+	state->use_wal = use_wal;
+
+	/* Nothing is buffered or written yet; the zero page is made on demand */
+	state->nbuffered = 0;
+	state->pages_written = 0;
+	state->zeropage = NULL;
+
+	/* bulkw_finish compares this to detect a checkpoint in between */
+	state->start_RedoRecPtr = GetRedoRecPtr();
+
+	return state;
+}
+
+/*
+ * Finish bulk write operation.
+ *
+ * Writes out (and WAL-logs, if use_wal) any still-buffered pages, then either
+ * fsyncs the fork or registers it for fsync.  'bulkw' itself is not freed.
+ */
+void
+bulkw_finish(BulkWriteState *bulkw)
+{
+	/* WAL-log and flush any remaining pages */
+	bulkw_flush(bulkw);
+
+	/*
+	 * When we wrote out the pages, we passed skipFsync=true to avoid the
+	 * overhead of registering every write with the checkpointer.  Register
+	 * the whole fork now, in one go.  Temporary relations are skipped.
+	 *
+	 * There is one hole in that idea: if a checkpoint started while we were
+	 * writing the pages, it missed fsyncing the pages we had written before
+	 * it began.  A crash later on would replay the WAL starting from that
+	 * checkpoint, and therefore wouldn't replay our earlier WAL records.  So
+	 * if the redo pointer has moved since bulkw_start_smgr() saved it, a
+	 * checkpoint began in the meantime and we fsync the fork ourselves.
+	 */
+	if (!SmgrIsTemp(bulkw->smgr))
+	{
+		/*
+		 * Set DELAY_CHKPT_START so that no checkpoint can start between the
+		 * GetRedoRecPtr() check and the smgrregistersync() call below.
+		 */
+		Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
+		MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+
+		if (bulkw->start_RedoRecPtr != GetRedoRecPtr())
+		{
+			/*
+			 * A checkpoint started and it didn't know about our writes, so
+			 * fsync the fork ourselves; the flag is dropped first.
+			 */
+			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+			smgrimmedsync(bulkw->smgr, bulkw->forknum);
+			elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
+		}
+		else
+		{
+			smgrregistersync(bulkw->smgr, bulkw->forknum);
+			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+		}
+	}
+}
+
+/* qsort comparator: sort by block number, ties broken by write order. */
+static int
+buffer_cmp(const void *a, const void *b)
+{
+	const BulkWriteBuffer *lhs = a;
+	const BulkWriteBuffer *rhs = b;
+
+	/* Primary key: the target block number, ascending. */
+	if (lhs->blkno != rhs->blkno)
+		return (lhs->blkno > rhs->blkno) ? 1 : -1;
+
+	/*
+	 * Same block: the entry buffered later ('order' is larger) sorts
+	 * after the earlier one, so that the newest copy of a block ends up
+	 * last in its run.  Note that this never returns 0; entries within
+	 * one batch always carry distinct 'order' values (assigned in
+	 * bulkw_write), so two different entries are never equal.
+	 */
+	return (lhs->order > rhs->order) ? 1 : -1;
+}
+
+/*
+ * WAL-log (if use_wal) and write out all buffered pages, leaving the buffer empty.
+ */
+static void
+bulkw_flush(BulkWriteState *bulkw)
+{
+	int			nbuffered = bulkw->nbuffered;
+	BulkWriteBuffer *buffers = bulkw->buffers;
+
+	if (nbuffered == 0)
+		return;
+
+	if (nbuffered > 1)
+	{
+		int			o;
+
+		qsort(buffers, nbuffered, sizeof(BulkWriteBuffer), buffer_cmp);
+
+		/*
+		 * Compact the sorted array in place, keeping only the last write of
+		 * each block ('order' is buffer_cmp's tie-breaker); 'o' = output slot.
+		 */
+		o = 0;
+		for (int i = 0; i < nbuffered; i++)
+		{
+			if (i < nbuffered - 1 && buffers[i + 1].blkno == buffers[i].blkno)
+			{
+				/* a later write of the same block follows: free this copy and skip it */
+				pfree(buffers[i].page);
+				continue;
+			}
+
+			if (i != o)
+				buffers[o] = buffers[i];
+			o++;
+		}
+		nbuffered = o;
+	}
+
+	if (bulkw->use_wal)
+	{
+		BlockNumber blknos[MAX_BUFFERED_PAGES];
+		Page		pages[MAX_BUFFERED_PAGES];
+		bool		page_std = true;
+
+		for (int i = 0; i < nbuffered; i++)
+		{
+			blknos[i] = buffers[i].blkno;
+			pages[i] = buffers[i].page;
+
+			/*
+			 * If any page in the batch was passed with page_std = false, we
+			 * log the whole batch that way.  That's a bit wasteful, but in
+			 * practice, a mix of standard and non-standard page layout is
+			 * rare.  None of the built-in AMs do that.
+			 */
+			if (!buffers[i].page_std)
+				page_std = false;
+		}
+		log_newpages(&bulkw->smgr->smgr_rlocator.locator, bulkw->forknum, nbuffered, blknos, pages,
+					 page_std);
+	}
+
+	for (int i = 0; i < nbuffered; i++)
+	{
+		BlockNumber blkno = buffers[i].blkno;
+		Page		page = buffers[i].page;
+
+		PageSetChecksumInplace(page, blkno);
+
+		if (blkno >= bulkw->pages_written)
+		{
+			/*
+			 * If we have to write pages nonsequentially, zero-fill the gap
+			 * until we come back and overwrite it.  Not logically necessary
+			 * on standard Unix filesystems (unwritten space reads as zeroes
+			 * anyway), but it should help to avoid fragmentation.  The dummy
+			 * pages aren't WAL-logged, and the zero page is reused across calls.
+			 */
+			while (blkno > bulkw->pages_written)
+			{
+				if (!bulkw->zeropage)	/* NOTE(review): not allocated via bulkw->memcxt; confirm lifetime */
+					bulkw->zeropage = (Page) palloc_aligned(BLCKSZ,
+															PG_IO_ALIGN_SIZE,
+															MCXT_ALLOC_ZERO);
+
+				/* don't set checksum for all-zero page */
+				smgrextend(bulkw->smgr, bulkw->forknum,
+						   bulkw->pages_written++,
+						   bulkw->zeropage,
+						   true);
+			}
+
+			smgrextend(bulkw->smgr, bulkw->forknum, blkno, page, true);
+			bulkw->pages_written = buffers[i].blkno + 1;
+		}
+		else					/* block is below pages_written: overwrite it */
+			smgrwrite(bulkw->smgr, bulkw->forknum, blkno, page, true);
+		pfree(page);			/* we own the page; see bulkw_write() */
+	}
+
+	bulkw->nbuffered = 0;
+}
+
+/*
+ * Queue 'page' to be written as block 'blocknum', flushing the whole batch
+ * once MAX_BUFFERED_PAGES writes have accumulated.
+ *
+ * NB: ownership of 'page' passes to the bulk writer, which pfrees it.
+ */
+void
+bulkw_write(BulkWriteState *bulkw, BlockNumber blocknum, Page page, bool page_std)
+{
+	int			slotno = bulkw->nbuffered;
+	BulkWriteBuffer *slot = &bulkw->buffers[slotno];
+
+	slot->page = page;
+	slot->blkno = blocknum;
+	slot->page_std = page_std;
+
+	/* Remember arrival order, so the last write of a block wins at flush. */
+	slot->order = (int16) slotno;
+
+	/* Batch full?  Then WAL-log and write it out right away. */
+	if (++bulkw->nbuffered == MAX_BUFFERED_PAGES)
+		bulkw_flush(bulkw);
+}
+
+/* Hand out a BLCKSZ-sized, I/O-aligned page to be filled and given to bulkw_write() */
+Page
+bulkw_alloc_buf(BulkWriteState *bulkw)
+{
+	MemoryContext cxt = bulkw->memcxt;	/* context saved by bulkw_start_smgr */
+
+	return MemoryContextAllocAligned(cxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+}
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index fdecbad1709..343ee51048e 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -1082,6 +1082,49 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
 	}
 }
 
+/*
+ * mdregistersync() -- Mark every segment of a relation fork as needing fsync
+ */
+void
+mdregistersync(SMgrRelation reln, ForkNumber forknum)
+{
+	int			segno;
+	int			min_inactive_seg;
+
+	/*
+	 * NOTE: mdnblocks makes sure we have opened all active segments, so that
+	 * the loop below will get them all!
+	 */
+	mdnblocks(reln, forknum);
+
+	min_inactive_seg = segno = reln->md_num_open_segs[forknum];
+
+	/*
+	 * Temporarily open inactive segments, then close them again once they
+	 * are registered.  Some inactive segments may be left open after an
+	 * error, but that is harmless.  We don't bother to clean them up and risk
+	 * further trouble.  The next mdclose() will soon close them.
+	 */
+	while (_mdfd_openseg(reln, forknum, segno, 0) != NULL)
+		segno++;
+
+	while (segno > 0)
+	{
+		MdfdVec    *v = &reln->md_seg_fds[forknum][segno - 1];
+
+		register_dirty_segment(reln, forknum, v);
+
+		/* Close segments that were only opened by the loop above */
+		if (segno > min_inactive_seg)
+		{
+			FileClose(v->mdfd_vfd);
+			_fdvec_resize(reln, forknum, segno - 1);
+		}
+
+		segno--;
+	}
+}
+
 /*
  * mdimmedsync() -- Immediately sync a relation to stable storage.
  *
diff --git a/src/backend/storage/smgr/meson.build b/src/backend/storage/smgr/meson.build
index e1ba6ed74b8..133622a6528 100644
--- a/src/backend/storage/smgr/meson.build
+++ b/src/backend/storage/smgr/meson.build
@@ -1,6 +1,7 @@
 # Copyright (c) 2022-2023, PostgreSQL Global Development Group
 
 backend_sources += files(
+  'bulk_write.c',
   'md.c',
   'smgr.c',
 )
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 5d0f3d515c3..9f7405e3c88 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -65,6 +65,7 @@ typedef struct f_smgr
 	void		(*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
 								  BlockNumber nblocks);
 	void		(*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
+	void		(*smgr_registersync) (SMgrRelation reln, ForkNumber forknum);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -86,6 +87,7 @@ static const f_smgr smgrsw[] = {
 		.smgr_nblocks = mdnblocks,
 		.smgr_truncate = mdtruncate,
 		.smgr_immedsync = mdimmedsync,
+		.smgr_registersync = mdregistersync,
 	}
 };
 
@@ -576,6 +578,14 @@ smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  * on disk at return, only dumped out to the kernel.  However,
  * provisions will be made to fsync the write before the next checkpoint.
  *
+ * NB: The mechanism to ensure fsync at next checkpoint assumes that there is
+ * something that prevents a concurrent checkpoint from "racing ahead" of the
+ * write.  One way to prevent that is by holding a lock on the buffer; the
+ * buffer manager's writes are protected by that.  The bulk writer facility in
+ * bulk_write.c checks the redo pointer and calls smgrimmedsync() if a
+ * checkpoint happened; that relies on the fact that no other backend can be
+ * concurrently modifying the page.
+ *
  * skipFsync indicates that the caller will make other provisions to
  * fsync the relation, so we needn't bother.  Temporary relations also
  * do not require fsync.
@@ -694,6 +704,24 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
 	}
 }
 
+/*
+ * smgrregistersync() -- Ask for a relation fork to be fsync'd at next checkpoint
+ *
+ * Meant to follow smgrwrite() or smgrextend() calls made with skipFsync = true:
+ * it registers, after the fact, the fsyncs that those calls skipped.
+ *
+ * Note: a checkpoint may already have happened between those writes and this
+ * call!  It then missed fsyncing the relation, so use smgrimmedsync instead.
+ * Most callers are better off with bulk_write.c, which handles all that.
+ */
+void
+smgrregistersync(SMgrRelation reln, ForkNumber forknum)
+{
+	const f_smgr *impl = &smgrsw[reln->smgr_which];
+
+	impl->smgr_registersync(reln, forknum);
+}
+
 /*
  * smgrimmedsync() -- Force the specified relation to stable storage.
  *
@@ -716,6 +744,9 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
  * Note that you need to do FlushRelationBuffers() first if there is
  * any possibility that there are dirty buffers for the relation;
  * otherwise the sync is not very meaningful.
+ *
+ * Most callers should use the bulk loading facility in bulk_write.c
+ * instead of calling this directly.
  */
 void
 smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
diff --git a/src/include/storage/bulk_write.h b/src/include/storage/bulk_write.h
new file mode 100644
index 00000000000..c8eb7307178
--- /dev/null
+++ b/src/include/storage/bulk_write.h
@@ -0,0 +1,28 @@
+/*-------------------------------------------------------------------------
+ *
+ * bulk_write.h
+ *	  Efficiently and reliably populate a new relation
+ *
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/bulk_write.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef BULK_WRITE_H
+#define BULK_WRITE_H
+
+#include "storage/bufpage.h"	/* for Page */
+#include "storage/smgr.h"		/* for struct SMgrRelationData */
+#include "utils/rel.h"			/* for Relation */
+
+typedef struct BulkWriteState BulkWriteState;	/* opaque; defined in bulk_write.c */
+extern BulkWriteState *bulkw_start_rel(Relation rel, ForkNumber forknum);
+extern BulkWriteState *bulkw_start_smgr(struct SMgrRelationData *smgr, ForkNumber forknum, bool use_wal);
+extern Page bulkw_alloc_buf(BulkWriteState *bulkw);
+extern void bulkw_write(BulkWriteState *bulkw, BlockNumber blocknum, Page page, bool page_std);
+extern void bulkw_finish(BulkWriteState *bulkw);
+
+#endif							/* BULK_WRITE_H */
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 941879ee6a8..225701271d2 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -42,6 +42,7 @@ extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
 extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
+extern void mdregistersync(SMgrRelation reln, ForkNumber forknum);
 
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index a9a179aabac..cc5a91dc624 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -107,6 +107,7 @@ extern BlockNumber smgrnblocks_cached(SMgrRelation reln, ForkNumber forknum);
 extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum,
 						 int nforks, BlockNumber *nblocks);
 extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum);
+extern void smgrregistersync(SMgrRelation reln, ForkNumber forknum);
 extern void AtEOXact_SMgr(void);
 extern bool ProcessBarrierSmgrRelease(void);
 
-- 
2.39.2

Reply via email to