Sorry, what I have just sent was broken.

At Tue, 11 Apr 2017 17:33:41 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI 
<horiguchi.kyot...@lab.ntt.co.jp> wrote in 
<20170411.173341.257028732.horiguchi.kyot...@lab.ntt.co.jp>
> At Tue, 11 Apr 2017 09:56:06 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI 
> <horiguchi.kyot...@lab.ntt.co.jp> wrote in 
> <20170411.095606.245908357.horiguchi.kyot...@lab.ntt.co.jp>
> > Hello, thank you for looking at this.
> > 
> > At Fri, 07 Apr 2017 20:38:35 -0400, Tom Lane <t...@sss.pgh.pa.us> wrote in 
> > <27309.1491611...@sss.pgh.pa.us>
> > > Alvaro Herrera <alvhe...@2ndquadrant.com> writes:
> > > > Interesting.  I wonder if it's possible that a relcache invalidation
> > > > would cause these values to get lost for some reason, because that would
> > > > be dangerous.
> > > 
> > > > I suppose the rationale is that this shouldn't happen because any
> > > > operation that does things this way must hold an exclusive lock on the
> > > > relation.  But that doesn't guarantee that the relcache entry is
> > > > completely stable,
> > > 
> > > It ABSOLUTELY is not safe.  Relcache flushes can happen regardless of
> > > how strong a lock you hold.
> > > 
> > >                   regards, tom lane
> > 
> > Ugh. Yes, relcache invalidation can happen at any time, and it resets
> > the added values. pg_stat_info misled me into thinking that it could
> > store transient values. But I came up with another idea.
> > 
> > The reason I proposed it was I thought that hash_search for every
> > buffer is not good. Instead, like pg_stat_info, we can link the
> 
> buffer => buffer modification
> 
> > pending-sync hash entry to Relation. This greatly reduces the
> > frequency of hash-searching.
> > 
> > I'll post a new patch along these lines soon.
> 
> Here it is.

It contained trailing whitespace and was missing the test script.  This
is the correct patch.

> - Relation has new members no_pending_sync and pending_sync that
>   work as an instant cache of an entry in the pendingSyncs hash.
> 
> - Commit-time synchronization is restored, as in Michael's patch.
> 
> - If the relfilenode is replaced, pending_sync for the old node is
>   removed. The old entry would be ignored on abort and meaningless
>   on commit anyway.
> 
> - TAP test is renamed to 012 since some new files have been added.
> 
> The pending sync hash used to be accessed on every call of
> HeapNeedsWAL() (per insertion/update/freeze of a tuple) if any of
> the accessed relations had a pending sync.  Almost all of those
> accesses are eliminated as a result.
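
The caching idea above can be sketched in isolation as follows. This is
only an illustration, not code from the patch: ToyRel, ToyPendingSync,
toy_lookup() and toy_block_needs_wal() are hypothetical names, the "hash"
is a one-entry array, and the truncated_to handling is left out. It shows
how a cached entry pointer plus a "known to have no entry" flag limit the
search to one per relation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define INVALID_BLOCK 0xFFFFFFFFu	/* stand-in for InvalidBlockNumber */

/* Hypothetical, simplified stand-in for a pending-sync hash entry. */
typedef struct ToyPendingSync
{
	unsigned int relid;			/* lookup key */
	unsigned int sync_above;	/* WAL is skipped for blocks >= sync_above */
} ToyPendingSync;

/* Hypothetical stand-in for the two members added to Relation. */
typedef struct ToyRel
{
	unsigned int relid;
	bool		no_pending_sync;	/* true: known to have no entry */
	ToyPendingSync *pending_sync;	/* cached entry; NULL if not looked up */
} ToyRel;

/* Toy "hash table" holding a single entry, for relation 42. */
static ToyPendingSync toy_table[] = {{42, 10}};
static int	toy_lookups = 0;	/* number of simulated hash searches */

/* Simulated hash search: a linear scan, counted in toy_lookups. */
static ToyPendingSync *
toy_lookup(unsigned int relid)
{
	size_t		i;

	toy_lookups++;
	for (i = 0; i < sizeof(toy_table) / sizeof(toy_table[0]); i++)
	{
		if (toy_table[i].relid == relid)
			return &toy_table[i];
	}
	return NULL;
}

/* Does a change to block blkno of rel need WAL in this toy model? */
static bool
toy_block_needs_wal(ToyRel *rel, unsigned int blkno)
{
	assert(rel != NULL);

	if (rel->no_pending_sync)
		return true;			/* the earlier miss is cached */

	if (rel->pending_sync == NULL)
	{
		rel->pending_sync = toy_lookup(rel->relid);
		if (rel->pending_sync == NULL)
		{
			rel->no_pending_sync = true;	/* remember the miss */
			return true;
		}
	}

	/* Blocks below sync_above are still WAL-logged as usual. */
	return rel->pending_sync->sync_above == INVALID_BLOCK ||
		blkno < rel->pending_sync->sync_above;
}
```

In this sketch, checking many blocks of the same relation costs a single
toy_lookup() call. If both cached fields were reset to their initial
values, the next check would simply repeat the lookup.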

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 0c3e2b0..23a6d56 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -34,6 +34,28 @@
  *	  the POSTGRES heap access method used for all POSTGRES
  *	  relations.
  *
+ * WAL CONSIDERATIONS
+ *	  All heap operations are normally WAL-logged, but there are a few
+ *	  exceptions. Temporary and unlogged relations never need to be
+ *	  WAL-logged, but we can also skip WAL-logging for a table that was
+ *	  created in the same transaction, if we don't need WAL for PITR or
+ *	  WAL archival purposes (i.e. if wal_level=minimal), and we fsync()
+ *	  the file to disk at COMMIT instead.
+ *
+ *	  The same-relation optimization is not employed automatically on all
+ *	  updates to a table that was created in the same transaction, because
+ *	  for a small number of changes, it's cheaper to just create the WAL
+ *	  records than to fsync() the whole relation at COMMIT. It is only
+ *	  worthwhile for (presumably) large operations like COPY, CLUSTER,
+ *	  or VACUUM FULL. Use heap_register_sync() to initiate such an
+ *	  operation; it will cause any subsequent updates to the table to skip
+ *	  WAL-logging, if possible, and cause the heap to be synced to disk at
+ *	  COMMIT.
+ *
+ *	  To make that work, all modifications to heap must use
+ *	  BufferNeedsWAL() to check if WAL-logging is needed in this transaction
+ *	  for the given block.
+ *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -56,6 +78,7 @@
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "catalog/namespace.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
@@ -2356,12 +2379,6 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
  * The new tuple is stamped with current transaction ID and the specified
  * command ID.
  *
- * If the HEAP_INSERT_SKIP_WAL option is specified, the new tuple is not
- * logged in WAL, even for a non-temp relation.  Safe usage of this behavior
- * requires that we arrange that all new tuples go into new pages not
- * containing any tuples from other transactions, and that the relation gets
- * fsync'd before commit.  (See also heap_sync() comments)
- *
  * The HEAP_INSERT_SKIP_FSM option is passed directly to
  * RelationGetBufferForTuple, which see for more info.
  *
@@ -2392,6 +2409,7 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
  * TID where the tuple was stored.  But note that any toasting of fields
  * within the tuple data is NOT reflected into *tup.
  */
+extern HTAB *pendingSyncs;
 Oid
 heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 			int options, BulkInsertState bistate)
@@ -2465,7 +2483,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, buffer))
 	{
 		xl_heap_insert xlrec;
 		xl_heap_header xlhdr;
@@ -2664,12 +2682,10 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	int			ndone;
 	char	   *scratch = NULL;
 	Page		page;
-	bool		needwal;
 	Size		saveFreeSpace;
 	bool		need_tuple_data = RelationIsLogicallyLogged(relation);
 	bool		need_cids = RelationIsAccessibleInLogicalDecoding(relation);
 
-	needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
 												   HEAP_DEFAULT_FILLFACTOR);
 
@@ -2684,7 +2700,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	 * palloc() within a critical section is not safe, so we allocate this
 	 * beforehand.
 	 */
-	if (needwal)
+	if (RelationNeedsWAL(relation))
 		scratch = palloc(BLCKSZ);
 
 	/*
@@ -2719,6 +2735,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 		Buffer		vmbuffer = InvalidBuffer;
 		bool		all_visible_cleared = false;
 		int			nthispage;
+		bool		needwal;
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -2730,6 +2747,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 										   InvalidBuffer, options, bistate,
 										   &vmbuffer, NULL);
 		page = BufferGetPage(buffer);
+		needwal = BufferNeedsWAL(relation, buffer);
 
 		/* NO EREPORT(ERROR) from here till changes are logged */
 		START_CRIT_SECTION();
@@ -3286,7 +3304,7 @@ l1:
 	 * NB: heap_abort_speculative() uses the same xlog record and replay
 	 * routines.
 	 */
-	if (RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, buffer))
 	{
 		xl_heap_delete xlrec;
 		XLogRecPtr	recptr;
@@ -4250,7 +4268,8 @@ l2:
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, buffer) ||
+		BufferNeedsWAL(relation, newbuf))
 	{
 		XLogRecPtr	recptr;
 
@@ -5141,7 +5160,7 @@ failed:
 	 * (Also, in a PITR log-shipping or 2PC environment, we have to have XLOG
 	 * entries for everything anyway.)
 	 */
-	if (RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, *buffer))
 	{
 		xl_heap_lock xlrec;
 		XLogRecPtr	recptr;
@@ -5843,7 +5862,7 @@ l4:
 		MarkBufferDirty(buf);
 
 		/* XLOG stuff */
-		if (RelationNeedsWAL(rel))
+		if (BufferNeedsWAL(rel, buf))
 		{
 			xl_heap_lock_updated xlrec;
 			XLogRecPtr	recptr;
@@ -5998,7 +6017,7 @@ heap_finish_speculative(Relation relation, HeapTuple tuple)
 	htup->t_ctid = tuple->t_self;
 
 	/* XLOG stuff */
-	if (RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, buffer))
 	{
 		xl_heap_confirm xlrec;
 		XLogRecPtr	recptr;
@@ -6131,7 +6150,7 @@ heap_abort_speculative(Relation relation, HeapTuple tuple)
 	 * The WAL records generated here match heap_delete().  The same recovery
 	 * routines are used.
 	 */
-	if (RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, buffer))
 	{
 		xl_heap_delete xlrec;
 		XLogRecPtr	recptr;
@@ -6240,7 +6259,7 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, buffer))
 	{
 		xl_heap_inplace xlrec;
 		XLogRecPtr	recptr;
@@ -7354,7 +7373,7 @@ log_heap_clean(Relation reln, Buffer buffer,
 	XLogRecPtr	recptr;
 
 	/* Caller should not call me on a non-WAL-logged relation */
-	Assert(RelationNeedsWAL(reln));
+	Assert(BufferNeedsWAL(reln, buffer));
 
 	xlrec.latestRemovedXid = latestRemovedXid;
 	xlrec.nredirected = nredirected;
@@ -7402,7 +7421,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
 	XLogRecPtr	recptr;
 
 	/* Caller should not call me on a non-WAL-logged relation */
-	Assert(RelationNeedsWAL(reln));
+	Assert(BufferNeedsWAL(reln, buffer));
 	/* nor when there are no tuples to freeze */
 	Assert(ntuples > 0);
 
@@ -7487,7 +7506,7 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	int			bufflags;
 
 	/* Caller should not call me on a non-WAL-logged relation */
-	Assert(RelationNeedsWAL(reln));
+	Assert(BufferNeedsWAL(reln, newbuf) || BufferNeedsWAL(reln, oldbuf));
 
 	XLogBeginInsert();
 
@@ -7590,76 +7609,86 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	xlrec.new_offnum = ItemPointerGetOffsetNumber(&newtup->t_self);
 	xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
 
+	XLogRegisterData((char *) &xlrec, SizeOfHeapUpdate);
+
 	bufflags = REGBUF_STANDARD;
 	if (init)
 		bufflags |= REGBUF_WILL_INIT;
 	if (need_tuple_data)
 		bufflags |= REGBUF_KEEP_DATA;
 
-	XLogRegisterBuffer(0, newbuf, bufflags);
-	if (oldbuf != newbuf)
-		XLogRegisterBuffer(1, oldbuf, REGBUF_STANDARD);
-
-	XLogRegisterData((char *) &xlrec, SizeOfHeapUpdate);
-
 	/*
 	 * Prepare WAL data for the new tuple.
 	 */
-	if (prefixlen > 0 || suffixlen > 0)
+	if (BufferNeedsWAL(reln, newbuf))
 	{
-		if (prefixlen > 0 && suffixlen > 0)
-		{
-			prefix_suffix[0] = prefixlen;
-			prefix_suffix[1] = suffixlen;
-			XLogRegisterBufData(0, (char *) &prefix_suffix, sizeof(uint16) * 2);
-		}
-		else if (prefixlen > 0)
-		{
-			XLogRegisterBufData(0, (char *) &prefixlen, sizeof(uint16));
-		}
-		else
+		XLogRegisterBuffer(0, newbuf, bufflags);
+
+		if (prefixlen > 0 || suffixlen > 0)
 		{
-			XLogRegisterBufData(0, (char *) &suffixlen, sizeof(uint16));
+			if (prefixlen > 0 && suffixlen > 0)
+			{
+				prefix_suffix[0] = prefixlen;
+				prefix_suffix[1] = suffixlen;
+				XLogRegisterBufData(0, (char *) &prefix_suffix,
+									sizeof(uint16) * 2);
+			}
+			else if (prefixlen > 0)
+			{
+				XLogRegisterBufData(0, (char *) &prefixlen, sizeof(uint16));
+			}
+			else
+			{
+				XLogRegisterBufData(0, (char *) &suffixlen, sizeof(uint16));
+			}
 		}
-	}
 
-	xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
-	xlhdr.t_infomask = newtup->t_data->t_infomask;
-	xlhdr.t_hoff = newtup->t_data->t_hoff;
-	Assert(SizeofHeapTupleHeader + prefixlen + suffixlen <= newtup->t_len);
+		xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
+		xlhdr.t_infomask = newtup->t_data->t_infomask;
+		xlhdr.t_hoff = newtup->t_data->t_hoff;
+		Assert(SizeofHeapTupleHeader + prefixlen + suffixlen <= newtup->t_len);
 
-	/*
-	 * PG73FORMAT: write bitmap [+ padding] [+ oid] + data
-	 *
-	 * The 'data' doesn't include the common prefix or suffix.
-	 */
-	XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
-	if (prefixlen == 0)
-	{
-		XLogRegisterBufData(0,
-							((char *) newtup->t_data) + SizeofHeapTupleHeader,
-						  newtup->t_len - SizeofHeapTupleHeader - suffixlen);
-	}
-	else
-	{
 		/*
-		 * Have to write the null bitmap and data after the common prefix as
-		 * two separate rdata entries.
+		 * PG73FORMAT: write bitmap [+ padding] [+ oid] + data
+		 *
+		 * The 'data' doesn't include the common prefix or suffix.
 		 */
-		/* bitmap [+ padding] [+ oid] */
-		if (newtup->t_data->t_hoff - SizeofHeapTupleHeader > 0)
+		XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
+		if (prefixlen == 0)
 		{
 			XLogRegisterBufData(0,
 						   ((char *) newtup->t_data) + SizeofHeapTupleHeader,
-							 newtup->t_data->t_hoff - SizeofHeapTupleHeader);
+						  newtup->t_len - SizeofHeapTupleHeader - suffixlen);
 		}
+		else
+		{
+			/*
+			 * Have to write the null bitmap and data after the common prefix
+			 * as two separate rdata entries.
+			 */
+			/* bitmap [+ padding] [+ oid] */
+			if (newtup->t_data->t_hoff - SizeofHeapTupleHeader > 0)
+			{
+				XLogRegisterBufData(0,
+						   ((char *) newtup->t_data) + SizeofHeapTupleHeader,
+							 newtup->t_data->t_hoff - SizeofHeapTupleHeader);
+			}
 
-		/* data after common prefix */
-		XLogRegisterBufData(0,
+			/* data after common prefix */
+			XLogRegisterBufData(0,
 			  ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen,
 			 newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen);
+		}
 	}
 
+	/*
+	 * If the old and new tuple are on different pages, also register the old
+	 * page, so that a full-page image is created for it if necessary. We
+	 * don't need any extra information to replay changes to it.
+	 */
+	if (oldbuf != newbuf && BufferNeedsWAL(reln, oldbuf))
+		XLogRegisterBuffer(1, oldbuf, REGBUF_STANDARD);
+
 	/* We need to log a tuple identity */
 	if (need_tuple_data && old_key_tuple)
 	{
@@ -8578,8 +8607,13 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
 	 */
 
 	/* Deal with old tuple version */
-	oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
-									  &obuffer);
+	if (oldblk == newblk)
+		oldaction = XLogReadBufferForRedo(record, 0, &obuffer);
+	else if (XLogRecHasBlockRef(record, 1))
+		oldaction = XLogReadBufferForRedo(record, 1, &obuffer);
+	else
+		oldaction = BLK_DONE;
+
 	if (oldaction == BLK_NEEDS_REDO)
 	{
 		page = BufferGetPage(obuffer);
@@ -8633,6 +8667,8 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
 		PageInit(page, BufferGetPageSize(nbuffer), 0);
 		newaction = BLK_NEEDS_REDO;
 	}
+	else if (!XLogRecHasBlockRef(record, 0))
+		newaction = BLK_DONE;
 	else
 		newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
 
@@ -9069,9 +9105,16 @@ heap2_redo(XLogReaderState *record)
  *	heap_sync		- sync a heap, for use when no WAL has been written
  *
  * This forces the heap contents (including TOAST heap if any) down to disk.
- * If we skipped using WAL, and WAL is otherwise needed, we must force the
- * relation down to disk before it's safe to commit the transaction.  This
- * requires writing out any dirty buffers and then doing a forced fsync.
+ * If we made any changes to the heap bypassing the buffer manager, we must
+ * force the relation down to disk before it's safe to commit the
+ * transaction, because the direct modifications will not be flushed by
+ * the next checkpoint.
+ *
+ * We used to also use this after batch operations like COPY and CLUSTER,
+ * if we skipped using WAL and WAL is otherwise needed, but there were
+ * corner-cases involving other WAL-logged operations to the same
+ * relation, where that was not enough. heap_register_sync() should be
+ * used for that purpose instead.
  *
  * Indexes are not touched.  (Currently, index operations associated with
  * the commands that use this are WAL-logged and so do not need fsync.
@@ -9181,3 +9224,33 @@ heap_mask(char *pagedata, BlockNumber blkno)
 		}
 	}
 }
+
+/*
+ *	heap_register_sync	- register a heap to be synced to disk at commit
+ *
+ * This can be used to skip WAL-logging changes on a relation file that has
+ * been created in the same transaction. This makes note of the current size of
+ * the relation, and ensures that when the relation is extended, any changes
+ * to the new blocks in the heap, in the same transaction, will not be
+ * WAL-logged. Instead, the heap contents are flushed to disk at commit,
+ * like heap_sync() does.
+ *
+ * This does the same for the TOAST heap, if any. Indexes are not affected.
+ */
+void
+heap_register_sync(Relation rel)
+{
+	/* non-WAL-logged tables never need fsync */
+	if (!RelationNeedsWAL(rel))
+		return;
+
+	RecordPendingSync(rel);
+	if (OidIsValid(rel->rd_rel->reltoastrelid))
+	{
+		Relation	toastrel;
+
+		toastrel = heap_open(rel->rd_rel->reltoastrelid, AccessShareLock);
+		RecordPendingSync(toastrel);
+		heap_close(toastrel, AccessShareLock);
+	}
+}
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index d69a266..4754278 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -20,6 +20,7 @@
 #include "access/htup_details.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
@@ -260,7 +261,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 		/*
 		 * Emit a WAL HEAP_CLEAN record showing what we did
 		 */
-		if (RelationNeedsWAL(relation))
+		if (BufferNeedsWAL(relation, buffer))
 		{
 			XLogRecPtr	recptr;
 
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index d7f65a5..6462f44 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -649,9 +649,7 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 	}
 	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
 		heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
-										 HEAP_INSERT_SKIP_FSM |
-										 (state->rs_use_wal ?
-										  0 : HEAP_INSERT_SKIP_WAL));
+										 HEAP_INSERT_SKIP_FSM);
 	else
 		heaptup = tup;
 
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index e5616ce..933fa9c 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -88,6 +88,7 @@
 #include "access/heapam_xlog.h"
 #include "access/visibilitymap.h"
 #include "access/xlog.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
@@ -307,7 +308,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 		map[mapByte] |= (flags << mapOffset);
 		MarkBufferDirty(vmBuf);
 
-		if (RelationNeedsWAL(rel))
+		if (BufferNeedsWAL(rel, heapBuf))
 		{
 			if (XLogRecPtrIsInvalid(recptr))
 			{
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 92b263a..313a03b 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2007,6 +2007,9 @@ CommitTransaction(void)
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
+	/* Flush updates to relations that we didn't WAL-log */
+	smgrDoPendingSyncs(true);
+
 	/*
 	 * Mark serializable transaction as complete for predicate locking
 	 * purposes.  This should be done as late as we can put it and still allow
@@ -2238,6 +2241,9 @@ PrepareTransaction(void)
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
+	/* Flush updates to relations that we didn't WAL-log */
+	smgrDoPendingSyncs(true);
+
 	/*
 	 * Mark serializable transaction as complete for predicate locking
 	 * purposes.  This should be done as late as we can put it and still allow
@@ -2545,6 +2551,7 @@ AbortTransaction(void)
 	AtAbort_Notify();
 	AtEOXact_RelationMap(false);
 	AtAbort_Twophase();
+	smgrDoPendingSyncs(false);	/* abandon pending syncs */
 
 	/*
 	 * Advertise the fact that we aborted in pg_xact (assuming that we got as
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index f677916..14df0b1 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -29,6 +29,7 @@
 #include "catalog/storage_xlog.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
+#include "utils/hsearch.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -64,6 +65,49 @@ typedef struct PendingRelDelete
 static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
 
 /*
+ * We also track relation files (RelFileNode values) that have been created
+ * in the same transaction, and that have been modified without WAL-logging
+ * the action (an optimization possible with wal_level=minimal). When we are
+ * about to skip WAL-logging, a PendingRelSync entry is created, and
+ * 'sync_above' is set to the current size of the relation. Any operations
+ * on blocks < sync_above need to be WAL-logged as usual, but for operations
+ * on higher blocks, WAL-logging is skipped.
+ *
+ * NB: after WAL-logging has been skipped for a block, we must not WAL-log
+ * any subsequent actions on the same block either. Replaying the WAL record
+ * of the subsequent action might fail otherwise, as the "before" state of
+ * the block might not match, as the earlier actions were not WAL-logged.
+ * Likewise, after we have WAL-logged an operation for a block, we must
+ * WAL-log any subsequent operations on the same page as well. Replaying
+ * a possible full-page-image from the earlier WAL record would otherwise
+ * revert the page to the old state, even if we sync the relation at end
+ * of transaction.
+ *
+ * If a relation is truncated (without creating a new relfilenode), and we
+ * emit a WAL record of the truncation, we can't skip WAL-logging for any
+ * of the truncated blocks anymore, as replaying the truncation record will
+ * destroy all the data inserted after that. But if we have already decided
+ * to skip WAL-logging changes to a relation, and the relation is truncated,
+ * we don't need to WAL-log the truncation either.
+ *
+ * This mechanism is currently only used by heaps. Indexes are always
+ * WAL-logged. Also, this only applies for wal_level=minimal; with higher
+ * WAL levels we need the WAL for PITR/replication anyway.
+ */
+typedef struct PendingRelSync
+{
+	RelFileNode relnode;		/* relation created in same xact */
+	BlockNumber sync_above;		/* WAL-logging skipped for blocks >=
+								 * sync_above */
+	BlockNumber truncated_to;	/* truncation WAL record was written */
+}	PendingRelSync;
+
+/* Relations that need to be fsync'd at commit */
+static HTAB *pendingSyncs = NULL;
+
+static void createPendingSyncsHash(void);
+
+/*
  * RelationCreateStorage
  *		Create physical storage for a relation.
  *
@@ -226,6 +270,8 @@ RelationPreserveStorage(RelFileNode rnode, bool atCommit)
 void
 RelationTruncate(Relation rel, BlockNumber nblocks)
 {
+	PendingRelSync *pending = NULL;
+	bool		found;
 	bool		fsm;
 	bool		vm;
 
@@ -260,37 +306,81 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
 	 */
 	if (RelationNeedsWAL(rel))
 	{
-		/*
-		 * Make an XLOG entry reporting the file truncation.
-		 */
-		XLogRecPtr	lsn;
-		xl_smgr_truncate xlrec;
-
-		xlrec.blkno = nblocks;
-		xlrec.rnode = rel->rd_node;
-		xlrec.flags = SMGR_TRUNCATE_ALL;
-
-		XLogBeginInsert();
-		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+		/* no_pending_sync is ignored since a new entry is created here */
+		if (!rel->pending_sync)
+		{
+			if (!pendingSyncs)
+				createPendingSyncsHash();
+			elog(LOG, "RelationTruncate: accessing hash");
+			pending = (PendingRelSync *) hash_search(pendingSyncs,
+												 (void *) &rel->rd_node,
+												 HASH_ENTER, &found);
+			if (!found)
+			{
+				pending->sync_above = InvalidBlockNumber;
+				pending->truncated_to = InvalidBlockNumber;
+			}
 
-		lsn = XLogInsert(RM_SMGR_ID,
-						 XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
+			rel->no_pending_sync = false;
+			rel->pending_sync = pending;
+		}
 
-		/*
-		 * Flush, because otherwise the truncation of the main relation might
-		 * hit the disk before the WAL record, and the truncation of the FSM
-		 * or visibility map. If we crashed during that window, we'd be left
-		 * with a truncated heap, but the FSM or visibility map would still
-		 * contain entries for the non-existent heap pages.
-		 */
-		if (fsm || vm)
-			XLogFlush(lsn);
+		if (rel->pending_sync->sync_above == InvalidBlockNumber ||
+			rel->pending_sync->sync_above < nblocks)
+		{
+			/*
+			 * Make an XLOG entry reporting the file truncation.
+			 */
+			XLogRecPtr		lsn;
+			xl_smgr_truncate xlrec;
+
+			xlrec.blkno = nblocks;
+			xlrec.rnode = rel->rd_node;
+
+			XLogBeginInsert();
+			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+			lsn = XLogInsert(RM_SMGR_ID,
+							 XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
+
+			elog(DEBUG2, "WAL-logged truncation of rel %u/%u/%u to %u blocks",
+				 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+				 nblocks);
+
+			/*
+			 * Flush, because otherwise the truncation of the main relation
+			 * might hit the disk before the WAL record, and the truncation of
+			 * the FSM or visibility map. If we crashed during that window,
+			 * we'd be left with a truncated heap, but the FSM or visibility
+			 * map would still contain entries for the non-existent heap
+			 * pages.
+			 */
+			if (fsm || vm)
+				XLogFlush(lsn);
+
+			rel->pending_sync->truncated_to = nblocks;
+		}
 	}
 
 	/* Do the real work */
 	smgrtruncate(rel->rd_smgr, MAIN_FORKNUM, nblocks);
 }
 
+/* create the hash table to track pending at-commit fsyncs */
+static void
+createPendingSyncsHash(void)
+{
+	/* First time through: initialize the hash table */
+	HASHCTL		ctl;
+
+	MemSet(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(RelFileNode);
+	ctl.entrysize = sizeof(PendingRelSync);
+	ctl.hash = tag_hash;
+	pendingSyncs = hash_create("pending relation sync table", 5,
+							   &ctl, HASH_ELEM | HASH_FUNCTION);
+}
+
 /*
  *	smgrDoPendingDeletes() -- Take care of relation deletes at end of xact.
  *
@@ -369,6 +459,24 @@ smgrDoPendingDeletes(bool isCommit)
 }
 
 /*
+ * RelationRemovePendingSync() -- remove pendingSync entry for a relation
+ */
+void
+RelationRemovePendingSync(Relation rel)
+{
+	bool found;
+
+	rel->pending_sync = NULL;
+	rel->no_pending_sync = true;
+	if (pendingSyncs)
+	{
+		elog(LOG, "RelationRemovePendingSync: accessing hash");
+		hash_search(pendingSyncs, (void *) &rel->rd_node, HASH_REMOVE, &found);
+	}
+}
+
+
+/*
  * smgrGetPendingDeletes() -- Get a list of non-temp relations to be deleted.
  *
  * The return value is the number of relations scheduled for termination.
@@ -419,6 +527,166 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
 	return nrels;
 }
 
+
+/*
+ * Remember that the given relation needs to be sync'd at commit, because we
+ * are going to skip WAL-logging subsequent actions to it.
+ */
+void
+RecordPendingSync(Relation rel)
+{
+	bool found = true;
+	BlockNumber nblocks;
+
+	Assert(RelationNeedsWAL(rel));
+
+	/* ignore no_pending_sync since a new entry is created here */
+	if (!rel->pending_sync)
+	{
+		if (!pendingSyncs)
+			createPendingSyncsHash();
+
+		/* Look up or create an entry */
+		rel->no_pending_sync = false;
+		elog(LOG, "RecordPendingSync: accessing hash");
+		rel->pending_sync =
+			(PendingRelSync *) hash_search(pendingSyncs,
+										   (void *) &rel->rd_node,
+										   HASH_ENTER, &found);
+	}
+
+	nblocks = RelationGetNumberOfBlocks(rel);
+	if (!found)
+	{
+		rel->pending_sync->truncated_to = InvalidBlockNumber;
+		rel->pending_sync->sync_above = nblocks;
+
+		elog(DEBUG2,
+			 "registering new pending sync for rel %u/%u/%u at block %u",
+			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+			 nblocks);
+
+	}
+	else if (rel->pending_sync->sync_above == InvalidBlockNumber)
+	{
+		elog(DEBUG2, "registering pending sync for rel %u/%u/%u at block %u",
+			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+			 nblocks);
+		rel->pending_sync->sync_above = nblocks;
+	}
+	else
+		elog(DEBUG2,
+			 "pending sync for rel %u/%u/%u was already registered at block %u (new %u)",
+			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+			 rel->pending_sync->sync_above, nblocks);
+}
+
+/*
+ * Do changes to given heap page need to be WAL-logged?
+ *
+ * This takes into account any previous RecordPendingSync() requests.
+ *
+ * Note that it is required to check this before creating any WAL records for
+ * heap pages - it is not merely an optimization! WAL-logging a record, when
+ * we have already skipped a previous WAL record for the same page could lead
+ * to failure at WAL replay, as the "before" state expected by the record
+ * might not match what's on disk. Also, if the heap was truncated earlier, we
+ * must WAL-log any changes to the once-truncated blocks, because replaying
+ * the truncation record will destroy them.
+ */
+bool
+BufferNeedsWAL(Relation rel, Buffer buf)
+{
+	BlockNumber blkno = InvalidBlockNumber;
+
+	if (!RelationNeedsWAL(rel))
+		return false;
+
+	elog(LOG, "BufferNeedsWAL: pendingSyncs = %p, no_pending_sync = %d", pendingSyncs, rel->no_pending_sync);
+	/* no further work if we know that we don't have pending sync */
+	if (!pendingSyncs || rel->no_pending_sync)
+		return true;
+
+	/* do the real work */
+	if (!rel->pending_sync)
+	{
+		bool found = false;
+
+		elog(LOG, "BufferNeedsWAL: accessing hash");
+		rel->pending_sync =
+			(PendingRelSync *) hash_search(pendingSyncs,
+										   (void *) &rel->rd_node,
+										   HASH_FIND, &found);
+		if (!found)
+		{
+			/* no entry found; don't search the hash again */
+			rel->no_pending_sync = true;
+			return true;
+		}
+	}
+
+	blkno = BufferGetBlockNumber(buf);
+	if (rel->pending_sync->sync_above == InvalidBlockNumber ||
+		rel->pending_sync->sync_above > blkno)
+	{
+		elog(DEBUG2, "not skipping WAL-logging for rel %u/%u/%u block %u, because sync_above is %u",
+			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+			 blkno, rel->pending_sync->sync_above);
+		return true;
+	}
+
+	/*
+	 * We have emitted a truncation record for this block.
+	 */
+	if (rel->pending_sync->truncated_to != InvalidBlockNumber &&
+		rel->pending_sync->truncated_to <= blkno)
+	{
+		elog(DEBUG2, "not skipping WAL-logging for rel %u/%u/%u block %u, because it was truncated earlier in the same xact",
+			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+			 blkno);
+		return true;
+	}
+
+	elog(DEBUG2, "skipping WAL-logging for rel %u/%u/%u block %u",
+		 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+		 blkno);
+
+	return false;
+}
+
+/*
+ * Sync to disk any relations that we skipped WAL-logging for earlier.
+ */
+void
+smgrDoPendingSyncs(bool isCommit)
+{
+	if (!pendingSyncs)
+		return;
+
+	if (isCommit)
+	{
+		HASH_SEQ_STATUS status;
+		PendingRelSync *pending;
+
+		hash_seq_init(&status, pendingSyncs);
+
+		while ((pending = hash_seq_search(&status)) != NULL)
+		{
+			if (pending->sync_above != InvalidBlockNumber)
+			{
+				FlushRelationBuffersWithoutRelCache(pending->relnode, false);
+				smgrimmedsync(smgropen(pending->relnode, InvalidBackendId), MAIN_FORKNUM);
+
+				elog(DEBUG2, "syncing rel %u/%u/%u", pending->relnode.spcNode,
+					 pending->relnode.dbNode, pending->relnode.relNode);
+			}
+		}
+	}
+
+	hash_destroy(pendingSyncs);
+	pendingSyncs = NULL;
+}
+
 /*
  *	PostPrepare_smgr -- Clean up after a successful PREPARE
  *
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index b5af2be..8aa7e7b 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2372,8 +2372,7 @@ CopyFrom(CopyState cstate)
 	 *	- data is being written to relfilenode created in this transaction
 	 * then we can skip writing WAL.  It's safe because if the transaction
 	 * doesn't commit, we'll discard the table (or the new relfilenode file).
-	 * If it does commit, we'll have done the heap_sync at the bottom of this
-	 * routine first.
+	 * If it does commit, the heap is synced at commit time.
 	 *
 	 * As mentioned in comments in utils/rel.h, the in-same-transaction test
 	 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
@@ -2405,7 +2404,7 @@ CopyFrom(CopyState cstate)
 	{
 		hi_options |= HEAP_INSERT_SKIP_FSM;
 		if (!XLogIsNeeded())
-			hi_options |= HEAP_INSERT_SKIP_WAL;
+			heap_register_sync(cstate->rel);
 	}
 
 	/*
@@ -2782,11 +2781,11 @@ CopyFrom(CopyState cstate)
 	FreeExecutorState(estate);
 
 	/*
-	 * If we skipped writing WAL, then we need to sync the heap (but not
-	 * indexes since those use WAL anyway)
+	 * If we skipped writing WAL, the heap is synced at the end of the
+	 * transaction.  (We used to do it here, but it was later found that,
+	 * to be safe, we must also avoid WAL-logging any subsequent actions
+	 * on the pages we skipped WAL for.)  Indexes always use WAL.
 	 */
-	if (hi_options & HEAP_INSERT_SKIP_WAL)
-		heap_sync(cstate->rel);
 
 	return processed;
 }
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 06425cc..408495e 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -567,8 +567,9 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 * We can skip WAL-logging the insertions, unless PITR or streaming
 	 * replication is in use. We can skip the FSM in any case.
 	 */
-	myState->hi_options = HEAP_INSERT_SKIP_FSM |
-		(XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
+	if (!XLogIsNeeded())
+		heap_register_sync(intoRelationDesc);
+	myState->hi_options = HEAP_INSERT_SKIP_FSM;
 	myState->bistate = GetBulkInsertState();
 
 	/* Not using WAL requires smgr_targblock be initially invalid */
@@ -617,9 +618,7 @@ intorel_shutdown(DestReceiver *self)
 
 	FreeBulkInsertState(myState->bistate);
 
-	/* If we skipped using WAL, must heap_sync before commit */
-	if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
-		heap_sync(myState->rel);
+	/* If we skipped using WAL, we will sync the relation at commit */
 
 	/* close rel, but keep lock until commit */
 	heap_close(myState->rel, NoLock);
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 9ffd91e..8b127e3 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -477,7 +477,7 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 */
 	myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
 	if (!XLogIsNeeded())
-		myState->hi_options |= HEAP_INSERT_SKIP_WAL;
+		heap_register_sync(transientrel);
 	myState->bistate = GetBulkInsertState();
 
 	/* Not using WAL requires smgr_targblock be initially invalid */
@@ -520,9 +520,7 @@ transientrel_shutdown(DestReceiver *self)
 
 	FreeBulkInsertState(myState->bistate);
 
-	/* If we skipped using WAL, must heap_sync before commit */
-	if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
-		heap_sync(myState->transientrel);
+	/* If we skipped using WAL, we will sync the relation at commit */
 
 	/* close transientrel, but keep lock until commit */
 	heap_close(myState->transientrel, NoLock);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index abb262b..2fd210b 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -4327,8 +4327,9 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 		bistate = GetBulkInsertState();
 
 		hi_options = HEAP_INSERT_SKIP_FSM;
+
 		if (!XLogIsNeeded())
-			hi_options |= HEAP_INSERT_SKIP_WAL;
+			heap_register_sync(newrel);
 	}
 	else
 	{
@@ -4589,8 +4590,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 		FreeBulkInsertState(bistate);
 
 		/* If we skipped writing WAL, then we need to sync the heap. */
-		if (hi_options & HEAP_INSERT_SKIP_WAL)
-			heap_sync(newrel);
 
 		heap_close(newrel, NoLock);
 	}
@@ -10510,11 +10509,12 @@ ATExecSetTableSpace(Oid tableOid, Oid newTableSpace, LOCKMODE lockmode)
 
 	/*
 	 * Create and copy all forks of the relation, and schedule unlinking of
-	 * old physical files.
+	 * old physical files. Pending syncs for the old node are no longer needed.
 	 *
 	 * NOTE: any conflict in relfilenode value will be caught in
 	 * RelationCreateStorage().
 	 */
+	RelationRemovePendingSync(rel);
 	RelationCreateStorage(newrnode, rel->rd_rel->relpersistence);
 
 	/* copy main fork */
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 5b43a66..f3dcf6e 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -893,7 +893,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 				 * page has been previously WAL-logged, and if not, do that
 				 * now.
 				 */
-				if (RelationNeedsWAL(onerel) &&
+				if (BufferNeedsWAL(onerel, buf) &&
 					PageGetLSN(page) == InvalidXLogRecPtr)
 					log_newpage_buffer(buf, true);
 
@@ -1120,7 +1120,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			}
 
 			/* Now WAL-log freezing if necessary */
-			if (RelationNeedsWAL(onerel))
+			if (BufferNeedsWAL(onerel, buf))
 			{
 				XLogRecPtr	recptr;
 
@@ -1480,7 +1480,7 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (RelationNeedsWAL(onerel))
+	if (BufferNeedsWAL(onerel, buffer))
 	{
 		XLogRecPtr	recptr;
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 2109cbf..e991e9f 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -451,6 +451,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
 			BufferAccessStrategy strategy,
 			bool *foundPtr);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
+static void FlushRelationBuffers_common(SMgrRelation smgr, bool islocal);
 static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 static int	rnode_comparator(const void *p1, const void *p2);
@@ -3147,20 +3148,41 @@ PrintPinnedBufs(void)
 void
 FlushRelationBuffers(Relation rel)
 {
-	int			i;
-	BufferDesc *bufHdr;
-
 	/* Open rel at the smgr level if not already done */
 	RelationOpenSmgr(rel);
 
-	if (RelationUsesLocalBuffers(rel))
+	FlushRelationBuffers_common(rel->rd_smgr, RelationUsesLocalBuffers(rel));
+}
+
+/*
+ * Like FlushRelationBuffers(), but the relation is specified by a
+ * RelFileNode.
+ */
+void
+FlushRelationBuffersWithoutRelCache(RelFileNode rnode, bool islocal)
+{
+	FlushRelationBuffers_common(smgropen(rnode, InvalidBackendId), islocal);
+}
+
+/*
+ * Code shared between functions FlushRelationBuffers() and
+ * FlushRelationBuffersWithoutRelCache().
+ */
+static void
+FlushRelationBuffers_common(SMgrRelation smgr, bool islocal)
+{
+	RelFileNode rnode = smgr->smgr_rnode.node;
+	int			i;
+	BufferDesc *bufHdr;
+
+	if (islocal)
 	{
 		for (i = 0; i < NLocBuffer; i++)
 		{
 			uint32		buf_state;
 
 			bufHdr = GetLocalBufferDescriptor(i);
-			if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
+			if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
 				((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
 				 (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
 			{
@@ -3177,7 +3199,7 @@ FlushRelationBuffers(Relation rel)
 
 				PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
 
-				smgrwrite(rel->rd_smgr,
+				smgrwrite(smgr,
 						  bufHdr->tag.forkNum,
 						  bufHdr->tag.blockNum,
 						  localpage,
@@ -3207,18 +3229,18 @@ FlushRelationBuffers(Relation rel)
 		 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
 		 * and saves some cycles.
 		 */
-		if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+		if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode))
 			continue;
 
 		ReservePrivateRefCountEntry();
 
 		buf_state = LockBufHdr(bufHdr);
-		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
+		if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
 			(buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
 		{
 			PinBuffer_Locked(bufHdr);
 			LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
-			FlushBuffer(bufHdr, rel->rd_smgr);
+			FlushBuffer(bufHdr, smgr);
 			LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
 			UnpinBuffer(bufHdr, true);
 		}
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index ddb9485..b6b0d78 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -72,6 +72,7 @@
 #include "optimizer/var.h"
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rowsecurity.h"
+#include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
 #include "utils/array.h"
@@ -418,6 +419,10 @@ AllocateRelationDesc(Form_pg_class relp)
 	/* which we mark as a reference-counted tupdesc */
 	relation->rd_att->tdrefcount = 1;
 
+	/* We don't know yet whether a pending sync exists for this relation */
+	relation->pending_sync = NULL;
+	relation->no_pending_sync = false;
+
 	MemoryContextSwitchTo(oldcxt);
 
 	return relation;
@@ -2032,6 +2037,10 @@ formrdesc(const char *relationName, Oid relationReltype,
 		relation->rd_rel->relhasindex = true;
 	}
 
+	/* We don't know yet whether a pending sync exists for this relation */
+	relation->pending_sync = NULL;
+	relation->no_pending_sync = false;
+
 	/*
 	 * add new reldesc to relcache
 	 */
@@ -3353,6 +3362,10 @@ RelationBuildLocalRelation(const char *relname,
 	else
 		rel->rd_rel->relfilenode = relfilenode;
 
+	/* newly built relation has no pending sync */
+	rel->no_pending_sync = true;
+	rel->pending_sync = NULL;
+
 	RelationInitLockInfo(rel);	/* see lmgr.c */
 
 	RelationInitPhysicalAddr(rel);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 7e85510..3967641 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -25,10 +25,9 @@
 
 
 /* "options" flag bits for heap_insert */
-#define HEAP_INSERT_SKIP_WAL	0x0001
-#define HEAP_INSERT_SKIP_FSM	0x0002
-#define HEAP_INSERT_FROZEN		0x0004
-#define HEAP_INSERT_SPECULATIVE 0x0008
+#define HEAP_INSERT_SKIP_FSM	0x0001
+#define HEAP_INSERT_FROZEN		0x0002
+#define HEAP_INSERT_SPECULATIVE 0x0004
 
 typedef struct BulkInsertStateData *BulkInsertState;
 
@@ -178,6 +177,7 @@ extern void simple_heap_delete(Relation relation, ItemPointer tid);
 extern void simple_heap_update(Relation relation, ItemPointer otid,
 				   HeapTuple tup);
 
+extern void heap_register_sync(Relation relation);
 extern void heap_sync(Relation relation);
 extern void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot);
 
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index fea96de..b9d485a 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -22,13 +22,16 @@ extern void RelationCreateStorage(RelFileNode rnode, char relpersistence);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationTruncate(Relation rel, BlockNumber nblocks);
-
+extern void RelationRemovePendingSync(Relation rel);
 /*
  * These functions used to be in storage/smgr/smgr.c, which explains the
  * naming
  */
 extern void smgrDoPendingDeletes(bool isCommit);
 extern int	smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr);
+extern void smgrDoPendingSyncs(bool isCommit);
+extern void RecordPendingSync(Relation rel);
+extern bool BufferNeedsWAL(Relation rel, Buffer buf);
 extern void AtSubCommit_smgr(void);
 extern void AtSubAbort_smgr(void);
 extern void PostPrepare_smgr(void);
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 07a32d6..6ec2d26 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -190,6 +190,8 @@ extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation,
 								ForkNumber forkNum);
 extern void FlushOneBuffer(Buffer buffer);
 extern void FlushRelationBuffers(Relation rel);
+extern void FlushRelationBuffersWithoutRelCache(RelFileNode rnode,
+									bool islocal);
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(RelFileNodeBackend rnode,
 					   ForkNumber forkNum, BlockNumber firstDelBlock);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index ab875bb..666273e 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -216,6 +216,14 @@ typedef struct RelationData
 
 	/* use "struct" here to avoid needing to include pgstat.h: */
 	struct PgStat_TableStatus *pgstat_info;		/* statistics collection area */
+
+	/*
+	 * no_pending_sync is true if this relation is known not to have pending
+	 * syncs.  Otherwise, if pending_sync is NULL, the pendingSyncs hash must
+	 * be searched for a registered sync.
+	 */
+	bool				   no_pending_sync;
+	struct PendingRelSync *pending_sync;
 } RelationData;
 
 
diff --git a/src/test/recovery/t/012_truncate_opt.pl b/src/test/recovery/t/012_truncate_opt.pl
new file mode 100644
index 0000000..baf5604
--- /dev/null
+++ b/src/test/recovery/t/012_truncate_opt.pl
@@ -0,0 +1,94 @@
+# Set of tests to check TRUNCATE optimizations with CREATE TABLE
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+my $node = get_new_node('master');
+$node->init;
+
+my $copy_file = $node->backup_dir . "/copy_data.txt";
+
+$node->append_conf('postgresql.conf', qq{
+fsync = on
+wal_level = minimal
+});
+
+$node->start;
+
+# Create file containing data to COPY
+TestLib::append_to_file($copy_file, qq{copied row 1
+copied row 2
+copied row 3
+});
+
+# CREATE, INSERT, COPY, crash.
+#
+# If COPY inserts to the existing block, and is not WAL-logged, replaying
+# the implicit FPW of the INSERT record will destroy the COPY data.
+$node->psql('postgres', qq{
+BEGIN;
+CREATE TABLE test1(t text PRIMARY KEY);
+INSERT INTO test1 VALUES ('inserted row');
+COPY test1 FROM '$copy_file';
+COMMIT;
+});
+# Force crash recovery and check the state of the table. There should be 4 rows.
+$node->stop('immediate');
+$node->start;
+my $ret = $node->safe_psql('postgres', 'SELECT count(*) FROM test1');
+is($ret, '4', 'SELECT returns 4 rows');
+# Clean up
+$node->safe_psql('postgres', 'DROP TABLE test1;');
+
+# CREATE, COPY, crash. Trigger in COPY that inserts more to same table.
+#
+# If the INSERTs from the trigger go to the same block we're copying to,
+# and the INSERTs are WAL-logged, WAL replay will fail when it tries to
+# replay the WAL record but the "before" image doesn't match, because not
+# all changes were WAL-logged.
+$node->psql('postgres', qq{
+BEGIN;
+CREATE TABLE test1(t text PRIMARY KEY);
+CREATE FUNCTION test1_beforetrig() RETURNS trigger LANGUAGE plpgsql as \$\$
+  BEGIN
+  IF new.t NOT LIKE 'triggered%' THEN
+    INSERT INTO test1 VALUES ('triggered ' || NEW.t);
+  END IF;
+  RETURN NEW;
+END;
+\$\$;
+CREATE TRIGGER test1_beforeinsert BEFORE INSERT ON test1
+FOR EACH ROW EXECUTE PROCEDURE test1_beforetrig();
+COPY test1 FROM '$copy_file';
+COMMIT;
+});
+# Force crash recovery and check the state of the table. There should be 6
+# rows here.
+$node->stop('immediate');
+$node->start;
+$ret = $node->safe_psql('postgres', 'SELECT count(*) FROM test1');
+is($ret, '6', 'SELECT returns 6 rows');
+# Clean up
+$node->safe_psql('postgres', 'DROP TABLE test1;');
+$node->safe_psql('postgres', 'DROP FUNCTION test1_beforetrig();');
+
+# CREATE, TRUNCATE, COPY, crash.
+#
+# If we skip WAL-logging of the COPY, replaying the TRUNCATE record destroys
+# the newly inserted data.
+$node->psql('postgres', qq{
+BEGIN;
+CREATE TABLE test1(t text PRIMARY KEY);
+TRUNCATE test1;
+COPY test1 FROM '$copy_file';
+COMMIT;
+});
+# Force crash recovery and check the state of the table. There should be 3
+# rows here.
+$node->stop('immediate');
+$node->start;
+$ret = $node->safe_psql('postgres', 'SELECT count(*) FROM test1');
+is($ret, '3', 'SELECT returns 3 rows');
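
As a reading aid for the per-block decision made by BufferNeedsWAL() in the patch above, here is a minimal standalone sketch. It is not code from the patch. The type SketchPendingSync and the function sketch_block_needs_wal() are hypothetical names, and the first test (on sync_above) is an assumption inferred from the DEBUG2 message and from smgrDoPendingSyncs(). Only the truncated_to test is taken directly from the hunk.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t BlockNumber;
#define InvalidBlockNumber ((BlockNumber) 0xFFFFFFFF)

/* Hypothetical stand-in for the patch's PendingRelSync hash entry. */
typedef struct SketchPendingSync
{
	BlockNumber sync_above;		/* blocks at or above this are synced at commit */
	BlockNumber truncated_to;	/* a truncation record was emitted down to here */
} SketchPendingSync;

/*
 * Return true if a change to block blkno must be WAL-logged, false if
 * logging can be skipped because the relation is synced at commit.
 */
static bool
sketch_block_needs_wal(const SketchPendingSync *pending, BlockNumber blkno)
{
	/* No pending sync registered for the relation: log as usual. */
	if (pending == NULL)
		return true;

	/* ASSUMPTION: blocks below sync_above are not covered by the sync. */
	if (pending->sync_above == InvalidBlockNumber ||
		pending->sync_above > blkno)
		return true;

	/* From the patch: a truncation record was emitted for this block. */
	if (pending->truncated_to != InvalidBlockNumber &&
		pending->truncated_to <= blkno)
		return true;

	return false;
}

/* Usage example: a few cases that exercise each branch. */
static void
sketch_self_check(void)
{
	SketchPendingSync whole = {0, InvalidBlockNumber};
	SketchPendingSync tail = {10, InvalidBlockNumber};
	SketchPendingSync trunc = {0, 3};

	assert(sketch_block_needs_wal(NULL, 0));
	assert(!sketch_block_needs_wal(&whole, 5));
	assert(sketch_block_needs_wal(&tail, 5));
	assert(!sketch_block_needs_wal(&tail, 10));
	assert(!sketch_block_needs_wal(&trunc, 2));
	assert(sketch_block_needs_wal(&trunc, 3));
}
```

The point of deciding per block rather than per relation is that pages which already have WAL-logged changes keep being logged, while pages written only after the sync was registered can skip WAL and rely on the sync at commit.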
