On Mon, May 3, 2021 at 5:24 PM Melanie Plageman
<melanieplage...@gmail.com> wrote:
> On Thu, Jan 21, 2021 at 5:51 PM Andres Freund <and...@anarazel.de> wrote:
> > On 2021-01-21 23:54:04 +0200, Heikki Linnakangas wrote:
> > > On 21/01/2021 22:36, Andres Freund wrote:
> > > >
> > > > Thinking through the correctness of replacing smgrimmedsync() with sync
> > > > requests, the potential problems that I can see are:
> > > >
> > > > 1) redo point falls between the log_newpage() and the
> > > >     write()/register_dirty_segment() in smgrextend/smgrwrite.
> > > > 2) redo point falls between write() and register_dirty_segment()
> > > >
> > > > But both of these are fine in the context of initially filling a newly
> > > > created relfilenode, as far as I can tell? Otherwise the current
> > > > smgrimmedsync() approach wouldn't be safe either, as far as I can tell?
> > >
> > > Hmm. If the redo point falls between write() and the
> > > register_dirty_segment(), and the checkpointer finishes the whole 
> > > checkpoint
> > > before register_dirty_segment(), you are not safe. That can't happen with
> > > write from the buffer manager, because the checkpointer would block 
> > > waiting
> > > for the flush of the buffer to finish.
> >
> > Hm, right.
> >
> > The easiest way to address that race would be to just record the redo
> > pointer in _bt_leafbuild() and continue to do the smgrimmedsync if a
> > checkpoint started since the start of the index build.
> >
> > Another approach would be to utilize PGPROC.delayChkpt, but I would
> > rather not unnecessarily expand the use of that.
> >
> > It's kind of interesting - in my aio branch I moved the
> > register_dirty_segment() to before the actual asynchronous write (due to
> > availability of the necessary data), which ought to be safe because of
> > the buffer interlocking. But that doesn't work here, or for other places
> > doing writes without going through s_b.  It'd be great if we could come
> > up with a general solution, but I don't immediately see anything great.
> >
> > The best I can come up with is adding helper functions to wrap some of
> > the complexity for "unbuffered" writes of doing an immedsync iff the
> > redo pointer changed. Something very roughly like
> >
> > typedef struct UnbufferedWriteState { XLogRecPtr redo; uint64 numwrites;} 
> > UnbufferedWriteState;
> > void unbuffered_prep(UnbufferedWriteState* state);
> > void unbuffered_write(UnbufferedWriteState* state, ...);
> > void unbuffered_extend(UnbufferedWriteState* state, ...);
> > void unbuffered_finish(UnbufferedWriteState* state);
> >
> > which wouldn't just do the dance to avoid the immedsync() if possible,
> > but also took care of PageSetChecksumInplace() (and PageEncryptInplace()
> > if we get that [1]).
> >
>
> Regarding the implementation, I think having an API to do these
> "unbuffered" or "direct" writes outside of shared buffers is a good
> idea. In this specific case, the proposed API would not change the code
> much. I would just wrap the small diffs I added to the beginning and end
> of _bt_load() in the API calls for unbuffered_prep() and
> unbuffered_finish() and then tuck away the second half of
> _bt_blwritepage() in unbuffered_write()/unbuffered_extend(). I figured I
> would do so after ensuring the correctness of the logic in this patch.
> Then I will work on a patch which implements the unbuffered_write() API
> and demonstrates its utility with at least a few of the most compelling
> use cases in the code.
>

I've taken a pass at writing the API for "direct" or "unbuffered" writes
and extends. The attached patch introduces the suggested functions:
unbuffered_prep(), unbuffered_finish(), unbuffered_write(), and
unbuffered_extend().

This is a rough cut -- corrections welcome and encouraged!

unbuffered_prep() saves the XLOG redo pointer at the time it is called.
Then, if the redo pointer hasn't changed by the time unbuffered_finish()
is called, the backend can avoid calling smgrimmedsync(). Note that this
only works if the intervening calls to smgrwrite() and smgrextend() pass
skipFsync=false, so that the fsync requests are still registered with
the checkpointer.
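
To make the intended call pattern concrete, here is a minimal sketch
(example_unbuffered_rel_fill() is a made-up name, and rel, blkno, and
page stand in for the caller's own variables):

	static void
	example_unbuffered_rel_fill(Relation rel, BlockNumber blkno, Page page)
	{
		UnBufferedWriteState wstate;

		wstate.smgr_rel = RelationGetSmgr(rel);
		unbuffered_prep(&wstate);	/* record the current redo pointer */

		/*
		 * Any number of writes/extends; these pass skipFsync=false to smgr,
		 * so the fsync requests are still registered with the checkpointer.
		 */
		unbuffered_extend(&wstate, MAIN_FORKNUM, blkno, page, false);

		/* smgrimmedsync() only if the redo pointer moved since prep */
		unbuffered_finish(&wstate, MAIN_FORKNUM);
	}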

unbuffered_write() and unbuffered_extend() might also be usable even when
unbuffered_prep() and unbuffered_finish() are not -- for example, hash
indexes do something I don't entirely understand: they call smgrextend()
directly when allocating buckets but then initialize the new bucket pages
using the bufmgr machinery.

I also intend to move the accounting of pages written and extended into
unbuffered_write() and unbuffered_extend(), using the functions I propose
in [1] to populate a new view, pg_stat_buffers. That way, this
"unbuffered" IO would be counted in stats.

I picked the name "direct" for the directory in src/backend/storage
because these functions seem analogous to direct IO in Linux -- they do
IO without going through the Postgres bufmgr -- unPGbuffered, basically.
Other suggestions were "raw" and "relIO". "Raw" seemed confusing, since
raw device IO is pretty far from what is happening here, and "relIO"
didn't seem (to me) like it belonged next to bufmgr. However, "direct"
and "unbuffered" will both soon become fraught terminology with the
introduction of AIO and direct IO to Postgres...

- Melanie

[1] 
https://www.postgresql.org/message-id/flat/20200124195226.lth52iydq2n2uilq%40alap3.anarazel.de
From f68147500e88741debdae730763fb57d42c6ab79 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplage...@gmail.com>
Date: Tue, 28 Sep 2021 14:51:11 -0400
Subject: [PATCH v1] Add unbuffered IO and avoid immed fsync

Replace direct smgrextend()/smgrwrite() calls with a new unbuffered write
API that can avoid the immediate fsync.
---
 src/backend/access/gist/gistbuild.c       | 16 +++----
 src/backend/access/hash/hashpage.c        |  9 ++--
 src/backend/access/heap/heapam_handler.c  | 16 ++++---
 src/backend/access/heap/rewriteheap.c     | 25 ++++-------
 src/backend/access/heap/visibilitymap.c   |  8 ++--
 src/backend/access/nbtree/nbtree.c        | 17 ++++---
 src/backend/access/nbtree/nbtsort.c       | 39 +++++-----------
 src/backend/access/spgist/spginsert.c     | 25 +++++------
 src/backend/access/transam/xlog.c         | 13 ++++++
 src/backend/catalog/storage.c             | 25 +++--------
 src/backend/storage/Makefile              |  2 +-
 src/backend/storage/direct/Makefile       | 17 +++++++
 src/backend/storage/direct/directmgr.c    | 55 +++++++++++++++++++++++
 src/backend/storage/freespace/freespace.c | 10 +++--
 src/include/access/xlog.h                 |  1 +
 15 files changed, 171 insertions(+), 107 deletions(-)
 create mode 100644 src/backend/storage/direct/Makefile
 create mode 100644 src/backend/storage/direct/directmgr.c

diff --git a/src/backend/access/gist/gistbuild.c b/src/backend/access/gist/gistbuild.c
index baad28c09f..34f712590c 100644
--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -43,6 +43,7 @@
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -91,6 +92,7 @@ typedef struct
 
 	int64		indtuples;		/* number of tuples indexed */
 
+	UnBufferedWriteState ub_wstate;
 	/*
 	 * Extra data structures used during a buffering build. 'gfbb' contains
 	 * information related to managing the build buffers. 'parentMap' is a
@@ -194,6 +196,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	buildstate.heaprel = heap;
 	buildstate.sortstate = NULL;
 	buildstate.giststate = initGISTstate(index);
+	buildstate.ub_wstate.smgr_rel = RelationGetSmgr(index);
 
 	/*
 	 * Create a temporary memory context that is reset once for each tuple
@@ -403,14 +406,14 @@ gist_indexsortbuild(GISTBuildState *state)
 	state->pages_allocated = 0;
 	state->pages_written = 0;
 	state->ready_num_pages = 0;
+	unbuffered_prep(&state->ub_wstate);
 
 	/*
 	 * Write an empty page as a placeholder for the root page. It will be
 	 * replaced with the real root page at the end.
 	 */
 	page = palloc0(BLCKSZ);
-	smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-			   page, true);
+	unbuffered_extend(&state->ub_wstate, MAIN_FORKNUM, GIST_ROOT_BLKNO, page, true);
 	state->pages_allocated++;
 	state->pages_written++;
 
@@ -450,13 +453,12 @@ gist_indexsortbuild(GISTBuildState *state)
 
 	/* Write out the root */
 	PageSetLSN(pagestate->page, GistBuildLSN);
-	PageSetChecksumInplace(pagestate->page, GIST_ROOT_BLKNO);
-	smgrwrite(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-			  pagestate->page, true);
+	unbuffered_write(&state->ub_wstate, MAIN_FORKNUM, GIST_ROOT_BLKNO, pagestate->page);
 	if (RelationNeedsWAL(state->indexrel))
 		log_newpage(&state->indexrel->rd_node, MAIN_FORKNUM, GIST_ROOT_BLKNO,
 					pagestate->page, true);
 
+	unbuffered_finish(&state->ub_wstate, MAIN_FORKNUM);
 	pfree(pagestate->page);
 	pfree(pagestate);
 }
@@ -570,9 +572,7 @@ gist_indexsortbuild_flush_ready_pages(GISTBuildState *state)
 			elog(ERROR, "unexpected block number to flush GiST sorting build");
 
 		PageSetLSN(page, GistBuildLSN);
-		PageSetChecksumInplace(page, blkno);
-		smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, blkno, page,
-				   true);
+		unbuffered_extend(&state->ub_wstate, MAIN_FORKNUM, blkno, page, false);
 
 		state->pages_written++;
 	}
diff --git a/src/backend/access/hash/hashpage.c b/src/backend/access/hash/hashpage.c
index 159646c7c3..fcc0e28a36 100644
--- a/src/backend/access/hash/hashpage.c
+++ b/src/backend/access/hash/hashpage.c
@@ -32,6 +32,7 @@
 #include "access/hash_xlog.h"
 #include "miscadmin.h"
 #include "port/pg_bitutils.h"
+#include "storage/directmgr.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/smgr.h"
@@ -990,8 +991,10 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 	PGAlignedBlock zerobuf;
 	Page		page;
 	HashPageOpaque ovflopaque;
+	UnBufferedWriteState ub_wstate;
 
 	lastblock = firstblock + nblocks - 1;
+	ub_wstate.smgr_rel = RelationGetSmgr(rel);
 
 	/*
 	 * Check for overflow in block number calculation; if so, we cannot extend
@@ -1000,6 +1003,8 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 	if (lastblock < firstblock || lastblock == InvalidBlockNumber)
 		return false;
 
+	unbuffered_prep(&ub_wstate);
+
 	page = (Page) zerobuf.data;
 
 	/*
@@ -1024,9 +1029,7 @@ _hash_alloc_buckets(Relation rel, BlockNumber firstblock, uint32 nblocks)
 					zerobuf.data,
 					true);
 
-	PageSetChecksumInplace(page, lastblock);
-	smgrextend(RelationGetSmgr(rel), MAIN_FORKNUM, lastblock, zerobuf.data,
-			   false);
+	unbuffered_extend(&ub_wstate, MAIN_FORKNUM, lastblock, zerobuf.data, false);
 
 	return true;
 }
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 9befe012a9..fa4780e186 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -38,6 +38,7 @@
 #include "pgstat.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
+#include "storage/directmgr.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
@@ -575,6 +576,7 @@ heapam_relation_set_new_filenode(Relation rel,
 								 MultiXactId *minmulti)
 {
 	SMgrRelation srel;
+	UnBufferedWriteState ub_wstate;
 
 	/*
 	 * Initialize to the minimum XID that could put tuples in the table. We
@@ -594,15 +596,15 @@ heapam_relation_set_new_filenode(Relation rel,
 	*minmulti = GetOldestMultiXactId();
 
 	srel = RelationCreateStorage(*newrnode, persistence);
+	ub_wstate.smgr_rel = srel;
+	unbuffered_prep(&ub_wstate);
 
 	/*
 	 * If required, set up an init fork for an unlogged table so that it can
-	 * be correctly reinitialized on restart.  An immediate sync is required
-	 * even if the page has been logged, because the write did not go through
-	 * shared_buffers and therefore a concurrent checkpoint may have moved the
-	 * redo pointer past our xlog record.  Recovery may as well remove it
-	 * while replaying, for example, XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE
-	 * record. Therefore, logging is necessary even if wal_level=minimal.
+	 * be correctly reinitialized on restart.  Recovery may as well remove
+	 * the init fork while replaying, for example, an XLOG_DBASE_CREATE or
+	 * XLOG_TBLSPC_CREATE record; therefore, logging is necessary even if
+	 * wal_level=minimal.
 	 */
 	if (persistence == RELPERSISTENCE_UNLOGGED)
 	{
@@ -611,7 +613,7 @@ heapam_relation_set_new_filenode(Relation rel,
 			   rel->rd_rel->relkind == RELKIND_TOASTVALUE);
 		smgrcreate(srel, INIT_FORKNUM, false);
 		log_smgrcreate(newrnode, INIT_FORKNUM);
-		smgrimmedsync(srel, INIT_FORKNUM);
+		unbuffered_finish(&ub_wstate, INIT_FORKNUM);
 	}
 
 	smgrclose(srel);
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 986a776bbd..6d1f4c8f6b 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -119,6 +119,7 @@
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/fd.h"
 #include "storage/procarray.h"
 #include "storage/smgr.h"
@@ -152,6 +153,7 @@ typedef struct RewriteStateData
 	HTAB	   *rs_old_new_tid_map; /* unmatched B tuples */
 	HTAB	   *rs_logical_mappings;	/* logical remapping files */
 	uint32		rs_num_rewrite_mappings;	/* # in memory mappings */
+	UnBufferedWriteState rs_unbuffered_wstate;
 }			RewriteStateData;
 
 /*
@@ -264,6 +266,9 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 	state->rs_freeze_xid = freeze_xid;
 	state->rs_cutoff_multi = cutoff_multi;
 	state->rs_cxt = rw_cxt;
+	state->rs_unbuffered_wstate.smgr_rel = RelationGetSmgr(state->rs_new_rel);
+
+	unbuffered_prep(&state->rs_unbuffered_wstate);
 
 	/* Initialize hash tables used to track update chains */
 	hash_ctl.keysize = sizeof(TidHashKey);
@@ -324,21 +329,12 @@ end_heap_rewrite(RewriteState state)
 						state->rs_buffer,
 						true);
 
-		PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
-
-		smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-				   state->rs_blockno, (char *) state->rs_buffer, true);
+		unbuffered_extend(&state->rs_unbuffered_wstate, MAIN_FORKNUM,
+				state->rs_blockno, state->rs_buffer, false);
 	}
 
-	/*
-	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
-	 * reason is the same as in storage.c's RelationCopyStorage(): we're
-	 * writing data that's not in shared buffers, and so a CHECKPOINT
-	 * occurring during the rewriteheap operation won't have fsync'd data we
-	 * wrote before the checkpoint.
-	 */
 	if (RelationNeedsWAL(state->rs_new_rel))
-		smgrimmedsync(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM);
+		unbuffered_finish(&state->rs_unbuffered_wstate, MAIN_FORKNUM);
 
 	logical_end_heap_rewrite(state);
 
@@ -690,10 +686,7 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 			 * need for smgr to schedule an fsync for this write; we'll do it
 			 * ourselves in end_heap_rewrite.
 			 */
-			PageSetChecksumInplace(page, state->rs_blockno);
-
-			smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-					   state->rs_blockno, (char *) page, true);
+			unbuffered_extend(&state->rs_unbuffered_wstate, MAIN_FORKNUM, state->rs_blockno, page, false);
 
 			state->rs_blockno++;
 			state->rs_buffer_valid = false;
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 114fbbdd30..0f357fc4ff 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -92,6 +92,7 @@
 #include "miscadmin.h"
 #include "port/pg_bitutils.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
 #include "utils/inval.h"
@@ -616,7 +617,9 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
 	BlockNumber vm_nblocks_now;
 	PGAlignedBlock pg;
 	SMgrRelation reln;
+	UnBufferedWriteState ub_wstate;
 
+	ub_wstate.smgr_rel = RelationGetSmgr(rel);
 	PageInit((Page) pg.data, BLCKSZ, 0);
 
 	/*
@@ -654,9 +657,8 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
 	/* Now extend the file */
 	while (vm_nblocks_now < vm_nblocks)
 	{
-		PageSetChecksumInplace((Page) pg.data, vm_nblocks_now);
-
-		smgrextend(reln, VISIBILITYMAP_FORKNUM, vm_nblocks_now, pg.data, false);
+		/* TODO: aren't these pages empty? why checksum them? */
+		unbuffered_extend(&ub_wstate, VISIBILITYMAP_FORKNUM, vm_nblocks_now, (Page) pg.data, false);
 		vm_nblocks_now++;
 	}
 
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 40ad0956e0..c3e3418570 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -29,6 +29,7 @@
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
 #include "storage/condition_variable.h"
+#include "storage/directmgr.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -150,6 +151,11 @@ void
 btbuildempty(Relation index)
 {
 	Page		metapage;
+	UnBufferedWriteState wstate;
+
+	wstate.smgr_rel = RelationGetSmgr(index);
+
+	unbuffered_prep(&wstate);
 
 	/* Construct metapage. */
 	metapage = (Page) palloc(BLCKSZ);
@@ -162,18 +168,15 @@ btbuildempty(Relation index)
 	 * XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE record.  Therefore, we need
 	 * this even when wal_level=minimal.
 	 */
-	PageSetChecksumInplace(metapage, BTREE_METAPAGE);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, BTREE_METAPAGE,
-			  (char *) metapage, true);
+	unbuffered_write(&wstate, INIT_FORKNUM, BTREE_METAPAGE, metapage);
 	log_newpage(&RelationGetSmgr(index)->smgr_rnode.node, INIT_FORKNUM,
 				BTREE_METAPAGE, metapage, true);
 
 	/*
-	 * An immediate sync is required even if we xlog'd the page, because the
-	 * write did not go through shared_buffers and therefore a concurrent
-	 * checkpoint may have moved the redo pointer past our xlog record.
+	 * Even though we xlog'd the page, a concurrent checkpoint may have moved
+	 * the redo pointer past our xlog record, so we may still need to fsync.
 	 */
-	smgrimmedsync(RelationGetSmgr(index), INIT_FORKNUM);
+	unbuffered_finish(&wstate, INIT_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 54c8eb1289..9cb9757875 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -57,6 +57,7 @@
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "storage/directmgr.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/rel.h"
@@ -253,6 +254,7 @@ typedef struct BTWriteState
 	BlockNumber btws_pages_alloced; /* # pages allocated */
 	BlockNumber btws_pages_written; /* # pages written out */
 	Page		btws_zeropage;	/* workspace for filling zeroes */
+	UnBufferedWriteState ub_wstate;
 } BTWriteState;
 
 
@@ -560,6 +562,8 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
 
 	wstate.heap = btspool->heap;
 	wstate.index = btspool->index;
+	wstate.ub_wstate.smgr_rel = RelationGetSmgr(btspool->index);
+	wstate.ub_wstate.redo = InvalidXLogRecPtr;
 	wstate.inskey = _bt_mkscankey(wstate.index, NULL);
 	/* _bt_mkscankey() won't set allequalimage without metapage */
 	wstate.inskey->allequalimage = _bt_allequalimage(wstate.index, true);
@@ -656,31 +660,19 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
 		if (!wstate->btws_zeropage)
 			wstate->btws_zeropage = (Page) palloc0(BLCKSZ);
 		/* don't set checksum for all-zero page */
-		smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM,
-				   wstate->btws_pages_written++,
-				   (char *) wstate->btws_zeropage,
-				   true);
+		unbuffered_extend(&wstate->ub_wstate, MAIN_FORKNUM, wstate->btws_pages_written++, wstate->btws_zeropage, true);
 	}
 
-	PageSetChecksumInplace(page, blkno);
 
-	/*
-	 * Now write the page.  There's no need for smgr to schedule an fsync for
-	 * this write; we'll do it ourselves before ending the build.
-	 */
+	/* Now write the page. Either we are extending the file... */
 	if (blkno == wstate->btws_pages_written)
 	{
-		/* extending the file... */
-		smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-				   (char *) page, true);
+		unbuffered_extend(&wstate->ub_wstate, MAIN_FORKNUM, blkno, page, false);
 		wstate->btws_pages_written++;
 	}
+	/* or we are overwriting a block we zero-filled before. */
 	else
-	{
-		/* overwriting a block we zero-filled before */
-		smgrwrite(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-				  (char *) page, true);
-	}
+		unbuffered_write(&wstate->ub_wstate, MAIN_FORKNUM, blkno, page);
 
 	pfree(page);
 }
@@ -1189,6 +1181,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 	int64		tuples_done = 0;
 	bool		deduplicate;
 
+
+	unbuffered_prep(&wstate->ub_wstate);
 	deduplicate = wstate->inskey->allequalimage && !btspool->isunique &&
 		BTGetDeduplicateItems(wstate->index);
 
@@ -1415,17 +1409,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 	/* Close down final pages and write the metapage */
 	_bt_uppershutdown(wstate, state);
 
-	/*
-	 * When we WAL-logged index pages, we must nonetheless fsync index files.
-	 * Since we're building outside shared buffers, a CHECKPOINT occurring
-	 * during the build has no way to flush the previously written data to
-	 * disk (indeed it won't know the index even exists).  A crash later on
-	 * would replay WAL from the checkpoint, therefore it wouldn't replay our
-	 * earlier WAL entries. If we do not fsync those pages here, they might
-	 * still not be on disk when the crash occurs.
-	 */
 	if (wstate->btws_use_wal)
-		smgrimmedsync(RelationGetSmgr(wstate->index), MAIN_FORKNUM);
+		unbuffered_finish(&wstate->ub_wstate, MAIN_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/spgist/spginsert.c b/src/backend/access/spgist/spginsert.c
index cc4394b1c8..1aeb8bc714 100644
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -25,6 +25,7 @@
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "storage/directmgr.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -156,6 +157,10 @@ void
 spgbuildempty(Relation index)
 {
 	Page		page;
+	UnBufferedWriteState wstate;
+
+	wstate.smgr_rel = RelationGetSmgr(index);
+	unbuffered_prep(&wstate);
 
 	/* Construct metapage. */
 	page = (Page) palloc(BLCKSZ);
@@ -168,36 +173,30 @@ spgbuildempty(Relation index)
 	 * of their existing content when the corresponding create records are
 	 * replayed.
 	 */
-	PageSetChecksumInplace(page, SPGIST_METAPAGE_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, SPGIST_METAPAGE_BLKNO,
-			  (char *) page, true);
+	unbuffered_write(&wstate, INIT_FORKNUM, SPGIST_METAPAGE_BLKNO, page);
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				SPGIST_METAPAGE_BLKNO, page, true);
 
 	/* Likewise for the root page. */
 	SpGistInitPage(page, SPGIST_LEAF);
 
-	PageSetChecksumInplace(page, SPGIST_ROOT_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, SPGIST_ROOT_BLKNO,
-			  (char *) page, true);
+	unbuffered_write(&wstate, INIT_FORKNUM, SPGIST_ROOT_BLKNO, page);
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				SPGIST_ROOT_BLKNO, page, true);
 
 	/* Likewise for the null-tuples root page. */
 	SpGistInitPage(page, SPGIST_LEAF | SPGIST_NULLS);
 
-	PageSetChecksumInplace(page, SPGIST_NULL_BLKNO);
-	smgrwrite(RelationGetSmgr(index), INIT_FORKNUM, SPGIST_NULL_BLKNO,
-			  (char *) page, true);
+	unbuffered_write(&wstate, INIT_FORKNUM, SPGIST_NULL_BLKNO, page);
 	log_newpage(&(RelationGetSmgr(index))->smgr_rnode.node, INIT_FORKNUM,
 				SPGIST_NULL_BLKNO, page, true);
 
 	/*
-	 * An immediate sync is required even if we xlog'd the pages, because the
-	 * writes did not go through shared buffers and therefore a concurrent
-	 * checkpoint may have moved the redo pointer past our xlog record.
+	 * Because the writes did not go through shared buffers, if a concurrent
+	 * checkpoint moved the redo pointer past our xlog record, an immediate
+	 * sync is required even if we xlog'd the pages.
 	 */
-	smgrimmedsync(RelationGetSmgr(index), INIT_FORKNUM);
+	unbuffered_finish(&wstate, INIT_FORKNUM);
 }
 
 /*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e51a7a749d..d11a928b62 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -12941,6 +12941,19 @@ CheckForStandbyTrigger(void)
 	return false;
 }
 
+bool RedoRecPtrChanged(XLogRecPtr comparator_ptr)
+{
+	XLogRecPtr ptr;
+	SpinLockAcquire(&XLogCtl->info_lck);
+	ptr = XLogCtl->RedoRecPtr;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	if (RedoRecPtr < ptr)
+		RedoRecPtr = ptr;
+
+	return RedoRecPtr != comparator_ptr;
+}
+
 /*
  * Remove the files signaling a standby promotion request.
  */
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index c5ad28d71f..c63085b1aa 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -28,6 +28,7 @@
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
 #include "miscadmin.h"
+#include "storage/directmgr.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
 #include "utils/hsearch.h"
@@ -420,6 +421,10 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 	bool		copying_initfork;
 	BlockNumber nblocks;
 	BlockNumber blkno;
+	UnBufferedWriteState wstate;
+
+	wstate.smgr_rel = dst;
+	unbuffered_prep(&wstate);
 
 	page = (Page) buf.data;
 
@@ -477,27 +482,11 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
 		if (use_wal)
 			log_newpage(&dst->smgr_rnode.node, forkNum, blkno, page, false);
 
-		PageSetChecksumInplace(page, blkno);
-
-		/*
-		 * Now write the page.  We say skipFsync = true because there's no
-		 * need for smgr to schedule an fsync for this write; we'll do it
-		 * ourselves below.
-		 */
-		smgrextend(dst, forkNum, blkno, buf.data, true);
+		unbuffered_extend(&wstate, forkNum, blkno, buf.data, false);
 	}
 
-	/*
-	 * When we WAL-logged rel pages, we must nonetheless fsync them.  The
-	 * reason is that since we're copying outside shared buffers, a CHECKPOINT
-	 * occurring during the copy has no way to flush the previously written
-	 * data to disk (indeed it won't know the new rel even exists).  A crash
-	 * later on would replay WAL from the checkpoint, therefore it wouldn't
-	 * replay our earlier WAL entries. If we do not fsync those pages here,
-	 * they might still not be on disk when the crash occurs.
-	 */
 	if (use_wal || copying_initfork)
-		smgrimmedsync(dst, forkNum);
+		unbuffered_finish(&wstate, forkNum);
 }
 
 /*
diff --git a/src/backend/storage/Makefile b/src/backend/storage/Makefile
index 8376cdfca2..501fae5f9d 100644
--- a/src/backend/storage/Makefile
+++ b/src/backend/storage/Makefile
@@ -8,6 +8,6 @@ subdir = src/backend/storage
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS     = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS     = buffer direct file freespace ipc large_object lmgr page smgr sync
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/direct/Makefile b/src/backend/storage/direct/Makefile
new file mode 100644
index 0000000000..d82bbed48c
--- /dev/null
+++ b/src/backend/storage/direct/Makefile
@@ -0,0 +1,17 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for storage/direct
+#
+# IDENTIFICATION
+#    src/backend/storage/direct/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/storage/direct
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = directmgr.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/direct/directmgr.c b/src/backend/storage/direct/directmgr.c
new file mode 100644
index 0000000000..d6f02487f8
--- /dev/null
+++ b/src/backend/storage/direct/directmgr.c
@@ -0,0 +1,55 @@
+/*-------------------------------------------------------------------------
+ *
+ * directmgr.c
+ *	  routines for managing unbuffered IO
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/direct/directmgr.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+
+#include "access/xlog.h"
+#include "storage/directmgr.h"
+#include "utils/rel.h"
+
+void unbuffered_prep(UnBufferedWriteState *wstate)
+{
+	wstate->redo = GetRedoRecPtr();
+}
+
+/*
+ * When writing data outside shared buffers, a concurrent CHECKPOINT can move
+ * the redo pointer past our WAL entries and won't flush our data to disk. If
+ * the database crashes before the data makes it to disk, our WAL won't be
+ * replayed and the data will be lost.
+ * Thus, if a CHECKPOINT begins between unbuffered_prep() and
+ * unbuffered_finish(), the backend must fsync the data itself.
+ */
+void unbuffered_finish(UnBufferedWriteState *wstate, ForkNumber forknum)
+{
+	if (RedoRecPtrChanged(wstate->redo))
+		smgrimmedsync(wstate->smgr_rel, forknum);
+}
+
+void
+unbuffered_write(UnBufferedWriteState *wstate, ForkNumber forknum, BlockNumber blocknum, Page page)
+{
+	PageSetChecksumInplace(page, blocknum);
+	smgrwrite(wstate->smgr_rel, forknum, blocknum, (char *) page, false);
+}
+
+void
+unbuffered_extend(UnBufferedWriteState *wstate, ForkNumber forknum, BlockNumber blocknum, Page page, bool empty)
+{
+	if (!empty)
+		PageSetChecksumInplace(page, blocknum);
+	smgrextend(wstate->smgr_rel, forknum, blocknum, (char *) page, false);
+}
+
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index 09d4b16067..26abdfa589 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -26,6 +26,7 @@
 #include "access/htup_details.h"
 #include "access/xlogutils.h"
 #include "miscadmin.h"
+#include "storage/directmgr.h"
 #include "storage/freespace.h"
 #include "storage/fsm_internals.h"
 #include "storage/lmgr.h"
@@ -608,6 +609,9 @@ fsm_extend(Relation rel, BlockNumber fsm_nblocks)
 	BlockNumber fsm_nblocks_now;
 	PGAlignedBlock pg;
 	SMgrRelation reln;
+	UnBufferedWriteState ub_wstate;
+
+	ub_wstate.smgr_rel = RelationGetSmgr(rel);
 
 	PageInit((Page) pg.data, BLCKSZ, 0);
 
@@ -647,10 +651,8 @@ fsm_extend(Relation rel, BlockNumber fsm_nblocks)
 	/* Extend as needed. */
 	while (fsm_nblocks_now < fsm_nblocks)
 	{
-		PageSetChecksumInplace((Page) pg.data, fsm_nblocks_now);
-
-		smgrextend(reln, FSM_FORKNUM, fsm_nblocks_now,
-				   pg.data, false);
+		/* TODO: why was it checksumming all-zero pages? */
+		unbuffered_extend(&ub_wstate, FSM_FORKNUM, fsm_nblocks_now, (Page) pg.data, false);
 		fsm_nblocks_now++;
 	}
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 5e2c94a05f..4a7b0d42de 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -314,6 +314,7 @@ extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
+extern bool RedoRecPtrChanged(XLogRecPtr comparator_ptr);
 extern void RemovePromoteSignalFiles(void);
 
 extern bool PromoteIsTriggered(void);
-- 
2.27.0
