On Wed, Apr 6, 2016 at 3:11 PM, Michael Paquier
<michael.paqu...@gmail.com> wrote:
> On Wed, Mar 23, 2016 at 12:45 PM, Michael Paquier
> <michael.paqu...@gmail.com> wrote:
>> On Wed, Mar 23, 2016 at 11:11 AM, David Steele <da...@pgmasters.net> wrote:
>>> I would prefer not to bump it to the next CF unless we decide this will
>>> not get fixed for 9.6.
>>
>> It may make sense to add that to the list of open items for 9.6
>> instead. That's not a feature.
>
> So I have moved this patch to the next CF for now, and will work on
> fixing it rather soonishly as an effort to stabilize 9.6 as well as
> back-branches.

Well, not that soon in the end, but I am back on this... I have not
completely reviewed all the code yet, and the case of an index relation
referring to a relation optimized with truncate is still broken, but
for now here is a rebased patch if people are interested. I am also going
to pull some TAP tests out of my pocket to ease testing.
-- 
Michael
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 38bba16..bbc09cd 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -55,6 +55,7 @@
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "catalog/namespace.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
@@ -2331,12 +2332,6 @@ FreeBulkInsertState(BulkInsertState bistate)
  * The new tuple is stamped with current transaction ID and the specified
  * command ID.
  *
- * If the HEAP_INSERT_SKIP_WAL option is specified, the new tuple is not
- * logged in WAL, even for a non-temp relation.  Safe usage of this behavior
- * requires that we arrange that all new tuples go into new pages not
- * containing any tuples from other transactions, and that the relation gets
- * fsync'd before commit.  (See also heap_sync() comments)
- *
  * The HEAP_INSERT_SKIP_FSM option is passed directly to
  * RelationGetBufferForTuple, which see for more info.
  *
@@ -2440,7 +2435,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
+	if (HeapNeedsWAL(relation, buffer))
 	{
 		xl_heap_insert xlrec;
 		xl_heap_header xlhdr;
@@ -2639,12 +2634,10 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	int			ndone;
 	char	   *scratch = NULL;
 	Page		page;
-	bool		needwal;
 	Size		saveFreeSpace;
 	bool		need_tuple_data = RelationIsLogicallyLogged(relation);
 	bool		need_cids = RelationIsAccessibleInLogicalDecoding(relation);
 
-	needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
 												   HEAP_DEFAULT_FILLFACTOR);
 
@@ -2659,7 +2652,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	 * palloc() within a critical section is not safe, so we allocate this
 	 * beforehand.
 	 */
-	if (needwal)
+	if (RelationNeedsWAL(relation))
 		scratch = palloc(BLCKSZ);
 
 	/*
@@ -2727,7 +2720,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 			 * We don't use heap_multi_insert for catalog tuples yet, but
 			 * better be prepared...
 			 */
-			if (needwal && need_cids)
+			if (HeapNeedsWAL(relation, buffer) && need_cids)
 				log_heap_new_cid(relation, heaptup);
 		}
 
@@ -2747,7 +2740,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 		MarkBufferDirty(buffer);
 
 		/* XLOG stuff */
-		if (needwal)
+		if (HeapNeedsWAL(relation, buffer))
 		{
 			XLogRecPtr	recptr;
 			xl_heap_multi_insert *xlrec;
@@ -3261,7 +3254,7 @@ l1:
 	 * NB: heap_abort_speculative() uses the same xlog record and replay
 	 * routines.
 	 */
-	if (RelationNeedsWAL(relation))
+	if (HeapNeedsWAL(relation, buffer))
 	{
 		xl_heap_delete xlrec;
 		XLogRecPtr	recptr;
@@ -3982,7 +3975,7 @@ l2:
 
 		MarkBufferDirty(buffer);
 
-		if (RelationNeedsWAL(relation))
+		if (HeapNeedsWAL(relation, buffer))
 		{
 			xl_heap_lock xlrec;
 			XLogRecPtr	recptr;
@@ -4194,7 +4187,7 @@ l2:
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (RelationNeedsWAL(relation))
+	if (HeapNeedsWAL(relation, buffer))
 	{
 		XLogRecPtr	recptr;
 
@@ -5148,7 +5141,7 @@ failed:
 	 * (Also, in a PITR log-shipping or 2PC environment, we have to have XLOG
 	 * entries for everything anyway.)
 	 */
-	if (RelationNeedsWAL(relation))
+	if (HeapNeedsWAL(relation, *buffer))
 	{
 		xl_heap_lock xlrec;
 		XLogRecPtr	recptr;
@@ -5825,7 +5818,7 @@ l4:
 		MarkBufferDirty(buf);
 
 		/* XLOG stuff */
-		if (RelationNeedsWAL(rel))
+		if (HeapNeedsWAL(rel, buf))
 		{
 			xl_heap_lock_updated xlrec;
 			XLogRecPtr	recptr;
@@ -5980,7 +5973,7 @@ heap_finish_speculative(Relation relation, HeapTuple tuple)
 	htup->t_ctid = tuple->t_self;
 
 	/* XLOG stuff */
-	if (RelationNeedsWAL(relation))
+	if (HeapNeedsWAL(relation, buffer))
 	{
 		xl_heap_confirm xlrec;
 		XLogRecPtr	recptr;
@@ -6112,7 +6105,7 @@ heap_abort_speculative(Relation relation, HeapTuple tuple)
 	 * The WAL records generated here match heap_delete().  The same recovery
 	 * routines are used.
 	 */
-	if (RelationNeedsWAL(relation))
+	if (HeapNeedsWAL(relation, buffer))
 	{
 		xl_heap_delete xlrec;
 		XLogRecPtr	recptr;
@@ -6218,7 +6211,7 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (RelationNeedsWAL(relation))
+	if (HeapNeedsWAL(relation, buffer))
 	{
 		xl_heap_inplace xlrec;
 		XLogRecPtr	recptr;
@@ -9081,3 +9074,71 @@ heap_sync(Relation rel)
 		heap_close(toastrel, AccessShareLock);
 	}
 }
+
+/*
+ *	heap_register_sync	- register a heap to be synced to disk at commit
+ *
+ * This can be used to skip WAL-logging changes on a relation file that has
+ * been created in the same transaction. After calling this, any changes to
+ * the heap (including TOAST heap if any) in the same transaction will not be
+ * WAL-logged. Instead, the heap contents are flushed to disk at commit,
+ * like heap_sync() does.
+ *
+ * Like with heap_sync(), indexes are not touched.
+ */
+void
+heap_register_sync(Relation rel)
+{
+	/* non-WAL-logged tables never need fsync */
+	if (!RelationNeedsWAL(rel))
+		return;
+
+	smgrRegisterPendingSync(rel);
+	if (OidIsValid(rel->rd_rel->reltoastrelid))
+	{
+		Relation	toastrel;
+
+		toastrel = heap_open(rel->rd_rel->reltoastrelid, AccessShareLock);
+		smgrRegisterPendingSync(toastrel);
+		heap_close(toastrel, AccessShareLock);
+	}
+}
+
+/*
+ * Do changes to given heap page need to be WAL-logged?
+ *
+ * This takes into account any previous heap_register_sync() requests.
+ *
+ * Note that it is required to use this before creating any WAL records for
+ * heap pages - it is not merely an optimization. WAL-logging a record
+ * when we have already skipped a previous WAL record for the same page could
+ * lead to failure at WAL replay, as the "before" state expected by the
+ * record might not match what's on disk (this should only be a problem
+ * with full_page_writes=off, though).
+ */
+bool
+HeapNeedsWAL(Relation rel, Buffer buf)
+{
+	/* Relations that are not WAL-logged never need WAL */
+	if (!RelationNeedsWAL(rel))
+		return false;
+
+	/*
+	 * If we are going to fsync() the relation at COMMIT, and we have not
+	 * truncated the block away previously, and we have not emitted any WAL
+	 * records for this block yet, we can skip WAL-logging it.
+	 */
+	if (smgrIsSyncPending(rel->rd_node, BufferGetBlockNumber(buf)))
+	{
+		/*
+		 * If a pending fsync() will handle this page, its LSN should be
+		 * invalid. If it's not, we've already emitted a WAL record for this
+		 * block, and all subsequent changes to the block must be WAL-logged
+		 * too.
+		 */
+		Assert(PageGetLSN(BufferGetPage(buf)) == InvalidXLogRecPtr);
+		return false;
+	}
+
+	return true;
+}
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 6ff9251..3207134 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -260,7 +260,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 		/*
 		 * Emit a WAL HEAP_CLEAN record showing what we did
 		 */
-		if (RelationNeedsWAL(relation))
+		if (HeapNeedsWAL(relation, buffer))
 		{
 			XLogRecPtr	recptr;
 
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 3ad4a9f..fb07795 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -307,6 +307,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 		map[mapByte] |= (flags << mapOffset);
 		MarkBufferDirty(vmBuf);
 
+		/* XXX: Should we use HeapNeedsWAL here? */
 		if (RelationNeedsWAL(rel))
 		{
 			if (XLogRecPtrIsInvalid(recptr))
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 23f36ea..f66d9ab 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2007,6 +2007,9 @@ CommitTransaction(void)
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
+	/* Flush updates to relations that we didn't WAL-log */
+	smgrDoPendingSyncs(true);
+
 	/*
 	 * Mark serializable transaction as complete for predicate locking
 	 * purposes.  This should be done as late as we can put it and still allow
@@ -2237,6 +2240,9 @@ PrepareTransaction(void)
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
+	/* Flush updates to relations that we didn't WAL-log */
+	smgrDoPendingSyncs(true);
+
 	/*
 	 * Mark serializable transaction as complete for predicate locking
 	 * purposes.  This should be done as late as we can put it and still allow
@@ -2541,6 +2547,7 @@ AbortTransaction(void)
 	AtAbort_Notify();
 	AtEOXact_RelationMap(false);
 	AtAbort_Twophase();
+	smgrDoPendingSyncs(false);
 
 	/*
 	 * Advertise the fact that we aborted in pg_clog (assuming that we got as
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 0d8311c..54ff874 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -20,6 +20,7 @@
 #include "postgres.h"
 
 #include "access/visibilitymap.h"
+#include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "access/xloginsert.h"
@@ -29,6 +30,7 @@
 #include "catalog/storage_xlog.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
+#include "utils/hsearch.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -64,6 +66,42 @@ typedef struct PendingRelDelete
 static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
 
 /*
+ * We also track relation files (RelFileNode values) that have been created
+ * in the same transaction, and that have been modified without WAL-logging
+ * the action. When we are about to begin a large operation on the relation,
+ * a PendingRelSync entry is created, and 'sync_above' is set to the current
+ * size of the relation. Any operations on blocks < sync_above need to be
+ * WAL-logged as usual, but for operations on higher blocks, WAL-logging is
+ * skipped. It's important that after WAL-logging has been skipped for a
+ * block, we don't WAL log any subsequent actions on the same block either.
+ * Replaying the WAL record of the subsequent action might fail otherwise,
+ * as the "before" state of the block might not match, as the earlier actions
+ * were not WAL-logged.
+ *
+ * If a relation is truncated (without creating a new relfilenode), and we
+ * emit a WAL record of the truncation, we cannot skip WAL-logging for that
+ * relation anymore, as replaying the truncation record will destroy all the
+ * data inserted after that. But if we have already decided to skip WAL-logging
+ * changes to a relation, and the relation is truncated, we don't need to
+ * WAL-log the truncation either.
+ *
+ * This mechanism is currently only used by heaps. Indexes are always
+ * WAL-logged. Also, this only applies for wal_level=minimal; with higher
+ * WAL levels we need the WAL for PITR/replication anyway.
+ */
+/* Relations that need to be fsync'd at commit */
+typedef struct PendingRelSync
+{
+	RelFileNode relnode;		/* relation created in same xact */
+	BlockNumber	sync_above;		/* WAL-logging skipped for blocks >= sync_above */
+	bool		truncated;		/* truncation WAL record was written */
+} PendingRelSync;
+
+static HTAB *pendingSyncs = NULL;
+
+static void createPendingSyncsHash(void);
+
+/*
  * RelationCreateStorage
  *		Create physical storage for a relation.
  *
@@ -226,6 +264,8 @@ RelationPreserveStorage(RelFileNode rnode, bool atCommit)
 void
 RelationTruncate(Relation rel, BlockNumber nblocks)
 {
+	PendingRelSync *pending = NULL;
+	bool		found;
 	bool		fsm;
 	bool		vm;
 
@@ -249,6 +289,17 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
 	if (vm)
 		visibilitymap_truncate(rel, nblocks);
 
+	if (!pendingSyncs)
+		createPendingSyncsHash();
+	pending = (PendingRelSync *) hash_search(pendingSyncs,
+											 (void *) &rel->rd_node,
+											 HASH_ENTER, &found);
+	if (!found)
+	{
+		pending->sync_above = InvalidBlockNumber;
+		pending->truncated = false;
+	}
+
 	/*
 	 * We WAL-log the truncation before actually truncating, which means
 	 * trouble if the truncation fails. If we then crash, the WAL replay
@@ -258,7 +309,8 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
 	 * failure to truncate, that might spell trouble at WAL replay, into a
 	 * certain PANIC.
 	 */
-	if (RelationNeedsWAL(rel))
+	if (RelationNeedsWAL(rel) &&
+		(pending->sync_above == InvalidBlockNumber || pending->sync_above < nblocks))
 	{
 		/*
 		 * Make an XLOG entry reporting the file truncation.
@@ -276,6 +328,9 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
 		lsn = XLogInsert(RM_SMGR_ID,
 						 XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
 
+		if (rel->rd_node.relNode >= FirstNormalObjectId)
+			elog(LOG, "WAL-logged truncation of rel %u to %u blocks", rel->rd_node.relNode, nblocks);
+
 		/*
 		 * Flush, because otherwise the truncation of the main relation might
 		 * hit the disk before the WAL record, and the truncation of the FSM
@@ -285,6 +340,8 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
 		 */
 		if (fsm || vm)
 			XLogFlush(lsn);
+
+		pending->truncated = true;
 	}
 
 	/* Do the real work */
@@ -419,6 +476,142 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
 	return nrels;
 }
 
+/* create the hash table to track pending at-commit fsyncs */
+static void
+createPendingSyncsHash(void)
+{
+	/* First time through: initialize the hash table */
+	HASHCTL		ctl;
+
+	MemSet(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(RelFileNode);
+	ctl.entrysize = sizeof(PendingRelSync);
+	ctl.hash = tag_hash;
+	pendingSyncs = hash_create("pending relation sync table", 5,
+							   &ctl, HASH_ELEM | HASH_FUNCTION);
+}
+
+/*
+ * Remember that the given relation needs to be sync'd at commit, because
+ * we are going to skip WAL-logging subsequent actions to it.
+ */
+void
+smgrRegisterPendingSync(Relation rel)
+{
+	PendingRelSync *pending;
+	bool		found;
+	BlockNumber	nblocks;
+
+	nblocks = RelationGetNumberOfBlocks(rel);
+
+	if (!pendingSyncs)
+		createPendingSyncsHash();
+
+	/* Look up or create an entry */
+	pending = (PendingRelSync *) hash_search(pendingSyncs,
+											 (void *) &rel->rd_node,
+											 HASH_ENTER, &found);
+	if (!found)
+	{
+		pending->truncated = false;
+		pending->sync_above = nblocks;
+
+		if (rel->rd_node.relNode >= FirstNormalObjectId)
+			elog(LOG, "Registering new pending sync for rel %u at block %u", rel->rd_node.relNode, nblocks);
+
+	}
+	else if (pending->sync_above == InvalidBlockNumber)
+	{
+		if (rel->rd_node.relNode >= FirstNormalObjectId)
+			elog(LOG, "Registering pending sync for rel %u at block %u", rel->rd_node.relNode, nblocks);
+		pending->sync_above = nblocks;
+	}
+	else
+		if (rel->rd_node.relNode >= FirstNormalObjectId)
+			elog(LOG, "Not updating pending sync for rel %u at block %u (was %u)", rel->rd_node.relNode, nblocks, pending->sync_above);
+}
+
+/*
+ * Are we going to fsync() this relation at COMMIT, and hence don't need to
+ * WAL-log changes to the given block?
+ */
+bool
+smgrIsSyncPending(RelFileNode rnode, BlockNumber blkno)
+{
+	PendingRelSync *pending;
+	bool		found;
+
+	if (!pendingSyncs)
+		return false;
+
+	pending = (PendingRelSync *) hash_search(pendingSyncs,
+											 (void *) &rnode,
+											 HASH_FIND, &found);
+	if (!found)
+		return false;
+
+	/*
+	 * We have no fsync() pending for this relation, or we have (possibly)
+	 * already emitted WAL records for this block.
+	 */
+	if (pending->sync_above == InvalidBlockNumber ||
+		pending->sync_above > blkno)
+	{
+		if (rnode.relNode >= FirstNormalObjectId)
+			elog(LOG, "Not skipping WAL-logging for rel %u block %u, because sync_above is %u", rnode.relNode, blkno, pending->sync_above);
+		return false;
+	}
+
+	/*
+	 * We have emitted a truncation record for this block.
+	 */
+	if (pending->truncated)
+	{
+		if (rnode.relNode >= FirstNormalObjectId)
+			elog(LOG, "Not skipping WAL-logging for rel %u block %u, because it was truncated", rnode.relNode, blkno);
+		return false;
+	}
+
+	if (rnode.relNode >= FirstNormalObjectId)
+		elog(LOG, "Skipping WAL-logging for rel %u block %u", rnode.relNode, blkno);
+
+	return true;
+}
+
+/*
+ * On commit, sync relations we skipped WAL-logging for; always drop the list.
+ */
+void
+smgrDoPendingSyncs(bool isCommit)
+{
+	if (!pendingSyncs)
+		return;
+
+	if (isCommit)
+	{
+		HASH_SEQ_STATUS status;
+		PendingRelSync *pending;
+
+		hash_seq_init(&status, pendingSyncs);
+
+		while ((pending = hash_seq_search(&status)) != NULL)
+		{
+			if (pending->sync_above != InvalidBlockNumber)
+			{
+				FlushRelFileNodeBuffers(pending->relnode, false);
+				/* only a RelFileNode is at hand, so open the smgr handle here */
+				smgrimmedsync(smgropen(pending->relnode, InvalidBackendId), MAIN_FORKNUM);
+
+				if (pending->relnode.relNode >= FirstNormalObjectId)
+					elog(LOG, "Syncing rel %u", pending->relnode.relNode);
+			}
+		}
+	}
+
+	hash_destroy(pendingSyncs);
+	pendingSyncs = NULL;
+}
+
 /*
  *	PostPrepare_smgr -- Clean up after a successful PREPARE
  *
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index f45b330..01486da 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -26,6 +26,7 @@
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/pg_type.h"
+#include "catalog/storage.h"
 #include "commands/copy.h"
 #include "commands/defrem.h"
 #include "commands/trigger.h"
@@ -2302,7 +2303,10 @@ CopyFrom(CopyState cstate)
 	{
 		hi_options |= HEAP_INSERT_SKIP_FSM;
 		if (!XLogIsNeeded())
+		{
+			heap_register_sync(cstate->rel);
 			hi_options |= HEAP_INSERT_SKIP_WAL;
+		}
 	}
 
 	/*
@@ -2551,11 +2555,11 @@ CopyFrom(CopyState cstate)
 	FreeExecutorState(estate);
 
 	/*
-	 * If we skipped writing WAL, then we need to sync the heap (but not
-	 * indexes since those use WAL anyway)
+	 * If we skipped writing WAL, then we will sync the heap at the end of
+	 * the transaction (we used to do it here, but it was later found out
+	 * that to be safe, we must avoid WAL-logging any subsequent actions on
+	 * the pages we skipped WAL for). Indexes always use WAL.
 	 */
-	if (hi_options & HEAP_INSERT_SKIP_WAL)
-		heap_sync(cstate->rel);
 
 	return processed;
 }
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 231e92d..1b1246f 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -1462,7 +1462,7 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (RelationNeedsWAL(onerel))
+	if (HeapNeedsWAL(onerel, buffer))
 	{
 		XLogRecPtr	recptr;
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 76ade37..d1e7bc8 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -452,6 +452,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
 			bool *foundPtr);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
 static void AtProcExit_Buffers(int code, Datum arg);
+static void FlushRelFileNodeBuffers_internal(SMgrRelation smgr, bool islocal);
 static void CheckForBufferLeaks(void);
 static int	rnode_comparator(const void *p1, const void *p2);
 static int	buffertag_comparator(const void *p1, const void *p2);
@@ -3136,14 +3137,30 @@ FlushRelationBuffers(Relation rel)
 	/* Open rel at the smgr level if not already done */
 	RelationOpenSmgr(rel);
 
-	if (RelationUsesLocalBuffers(rel))
+	FlushRelFileNodeBuffers_internal(rel->rd_smgr, RelationUsesLocalBuffers(rel));
+}
+
+void
+FlushRelFileNodeBuffers(RelFileNode rnode, bool islocal)
+{
+	FlushRelFileNodeBuffers_internal(smgropen(rnode, InvalidBackendId), islocal);
+}
+
+static void
+FlushRelFileNodeBuffers_internal(SMgrRelation smgr, bool islocal)
+{
+	RelFileNode rnode = smgr->smgr_rnode.node;
+	int                     i;
+	BufferDesc *bufHdr;
+
+	if (islocal)
 	{
 		for (i = 0; i < NLocBuffer; i++)
 		{
 			uint32		buf_state;
 
 			bufHdr = GetLocalBufferDescriptor(i);
-			if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
+			if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
 				((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
 				 (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
 			{
@@ -3160,7 +3177,7 @@ FlushRelationBuffers(Relation rel)
 
 				PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
 
-				smgrwrite(rel->rd_smgr,
+				smgrwrite(smgr,
 						  bufHdr->tag.forkNum,
 						  bufHdr->tag.blockNum,
 						  localpage,
@@ -3190,18 +3207,18 @@ FlushRelationBuffers(Relation rel)
 		 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
 		 * and saves some cycles.
 		 */
-		if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+		if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode))
 			continue;
 
 		ReservePrivateRefCountEntry();
 
 		buf_state = LockBufHdr(bufHdr);
-		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
+		if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
 			(buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
 		{
 			PinBuffer_Locked(bufHdr);
 			LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
-			FlushBuffer(bufHdr, rel->rd_smgr);
+			FlushBuffer(bufHdr, smgr);
 			LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
 			UnpinBuffer(bufHdr, true);
 		}
@@ -3397,6 +3414,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 		bool		dirtied = false;
 		bool		delayChkpt = false;
 		uint32		buf_state;
+		RelFileNode rnode;
+		ForkNumber	forknum;
+		BlockNumber blknum;
 
 		/*
 		 * If we need to protect hint bit updates from torn writes, WAL-log a
@@ -3407,8 +3427,10 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 		 * We don't check full_page_writes here because that logic is included
 		 * when we call XLogInsert() since the value changes dynamically.
 		 */
+		BufferGetTag(buffer, &rnode, &forknum, &blknum);
 		if (XLogHintBitIsNeeded() &&
-			(pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT))
+			(pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) &&
+			!smgrIsSyncPending(rnode, blknum))
 		{
 			/*
 			 * If we're in recovery we cannot dirty a page because of a hint.
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index b3a595c..06082d9 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -25,7 +25,7 @@
 
 
 /* "options" flag bits for heap_insert */
-#define HEAP_INSERT_SKIP_WAL	0x0001
+#define HEAP_INSERT_SKIP_WAL	0x0001 /* obsolete, not used anymore */
 #define HEAP_INSERT_SKIP_FSM	0x0002
 #define HEAP_INSERT_FROZEN		0x0004
 #define HEAP_INSERT_SPECULATIVE 0x0008
@@ -177,6 +177,7 @@ extern void simple_heap_delete(Relation relation, ItemPointer tid);
 extern void simple_heap_update(Relation relation, ItemPointer otid,
 				   HeapTuple tup);
 
+extern void heap_register_sync(Relation relation);
 extern void heap_sync(Relation relation);
 
 /* in heap/pruneheap.c */
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 06a8242..5418d71 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -378,6 +378,8 @@ extern void heap2_desc(StringInfo buf, XLogReaderState *record);
 extern const char *heap2_identify(uint8 info);
 extern void heap_xlog_logical_rewrite(XLogReaderState *r);
 
+extern bool HeapNeedsWAL(Relation rel, Buffer buf);
+
 extern XLogRecPtr log_heap_cleanup_info(RelFileNode rnode,
 					  TransactionId latestRemovedXid);
 extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index ef960da..c618c78 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -29,6 +29,9 @@ extern void RelationTruncate(Relation rel, BlockNumber nblocks);
  */
 extern void smgrDoPendingDeletes(bool isCommit);
 extern int	smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr);
+extern void smgrRegisterPendingSync(Relation rel);
+extern bool smgrIsSyncPending(RelFileNode rnode, BlockNumber blkno);
+extern void smgrDoPendingSyncs(bool isCommit);
 extern void AtSubCommit_smgr(void);
 extern void AtSubAbort_smgr(void);
 extern void PostPrepare_smgr(void);
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 3d5dea7..0622dee 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -202,6 +202,7 @@ extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation,
 								ForkNumber forkNum);
 extern void FlushOneBuffer(Buffer buffer);
 extern void FlushRelationBuffers(Relation rel);
+extern void FlushRelFileNodeBuffers(RelFileNode rel, bool islocal);
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(RelFileNodeBackend rnode,
 					   ForkNumber forkNum, BlockNumber firstDelBlock);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to