Here's a rebase.  I decided against committing this for v17 in the
end.  There's not much wrong with it AFAIK, except perhaps an
unprincipled chopping up of writes with large io_combine_limit due to
simplistic flow control, and I liked the idea of having a decent user
of smgrwritev() in the tree, and it probably makes CREATE INDEX a bit
faster, but... I'd like to try something more ambitious that
streamifies this and also the "main" writeback paths.  I shared some
patches for that that are counterparts to this, over at[1].

[1] 
https://www.postgresql.org/message-id/flat/CA%2BhUKGK1in4FiWtisXZ%2BJo-cNSbWjmBcPww3w3DBM%2BwhJTABXA%40mail.gmail.com
From 7ee50aae3d4eba0df5bce05c196f411abb0bd9ab Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmu...@postgresql.org>
Date: Mon, 8 Apr 2024 18:19:41 +1200
Subject: [PATCH v7 1/3] Use smgrwritev() for both overwriting and extending.

Since mdwrite() and mdextend() were very similar and both needed
vectored variants, merge them into a single interface.  This reduces
duplication and fills out the set of operations.  We still want to be
able to assert that callers know the difference between overwriting and
extending, and to activate slightly different behavior during recovery,
so add a new "flags" argument to control that.

The goal here is to provide the missing vectored interface without which
the new bulk write facility from commit 8af25652 can't write to the file
system in bulk.  A following commit will connect them together.

Like smgrwrite(), the traditional single-block smgrextend() function
with skipFsync boolean argument is now translated to smgrwritev() by
an inlinable wrapper function.

The smgrzeroextend() interface remains distinct; the idea was floated of
merging that too, but so far without consensus.

Reviewed-by: Heikki Linnakangas <hlinn...@iki.fi>
Reviewed-by: Andres Freund <and...@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c |  10 +--
 src/backend/storage/smgr/md.c       | 120 ++++++++--------------------
 src/backend/storage/smgr/smgr.c     | 100 ++++++++++-------------
 src/include/storage/md.h            |   4 +-
 src/include/storage/smgr.h          |  19 ++++-
 5 files changed, 94 insertions(+), 159 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 44836751b71..5166a839da8 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -4011,11 +4011,11 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum,
 	 *
 	 * In recovery, we cache the value returned by the first lseek(SEEK_END)
 	 * and the future writes keeps the cached value up-to-date. See
-	 * smgrextend. It is possible that the value of the first lseek is smaller
-	 * than the actual number of existing blocks in the file due to buggy
-	 * Linux kernels that might not have accounted for the recent write. But
-	 * that should be fine because there must not be any buffers after that
-	 * file size.
+	 * smgrzeroextend and smgrwritev. It is possible that the value of the
+	 * first lseek is smaller than the actual number of existing blocks in the
+	 * file due to buggy Linux kernels that might not have accounted for the
+	 * recent write. But that should be fine because there must not be any
+	 * buffers after that file size.
 	 */
 	for (i = 0; i < nforks; i++)
 	{
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index bf0f3ca76d1..73d077ca3ea 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -447,79 +447,10 @@ mdunlinkfork(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo)
 	pfree(path);
 }
 
-/*
- * mdextend() -- Add a block to the specified relation.
- *
- * The semantics are nearly the same as mdwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void *buffer, bool skipFsync)
-{
-	off_t		seekpos;
-	int			nbytes;
-	MdfdVec    *v;
-
-	/* If this build supports direct I/O, the buffer must be I/O aligned. */
-	if (PG_O_DIRECT != 0 && PG_IO_ALIGN_SIZE <= BLCKSZ)
-		Assert((uintptr_t) buffer == TYPEALIGN(PG_IO_ALIGN_SIZE, buffer));
-
-	/* This assert is too expensive to have on normally ... */
-#ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum >= mdnblocks(reln, forknum));
-#endif
-
-	/*
-	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
-	 * more --- we mustn't create a block whose number actually is
-	 * InvalidBlockNumber.  (Note that this failure should be unreachable
-	 * because of upstream checks in bufmgr.c.)
-	 */
-	if (blocknum == InvalidBlockNumber)
-		ereport(ERROR,
-				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
-				 errmsg("cannot extend file \"%s\" beyond %u blocks",
-						relpath(reln->smgr_rlocator, forknum),
-						InvalidBlockNumber)));
-
-	v = _mdfd_getseg(reln, forknum, blocknum, skipFsync, EXTENSION_CREATE);
-
-	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
-
-	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
-
-	if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
-	{
-		if (nbytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not extend file \"%s\": %m",
-							FilePathName(v->mdfd_vfd)),
-					 errhint("Check free disk space.")));
-		/* short write: complain appropriately */
-		ereport(ERROR,
-				(errcode(ERRCODE_DISK_FULL),
-				 errmsg("could not extend file \"%s\": wrote only %d of %d bytes at block %u",
-						FilePathName(v->mdfd_vfd),
-						nbytes, BLCKSZ, blocknum),
-				 errhint("Check free disk space.")));
-	}
-
-	if (!skipFsync && !SmgrIsTemp(reln))
-		register_dirty_segment(reln, forknum, v);
-
-	Assert(_mdnblocks(reln, forknum, v) <= ((BlockNumber) RELSEG_SIZE));
-}
-
 /*
  * mdzeroextend() -- Add new zeroed out blocks to the specified relation.
  *
- * Similar to mdextend(), except the relation can be extended by multiple
- * blocks at once and the added blocks will be filled with zeroes.
+ * The added blocks will be filled with zeroes.
  */
 void
 mdzeroextend(SMgrRelation reln, ForkNumber forknum,
@@ -919,20 +850,31 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 /*
  * mdwritev() -- Write the supplied blocks at the appropriate location.
- *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use mdextend().
  */
 void
 mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		 const void **buffers, BlockNumber nblocks, bool skipFsync)
+		 const void **buffers, BlockNumber nblocks, int flags)
 {
 	/* This assert is too expensive to have on normally ... */
 #ifdef CHECK_WRITE_VS_EXTEND
-	Assert(blocknum < mdnblocks(reln, forknum));
+	if (flags & SMGR_WRITE_EXTEND)
+		Assert(blocknum >= mdnblocks(reln, forknum));
+	else
+		Assert(blocknum + nblocks <= mdnblocks(reln, forknum));
 #endif
 
+	/*
+	 * If a relation manages to grow to 2^32-1 blocks, refuse to extend it any
+	 * more --- we mustn't create a block whose number actually is
+	 * InvalidBlockNumber or larger.
+	 */
+	if ((uint64) blocknum + nblocks >= (uint64) InvalidBlockNumber)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("cannot extend file \"%s\" beyond %u blocks",
+						relpath(reln->smgr_rlocator, forknum),
+						InvalidBlockNumber)));
+
 	while (nblocks > 0)
 	{
 		struct iovec iov[PG_IOV_MAX];
@@ -944,7 +886,9 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		size_t		transferred_this_segment;
 		size_t		size_this_segment;
 
-		v = _mdfd_getseg(reln, forknum, blocknum, skipFsync,
+		v = _mdfd_getseg(reln, forknum, blocknum, flags & SMGR_WRITE_SKIP_FSYNC,
+						 (flags & SMGR_WRITE_EXTEND) ?
+						 EXTENSION_CREATE :
 						 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
 
 		seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
@@ -992,7 +936,9 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 
 				ereport(ERROR,
 						(errcode_for_file_access(),
-						 errmsg("could not write blocks %u..%u in file \"%s\": %m",
+						 errmsg((flags & SMGR_WRITE_EXTEND) ?
+								"could not extend blocks %u..%u in file \"%s\": %m" :
+								"could not write blocks %u..%u in file \"%s\": %m",
 								blocknum,
 								blocknum + nblocks_this_segment - 1,
 								FilePathName(v->mdfd_vfd)),
@@ -1010,7 +956,7 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 			iovcnt = compute_remaining_iovec(iov, iov, iovcnt, nbytes);
 		}
 
-		if (!skipFsync && !SmgrIsTemp(reln))
+		if ((flags & SMGR_WRITE_SKIP_FSYNC) == 0 && !SmgrIsTemp(reln))
 			register_dirty_segment(reln, forknum, v);
 
 		nblocks -= nblocks_this_segment;
@@ -1638,11 +1584,11 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 		{
 			/*
 			 * Normally we will create new segments only if authorized by the
-			 * caller (i.e., we are doing mdextend()).  But when doing WAL
-			 * recovery, create segments anyway; this allows cases such as
-			 * replaying WAL data that has a write into a high-numbered
-			 * segment of a relation that was later deleted. We want to go
-			 * ahead and create the segments so we can finish out the replay.
+			 * caller (i.e., we are extending).  But when doing WAL recovery,
+			 * create segments anyway; this allows cases such as replaying WAL
+			 * data that has a write into a high-numbered segment of a
+			 * relation that was later deleted. We want to go ahead and create
+			 * the segments so we can finish out the replay.
 			 *
 			 * We have to maintain the invariant that segments before the last
 			 * active segment are of size RELSEG_SIZE; therefore, if
@@ -1655,9 +1601,9 @@ _mdfd_getseg(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
 				char	   *zerobuf = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE,
 													 MCXT_ALLOC_ZERO);
 
-				mdextend(reln, forknum,
-						 nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
-						 zerobuf, skipFsync);
+				smgrextend(reln, forknum,
+						   nextsegno * ((BlockNumber) RELSEG_SIZE) - 1,
+						   zerobuf, skipFsync);
 				pfree(zerobuf);
 			}
 			flags = O_CREAT;
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 62226d5dca7..92628a0339e 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -82,8 +82,6 @@ typedef struct f_smgr
 	bool		(*smgr_exists) (SMgrRelation reln, ForkNumber forknum);
 	void		(*smgr_unlink) (RelFileLocatorBackend rlocator, ForkNumber forknum,
 								bool isRedo);
-	void		(*smgr_extend) (SMgrRelation reln, ForkNumber forknum,
-								BlockNumber blocknum, const void *buffer, bool skipFsync);
 	void		(*smgr_zeroextend) (SMgrRelation reln, ForkNumber forknum,
 									BlockNumber blocknum, int nblocks, bool skipFsync);
 	bool		(*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
@@ -94,7 +92,7 @@ typedef struct f_smgr
 	void		(*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
 								BlockNumber blocknum,
 								const void **buffers, BlockNumber nblocks,
-								bool skipFsync);
+								int flags);
 	void		(*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
 								   BlockNumber blocknum, BlockNumber nblocks);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -114,7 +112,6 @@ static const f_smgr smgrsw[] = {
 		.smgr_create = mdcreate,
 		.smgr_exists = mdexists,
 		.smgr_unlink = mdunlink,
-		.smgr_extend = mdextend,
 		.smgr_zeroextend = mdzeroextend,
 		.smgr_prefetch = mdprefetch,
 		.smgr_readv = mdreadv,
@@ -521,40 +518,11 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
 	pfree(rlocators);
 }
 
-
-/*
- * smgrextend() -- Add a new block to a file.
- *
- * The semantics are nearly the same as smgrwrite(): write at the
- * specified position.  However, this is to be used for the case of
- * extending a relation (i.e., blocknum is at or beyond the current
- * EOF).  Note that we assume writing a block beyond current EOF
- * causes intervening file space to become filled with zeroes.
- */
-void
-smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void *buffer, bool skipFsync)
-{
-	smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum,
-										 buffer, skipFsync);
-
-	/*
-	 * Normally we expect this to increase nblocks by one, but if the cached
-	 * value isn't as expected, just invalidate it so the next call asks the
-	 * kernel.
-	 */
-	if (reln->smgr_cached_nblocks[forknum] == blocknum)
-		reln->smgr_cached_nblocks[forknum] = blocknum + 1;
-	else
-		reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
-}
-
 /*
  * smgrzeroextend() -- Add new zeroed out blocks to a file.
  *
- * Similar to smgrextend(), except the relation can be extended by
- * multiple blocks at once and the added blocks will be filled with
- * zeroes.
+ * Similar to writing with SMGR_WRITE_EXTEND, except the blocks will be filled
+ * with zeroes.
  */
 void
 smgrzeroextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
@@ -607,9 +575,9 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 /*
  * smgrwritev() -- Write the supplied buffers out.
  *
- * This is to be used only for updating already-existing blocks of a
- * relation (ie, those before the current EOF).  To extend a relation,
- * use smgrextend().
+ * By default this is to be used only for updating already-existing blocks of
+ * a relation (ie, those before the current EOF).  To extend a relation,
+ * specify SMGR_WRITE_EXTEND in flags.
  *
  * This is not a synchronous write -- the block is not necessarily
  * on disk at return, only dumped out to the kernel.  However,
@@ -623,16 +591,29 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
  * checkpoint happened; that relies on the fact that no other backend can be
  * concurrently modifying the page.
  *
- * skipFsync indicates that the caller will make other provisions to
- * fsync the relation, so we needn't bother.  Temporary relations also
- * do not require fsync.
+ * SMGR_WRITE_SKIP_FSYNC indicates that the caller will make other provisions
+ * to fsync the relation, so we needn't bother.  Temporary relations also do
+ * not require fsync.
  */
 void
 smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
-		   const void **buffers, BlockNumber nblocks, bool skipFsync)
+		   const void **buffers, BlockNumber nblocks, int flags)
 {
 	smgrsw[reln->smgr_which].smgr_writev(reln, forknum, blocknum,
-										 buffers, nblocks, skipFsync);
+										 buffers, nblocks, flags);
+
+	if (flags & SMGR_WRITE_EXTEND)
+	{
+		/*
+		 * Normally we expect this to increase the fork size by nblocks, but
+		 * if the cached value isn't as expected, just invalidate it so the
+		 * next call asks the smgr implementation.
+		 */
+		if (reln->smgr_cached_nblocks[forknum] == blocknum)
+			reln->smgr_cached_nblocks[forknum] = blocknum + nblocks;
+		else
+			reln->smgr_cached_nblocks[forknum] = InvalidBlockNumber;
+	}
 }
 
 /*
@@ -743,14 +724,14 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
 /*
  * smgrregistersync() -- Request a relation to be sync'd at next checkpoint
  *
- * This can be used after calling smgrwrite() or smgrextend() with skipFsync =
- * true, to register the fsyncs that were skipped earlier.
+ * This can be used after calling smgrwritev() with SMGR_WRITE_SKIP_FSYNC,
+ * to register the fsyncs that were skipped earlier.
  *
  * Note: be mindful that a checkpoint could already have happened between the
- * smgrwrite or smgrextend calls and this!  In that case, the checkpoint
- * already missed fsyncing this relation, and you should use smgrimmedsync
- * instead.  Most callers should use the bulk loading facility in bulk_write.c
- * which handles all that.
+ * smgrwritev calls and this!  In that case, the checkpoint already missed
+ * fsyncing this relation, and you should use smgrimmedsync instead.  Most
+ * callers should use the bulk loading facility in bulk_write.c which handles
+ * all that.
  */
 void
 smgrregistersync(SMgrRelation reln, ForkNumber forknum)
@@ -764,17 +745,16 @@ smgrregistersync(SMgrRelation reln, ForkNumber forknum)
  * Synchronously force all previous writes to the specified relation
  * down to disk.
  *
- * This is useful for building completely new relations (eg, new
- * indexes).  Instead of incrementally WAL-logging the index build
- * steps, we can just write completed index pages to disk with smgrwrite
- * or smgrextend, and then fsync the completed index file before
- * committing the transaction.  (This is sufficient for purposes of
- * crash recovery, since it effectively duplicates forcing a checkpoint
- * for the completed index.  But it is *not* sufficient if one wishes
- * to use the WAL log for PITR or replication purposes: in that case
- * we have to make WAL entries as well.)
- *
- * The preceding writes should specify skipFsync = true to avoid
+ * This is useful for building completely new relations (eg, new indexes).
+ * Instead of incrementally WAL-logging the index build steps, we can just
+ * write completed index pages to disk with smgrwritev, and then fsync the
+ * completed index file before committing the transaction.  (This is
+ * sufficient for purposes of crash recovery, since it effectively duplicates
+ * forcing a checkpoint for the completed index.  But it is *not* sufficient
+ * if one wishes to use the WAL log for PITR or replication purposes: in that
+ * case we have to make WAL entries as well.)
+ *
+ * The preceding writes should specify SMGR_WRITE_SKIP_FSYNC to avoid
  * duplicative fsyncs.
  *
  * Note that you need to do FlushRelationBuffers() first if there is
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 620f10abdeb..5fcd6f47dfe 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -26,8 +26,6 @@ extern void mdclose(SMgrRelation reln, ForkNumber forknum);
 extern void mdcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
 extern void mdunlink(RelFileLocatorBackend rlocator, ForkNumber forknum, bool isRedo);
-extern void mdextend(SMgrRelation reln, ForkNumber forknum,
-					 BlockNumber blocknum, const void *buffer, bool skipFsync);
 extern void mdzeroextend(SMgrRelation reln, ForkNumber forknum,
 						 BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum,
@@ -36,7 +34,7 @@ extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 					void **buffers, BlockNumber nblocks);
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
 					 BlockNumber blocknum,
-					 const void **buffers, BlockNumber nblocks, bool skipFsync);
+					 const void **buffers, BlockNumber nblocks, int flags);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
 						BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index fc5f883ce14..2b8b72820cd 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -73,6 +73,9 @@ typedef SMgrRelationData *SMgrRelation;
 #define SmgrIsTemp(smgr) \
 	RelFileLocatorBackendIsTemp((smgr)->smgr_rlocator)
 
+#define SMGR_WRITE_SKIP_FSYNC 0x01
+#define SMGR_WRITE_EXTEND 0x02
+
 extern void smgrinit(void);
 extern SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend);
 extern bool smgrexists(SMgrRelation reln, ForkNumber forknum);
@@ -86,8 +89,6 @@ extern void smgrreleaserellocator(RelFileLocatorBackend rlocator);
 extern void smgrcreate(SMgrRelation reln, ForkNumber forknum, bool isRedo);
 extern void smgrdosyncall(SMgrRelation *rels, int nrels);
 extern void smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo);
-extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
-					   BlockNumber blocknum, const void *buffer, bool skipFsync);
 extern void smgrzeroextend(SMgrRelation reln, ForkNumber forknum,
 						   BlockNumber blocknum, int nblocks, bool skipFsync);
 extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
@@ -98,7 +99,7 @@ extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
 extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber blocknum,
 					   const void **buffer, BlockNumber nblocks,
-					   bool skipFsync);
+					   int flags);
 extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
 						  BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
@@ -121,7 +122,17 @@ static inline void
 smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 		  const void *buffer, bool skipFsync)
 {
-	smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
+	smgrwritev(reln, forknum, blocknum, &buffer, 1,
+			   skipFsync ? SMGR_WRITE_SKIP_FSYNC : 0);
+}
+
+static inline void
+smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+		   const void *buffer, bool skipFsync)
+{
+	smgrwritev(reln, forknum, blocknum, &buffer, 1,
+			   SMGR_WRITE_EXTEND |
+			   (skipFsync ? SMGR_WRITE_SKIP_FSYNC : 0));
 }
 
 #endif							/* SMGR_H */
-- 
2.44.0

From df9e2356d61c47f4a943d0a21ccc64faf6616cc0 Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmu...@postgresql.org>
Date: Mon, 8 Apr 2024 18:58:57 +1200
Subject: [PATCH v7 2/3] Use vectored I/O in CREATE INDEX.

Commit 8af25652's bulk_write.c was originally designed with the goal of
being able to use vectored writes (along with other batching
opportunities), but it couldn't do that at the time because the vectored
variant of smgrextend() didn't exist yet.  Now it does, so we can
connect the dots.

Concretely, CREATE INDEX commands for several index types now respect
the io_combine_limit setting, when bypassing the buffer pool and writing
new indexes directly to the storage manager.

Reviewed-by: Heikki Linnakangas <hlinn...@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
Discussion: https://postgr.es/m/30e8f366-58b3-b239-c521-422122dd5150%40iki.fi
---
 src/backend/storage/smgr/bulk_write.c | 93 ++++++++++++++++++++-------
 1 file changed, 69 insertions(+), 24 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 4a10ece4c39..45d66245860 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -8,7 +8,7 @@
  * the regular buffer manager and the bulk loading interface!
  *
  * We bypass the buffer manager to avoid the locking overhead, and call
- * smgrextend() directly.  A downside is that the pages will need to be
+ * smgrwritev() directly.  A downside is that the pages will need to be
  * re-read into shared buffers on first use after the build finishes.  That's
  * usually a good tradeoff for large relations, and for small relations, the
  * overhead isn't very significant compared to creating the relation in the
@@ -225,35 +225,79 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 	for (int i = 0; i < npending; i++)
 	{
-		BlockNumber blkno = pending_writes[i].blkno;
-		Page		page = pending_writes[i].buf->data;
-
+		Page		page;
+		const void *pages[MAX_IO_COMBINE_LIMIT];
+		BlockNumber blkno;
+		int			nblocks;
+		int			max_nblocks;
+
+		/* Prepare to write the first block. */
+		blkno = pending_writes[i].blkno;
+		page = pending_writes[i].buf->data;
 		PageSetChecksumInplace(page, blkno);
+		pages[0] = page;
+		nblocks = 1;
 
-		if (blkno >= bulkstate->pages_written)
+		/*
+		 * If we have to write pages nonsequentially, fill in the space with
+		 * zeroes until we come back and overwrite.  This is not logically
+		 * necessary on standard Unix filesystems (unwritten space will read
+		 * as zeroes anyway), but it should help to avoid fragmentation.  The
+		 * dummy pages aren't WAL-logged though.
+		 */
+		while (blkno > bulkstate->pages_written)
+		{
+			/* don't set checksum for all-zero page */
+			smgrextend(bulkstate->smgr, bulkstate->forknum,
+					   bulkstate->pages_written++,
+					   &zero_buffer,
+					   true);
+		}
+
+		if (blkno < bulkstate->pages_written)
 		{
 			/*
-			 * If we have to write pages nonsequentially, fill in the space
-			 * with zeroes until we come back and overwrite.  This is not
-			 * logically necessary on standard Unix filesystems (unwritten
-			 * space will read as zeroes anyway), but it should help to avoid
-			 * fragmentation.  The dummy pages aren't WAL-logged though.
+			 * We're overwriting.  Clamp at the existing size, because we
+			 * can't mix writing and extending in a single operation.
 			 */
-			while (blkno > bulkstate->pages_written)
-			{
-				/* don't set checksum for all-zero page */
-				smgrextend(bulkstate->smgr, bulkstate->forknum,
-						   bulkstate->pages_written++,
-						   &zero_buffer,
-						   true);
-			}
-
-			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-			bulkstate->pages_written = pending_writes[i].blkno + 1;
+			max_nblocks = Min(io_combine_limit,
+							  bulkstate->pages_written - blkno);
+		}
+		else
+		{
+			/* We're extending. */
+			Assert(blkno == bulkstate->pages_written);
+			max_nblocks = io_combine_limit;
+		}
+
+		/* Collect as many consecutive blocks as we can. */
+		while (i + 1 < npending &&
+			   pending_writes[i + 1].blkno == blkno + nblocks &&
+			   nblocks < max_nblocks)
+		{
+			page = pending_writes[++i].buf->data;
+			PageSetChecksumInplace(page, pending_writes[i].blkno);
+			pages[nblocks++] = page;
+		}
+
+		/* Extend or overwrite. */
+		if (blkno == bulkstate->pages_written)
+		{
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC | SMGR_WRITE_EXTEND);
+			bulkstate->pages_written += nblocks;
 		}
 		else
-			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
-		pfree(page);
+		{
+			Assert(blkno + nblocks <= bulkstate->pages_written);
+			smgrwritev(bulkstate->smgr, bulkstate->forknum, blkno,
+					   pages, nblocks,
+					   SMGR_WRITE_SKIP_FSYNC);
+		}
+
+		for (int j = 0; j < nblocks; ++j)
+			pfree(pending_writes[i - j].buf->data);
 	}
 
 	bulkstate->npending = 0;
@@ -277,7 +321,8 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
 	w->blkno = blocknum;
 	w->page_std = page_std;
 
-	if (bulkstate->npending == MAX_PENDING_WRITES)
+	if (bulkstate->npending == MAX_PENDING_WRITES ||
+		bulkstate->npending == io_combine_limit)
 		smgr_bulk_flush(bulkstate);
 }
 
-- 
2.44.0

From 642e563a1aa8a4c27be2792dde558730edc79900 Mon Sep 17 00:00:00 2001
From: Thomas Munro <tmu...@postgresql.org>
Date: Mon, 8 Apr 2024 19:59:45 +1200
Subject: [PATCH v7 3/3] Use contiguous memory for bulk_write.c.

Instead of allocating buffers one at a time with palloc(), allocate an
array full of them up front, and then manage them in a FIFO freelist.
Aside from avoiding repeated allocator overheads, this means that
callers that tend to write sequential blocks will tend to fill up
sequential memory, which hopefully generates more efficient vectored
writes or simple non-vectored writes.  This implements an idea described
in the comments of commit 8af25652.

Reviewed-by: Heikki Linnakangas <hlinn...@iki.fi>
Discussion: https://postgr.es/m/CA%2BhUKGLx5bLwezZKAYB2O_qHj%3Dov10RpgRVY7e8TSJVE74oVjg%40mail.gmail.com
---
 src/backend/storage/smgr/bulk_write.c | 66 +++++++++++++++++++--------
 src/tools/pgindent/typedefs.list      |  1 +
 2 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/src/backend/storage/smgr/bulk_write.c b/src/backend/storage/smgr/bulk_write.c
index 45d66245860..6d16619f6be 100644
--- a/src/backend/storage/smgr/bulk_write.c
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -36,6 +36,7 @@
 
 #include "access/xloginsert.h"
 #include "access/xlogrecord.h"
+#include "lib/ilist.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
 #include "storage/bulk_write.h"
@@ -47,9 +48,15 @@
 
 static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
 
+typedef union BufferSlot
+{
+	PGIOAlignedBlock buffer;
+	dlist_node	freelist_node;
+} BufferSlot;
+
 typedef struct PendingWrite
 {
-	BulkWriteBuffer buf;
+	BufferSlot *slot;
 	BlockNumber blkno;
 	bool		page_std;
 } PendingWrite;
@@ -59,6 +66,14 @@ typedef struct PendingWrite
  */
 struct BulkWriteState
 {
+	/*
+	 * This must come first so that it is correctly aligned for I/O.  We have
+	 * one extra slot for the write that has been allocated but not yet
+	 * submitted to smgr_bulk_write() yet.
+	 */
+	BufferSlot	buffer_slots[MAX_PENDING_WRITES + 1];
+	dlist_head	buffer_slots_freelist;
+
 	/* Information about the target relation we're writing */
 	SMgrRelation smgr;
 	ForkNumber	forknum;
@@ -73,8 +88,6 @@ struct BulkWriteState
 
 	/* The RedoRecPtr at the time that the bulk operation started */
 	XLogRecPtr	start_RedoRecPtr;
-
-	MemoryContext memcxt;
 };
 
 static void smgr_bulk_flush(BulkWriteState *bulkstate);
@@ -100,7 +113,7 @@ smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 {
 	BulkWriteState *state;
 
-	state = palloc(sizeof(BulkWriteState));
+	state = palloc_aligned(sizeof(BulkWriteState), PG_IO_ALIGN_SIZE, 0);
 	state->smgr = smgr;
 	state->forknum = forknum;
 	state->use_wal = use_wal;
@@ -110,11 +123,11 @@ smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
 
 	state->start_RedoRecPtr = GetRedoRecPtr();
 
-	/*
-	 * Remember the memory context.  We will use it to allocate all the
-	 * buffers later.
-	 */
-	state->memcxt = CurrentMemoryContext;
+	/* Set up the free-list of buffers. */
+	dlist_init(&state->buffer_slots_freelist);
+	for (int i = 0; i < lengthof(state->buffer_slots); ++i)
+		dlist_push_tail(&state->buffer_slots_freelist,
+						&state->buffer_slots[i].freelist_node);
 
 	return state;
 }
@@ -208,7 +221,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 		for (int i = 0; i < npending; i++)
 		{
 			blknos[i] = pending_writes[i].blkno;
-			pages[i] = pending_writes[i].buf->data;
+			pages[i] = pending_writes[i].slot->buffer.data;
 
 			/*
 			 * If any of the pages use !page_std, we log them all as such.
@@ -233,7 +246,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 
 		/* Prepare to write the first block. */
 		blkno = pending_writes[i].blkno;
-		page = pending_writes[i].buf->data;
+		page = pending_writes[i].slot->buffer.data;
 		PageSetChecksumInplace(page, blkno);
 		pages[0] = page;
 		nblocks = 1;
@@ -275,7 +288,7 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 			   pending_writes[i + 1].blkno == blkno + nblocks &&
 			   nblocks < max_nblocks)
 		{
-			page = pending_writes[++i].buf->data;
+			page = pending_writes[++i].slot->buffer.data;
 			PageSetChecksumInplace(page, pending_writes[i].blkno);
 			pages[nblocks++] = page;
 		}
@@ -296,8 +309,14 @@ smgr_bulk_flush(BulkWriteState *bulkstate)
 					   SMGR_WRITE_SKIP_FSYNC);
 		}
 
-		for (int j = 0; j < nblocks; ++j)
-			pfree(pending_writes[i - j].buf->data);
+		/*
+		 * Maintain FIFO ordering in the free list, so that users who write
+		 * blocks in sequential order tend to get sequential chunks of buffer
+		 * memory, which may be slight more efficient for vectored writes.
+		 */
+		for (int j = i - nblocks + 1; j <= i; ++j)
+			dlist_push_tail(&bulkstate->buffer_slots_freelist,
+							&pending_writes[j].slot->freelist_node);
 	}
 
 	bulkstate->npending = 0;
@@ -317,7 +336,7 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
 	PendingWrite *w;
 
 	w = &bulkstate->pending_writes[bulkstate->npending++];
-	w->buf = buf;
+	w->slot = (BufferSlot *) buf;
 	w->blkno = blocknum;
 	w->page_std = page_std;
 
@@ -332,12 +351,21 @@ smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer
  * There is no function to free the buffer.  When you pass it to
  * smgr_bulk_write(), it takes ownership and frees it when it's no longer
  * needed.
- *
- * This is currently implemented as a simple palloc, but could be implemented
- * using a ring buffer or larger chunks in the future, so don't rely on it.
  */
 BulkWriteBuffer
 smgr_bulk_get_buf(BulkWriteState *bulkstate)
 {
-	return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+	BufferSlot *slot;
+
+	if (dlist_is_empty(&bulkstate->buffer_slots_freelist))
+	{
+		smgr_bulk_flush(bulkstate);
+		if (dlist_is_empty(&bulkstate->buffer_slots_freelist))
+			elog(ERROR, "too many bulk write buffers used but not yet written");
+	}
+
+	slot = dlist_head_element(BufferSlot, freelist_node, &bulkstate->buffer_slots_freelist);
+	dlist_pop_head_node(&bulkstate->buffer_slots_freelist);
+
+	return &slot->buffer;
 }
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 704e61dcaa2..05669916c78 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -328,6 +328,7 @@ BufferDescPadded
 BufferHeapTupleTableSlot
 BufferLookupEnt
 BufferManagerRelation
+BufferSlot
 BufferStrategyControl
 BufferTag
 BufferUsage
-- 
2.44.0

Reply via email to