At Tue, 11 Apr 2017 09:56:06 +0900 (Tokyo Standard Time), Kyotaro HORIGUCHI 
<horiguchi.kyot...@lab.ntt.co.jp> wrote in 
<20170411.095606.245908357.horiguchi.kyot...@lab.ntt.co.jp>
> Hello, thank you for looking this.
> 
> At Fri, 07 Apr 2017 20:38:35 -0400, Tom Lane <t...@sss.pgh.pa.us> wrote in 
> <27309.1491611...@sss.pgh.pa.us>
> > Alvaro Herrera <alvhe...@2ndquadrant.com> writes:
> > > Interesting.  I wonder if it's possible that a relcache invalidation
> > > would cause these values to get lost for some reason, because that would
> > > be dangerous.
> > 
> > > I suppose the rationale is that this shouldn't happen because any
> > > operation that does things this way must hold an exclusive lock on the
> > > relation.  But that doesn't guarantee that the relcache entry is
> > > completely stable,
> > 
> > It ABSOLUTELY is not safe.  Relcache flushes can happen regardless of
> > how strong a lock you hold.
> > 
> >                     regards, tom lane
> 
> Ugh. Yes, relcache invalidation can happen at any time and it resets
> the added values. pg_stat_info misled me into thinking that it could
> store transient values. But I came up with another thought.
> 
> The reason I proposed it was that I thought doing a hash_search for
> every buffer is not good. Instead, like pg_stat_info, we can link the

buffer => buffer modification

> pending-sync hash entry to Relation. This greatly reduces the
> frequency of hash searching.
> 
> I'll post new patch in this way soon.

Here it is.

- Relation has new members no_pending_sync and pending_sync that
  work as an instant cache of an entry in the pendingSync hash.

- Commit-time synchronizing is restored, as in Michael's patch (the
  caller side is sketched below).

- If the relfilenode is replaced, the pending_sync entry for the old
  node is removed. It would be ignored on abort and is meaningless on
  commit anyway.

- The TAP test is renamed to 012, since some new test files have been added.
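
For quick orientation, the caller-side pattern looks like this; it is a
condensed sketch of the CopyFrom() hunk in the attached patch (CREATE
TABLE AS, REFRESH MATERIALIZED VIEW and the ALTER TABLE rewrite path
get the same treatment), so see the diff for the full context:

    /*
     * Register the relation for an at-commit sync instead of passing
     * HEAP_INSERT_SKIP_WAL for every insertion.  heap_register_sync()
     * also covers the TOAST relation, if any.
     */
    hi_options |= HEAP_INSERT_SKIP_FSM;
    if (!XLogIsNeeded())
        heap_register_sync(cstate->rel);

    /* ... bulk insertions, each one checking BufferNeedsWAL() ... */

    /*
     * No explicit heap_sync() at the end any more: CommitTransaction()
     * calls smgrDoPendingSyncs(true), which flushes the buffers of every
     * registered relfilenode and then does smgrimmedsync() on it.
     */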

Previously, the pending-sync hash was searched on every call of
HeapNeedsWAL() (BufferNeedsWAL() in the attached patch), that is, once
per insertion/update/freeze of a tuple, whenever any of the relations
being accessed had a pending sync.  With the pointer cached in
Relation, almost all of those hash searches are eliminated.
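
To make the effect concrete, here is a condensed sketch of
BufferNeedsWAL() as it appears in the attached patch (the DEBUG2 elog
calls are left out).  After a relation that has a pending sync has been
looked up once, the pointer cached in its relcache entry answers the
question directly and hash_search() is not reached again:

    bool
    BufferNeedsWAL(Relation rel, Buffer buf)
    {
        BlockNumber blkno;

        if (!RelationNeedsWAL(rel))
            return false;

        if (!rel->pending_sync)
        {
            bool    found = false;

            /* no pending syncs at all: WAL-log as usual */
            if (!pendingSyncs)
                return true;

            /* look up the entry once and cache it in the relcache entry */
            rel->pending_sync =
                (PendingRelSync *) hash_search(pendingSyncs,
                                               (void *) &rel->rd_node,
                                               HASH_FIND, &found);
            if (!found)
                return true;
        }

        blkno = BufferGetBlockNumber(buf);

        /* blocks below the registered size are WAL-logged as usual */
        if (rel->pending_sync->sync_above == InvalidBlockNumber ||
            rel->pending_sync->sync_above > blkno)
            return true;

        /* a truncation record covering this block has been emitted */
        if (rel->pending_sync->truncated_to != InvalidBlockNumber &&
            rel->pending_sync->truncated_to <= blkno)
            return true;

        return false;
    }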

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 0c3e2b0..aa1b97d 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -34,6 +34,28 @@
  *	  the POSTGRES heap access method used for all POSTGRES
  *	  relations.
  *
+ * WAL CONSIDERATIONS
+ *	  All heap operations are normally WAL-logged, but there are a few
+ *	  exceptions. Temporary and unlogged relations never need to be
+ *	  WAL-logged, but we can also skip WAL-logging for a table that was
+ *	  created in the same transaction, if we don't need WAL for PITR or
+ *	  WAL archival purposes (i.e. if wal_level=minimal), and we fsync()
+ *	  the file to disk at COMMIT instead.
+ *
+ *	  The same-relation optimization is not employed automatically on all
+ *	  updates to a table that was created in the same transaction, because
+ *	  for a small number of changes, it's cheaper to just create the WAL
+ *	  records than fsync()ing the whole relation at COMMIT. It is only
+ *	  worthwhile for (presumably) large operations like COPY, CLUSTER,
+ *	  or VACUUM FULL. Use heap_register_sync() to initiate such an
+ *	  operation; it will cause any subsequent updates to the table to skip
+ *	  WAL-logging, if possible, and cause the heap to be synced to disk at
+ *	  COMMIT.
+ *
+ *	  To make that work, all modifications to heap must use
+ *	  HeapNeedsWAL() to check if WAL-logging is needed in this transaction
+ *	  for the given block.
+ *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -56,6 +78,7 @@
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "catalog/namespace.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
@@ -2356,12 +2379,6 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
  * The new tuple is stamped with current transaction ID and the specified
  * command ID.
  *
- * If the HEAP_INSERT_SKIP_WAL option is specified, the new tuple is not
- * logged in WAL, even for a non-temp relation.  Safe usage of this behavior
- * requires that we arrange that all new tuples go into new pages not
- * containing any tuples from other transactions, and that the relation gets
- * fsync'd before commit.  (See also heap_sync() comments)
- *
  * The HEAP_INSERT_SKIP_FSM option is passed directly to
  * RelationGetBufferForTuple, which see for more info.
  *
@@ -2465,7 +2482,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, buffer))
 	{
 		xl_heap_insert xlrec;
 		xl_heap_header xlhdr;
@@ -2664,12 +2681,10 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	int			ndone;
 	char	   *scratch = NULL;
 	Page		page;
-	bool		needwal;
 	Size		saveFreeSpace;
 	bool		need_tuple_data = RelationIsLogicallyLogged(relation);
 	bool		need_cids = RelationIsAccessibleInLogicalDecoding(relation);
 
-	needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
 	saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
 												   HEAP_DEFAULT_FILLFACTOR);
 
@@ -2684,7 +2699,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 	 * palloc() within a critical section is not safe, so we allocate this
 	 * beforehand.
 	 */
-	if (needwal)
+	if (RelationNeedsWAL(relation))
 		scratch = palloc(BLCKSZ);
 
 	/*
@@ -2719,6 +2734,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 		Buffer		vmbuffer = InvalidBuffer;
 		bool		all_visible_cleared = false;
 		int			nthispage;
+		bool		needwal;
 
 		CHECK_FOR_INTERRUPTS();
 
@@ -2730,6 +2746,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 										   InvalidBuffer, options, bistate,
 										   &vmbuffer, NULL);
 		page = BufferGetPage(buffer);
+		needwal = BufferNeedsWAL(relation, buffer);
 
 		/* NO EREPORT(ERROR) from here till changes are logged */
 		START_CRIT_SECTION();
@@ -3286,7 +3303,7 @@ l1:
 	 * NB: heap_abort_speculative() uses the same xlog record and replay
 	 * routines.
 	 */
-	if (RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, buffer))
 	{
 		xl_heap_delete xlrec;
 		XLogRecPtr	recptr;
@@ -4250,7 +4267,8 @@ l2:
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, buffer) ||
+		BufferNeedsWAL(relation, newbuf))
 	{
 		XLogRecPtr	recptr;
 
@@ -5141,7 +5159,7 @@ failed:
 	 * (Also, in a PITR log-shipping or 2PC environment, we have to have XLOG
 	 * entries for everything anyway.)
 	 */
-	if (RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, *buffer))
 	{
 		xl_heap_lock xlrec;
 		XLogRecPtr	recptr;
@@ -5843,7 +5861,7 @@ l4:
 		MarkBufferDirty(buf);
 
 		/* XLOG stuff */
-		if (RelationNeedsWAL(rel))
+		if (BufferNeedsWAL(rel, buf))
 		{
 			xl_heap_lock_updated xlrec;
 			XLogRecPtr	recptr;
@@ -5998,7 +6016,7 @@ heap_finish_speculative(Relation relation, HeapTuple tuple)
 	htup->t_ctid = tuple->t_self;
 
 	/* XLOG stuff */
-	if (RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, buffer))
 	{
 		xl_heap_confirm xlrec;
 		XLogRecPtr	recptr;
@@ -6131,7 +6149,7 @@ heap_abort_speculative(Relation relation, HeapTuple tuple)
 	 * The WAL records generated here match heap_delete().  The same recovery
 	 * routines are used.
 	 */
-	if (RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, buffer))
 	{
 		xl_heap_delete xlrec;
 		XLogRecPtr	recptr;
@@ -6240,7 +6258,7 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (RelationNeedsWAL(relation))
+	if (BufferNeedsWAL(relation, buffer))
 	{
 		xl_heap_inplace xlrec;
 		XLogRecPtr	recptr;
@@ -7354,7 +7372,7 @@ log_heap_clean(Relation reln, Buffer buffer,
 	XLogRecPtr	recptr;
 
 	/* Caller should not call me on a non-WAL-logged relation */
-	Assert(RelationNeedsWAL(reln));
+	Assert(BufferNeedsWAL(reln, buffer));
 
 	xlrec.latestRemovedXid = latestRemovedXid;
 	xlrec.nredirected = nredirected;
@@ -7402,7 +7420,7 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
 	XLogRecPtr	recptr;
 
 	/* Caller should not call me on a non-WAL-logged relation */
-	Assert(RelationNeedsWAL(reln));
+	Assert(BufferNeedsWAL(reln, buffer));
 	/* nor when there are no tuples to freeze */
 	Assert(ntuples > 0);
 
@@ -7487,7 +7505,7 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	int			bufflags;
 
 	/* Caller should not call me on a non-WAL-logged relation */
-	Assert(RelationNeedsWAL(reln));
+	Assert(BufferNeedsWAL(reln, newbuf) || BufferNeedsWAL(reln, oldbuf));
 
 	XLogBeginInsert();
 
@@ -7590,76 +7608,86 @@ log_heap_update(Relation reln, Buffer oldbuf,
 	xlrec.new_offnum = ItemPointerGetOffsetNumber(&newtup->t_self);
 	xlrec.new_xmax = HeapTupleHeaderGetRawXmax(newtup->t_data);
 
+	XLogRegisterData((char *) &xlrec, SizeOfHeapUpdate);
+
 	bufflags = REGBUF_STANDARD;
 	if (init)
 		bufflags |= REGBUF_WILL_INIT;
 	if (need_tuple_data)
 		bufflags |= REGBUF_KEEP_DATA;
 
-	XLogRegisterBuffer(0, newbuf, bufflags);
-	if (oldbuf != newbuf)
-		XLogRegisterBuffer(1, oldbuf, REGBUF_STANDARD);
-
-	XLogRegisterData((char *) &xlrec, SizeOfHeapUpdate);
-
 	/*
 	 * Prepare WAL data for the new tuple.
 	 */
-	if (prefixlen > 0 || suffixlen > 0)
+	if (BufferNeedsWAL(reln, newbuf))
 	{
-		if (prefixlen > 0 && suffixlen > 0)
-		{
-			prefix_suffix[0] = prefixlen;
-			prefix_suffix[1] = suffixlen;
-			XLogRegisterBufData(0, (char *) &prefix_suffix, sizeof(uint16) * 2);
-		}
-		else if (prefixlen > 0)
-		{
-			XLogRegisterBufData(0, (char *) &prefixlen, sizeof(uint16));
-		}
-		else
+		XLogRegisterBuffer(0, newbuf, bufflags);
+
+		if ((prefixlen > 0 || suffixlen > 0))
 		{
-			XLogRegisterBufData(0, (char *) &suffixlen, sizeof(uint16));
+			if (prefixlen > 0 && suffixlen > 0)
+			{
+				prefix_suffix[0] = prefixlen;
+				prefix_suffix[1] = suffixlen;
+				XLogRegisterBufData(0, (char *) &prefix_suffix,
+									sizeof(uint16) * 2);
+			}
+			else if (prefixlen > 0)
+			{
+				XLogRegisterBufData(0, (char *) &prefixlen, sizeof(uint16));
+			}
+			else
+			{
+				XLogRegisterBufData(0, (char *) &suffixlen, sizeof(uint16));
+			}
 		}
-	}
 
-	xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
-	xlhdr.t_infomask = newtup->t_data->t_infomask;
-	xlhdr.t_hoff = newtup->t_data->t_hoff;
-	Assert(SizeofHeapTupleHeader + prefixlen + suffixlen <= newtup->t_len);
+		xlhdr.t_infomask2 = newtup->t_data->t_infomask2;
+		xlhdr.t_infomask = newtup->t_data->t_infomask;
+		xlhdr.t_hoff = newtup->t_data->t_hoff;
+		Assert(SizeofHeapTupleHeader + prefixlen + suffixlen <= newtup->t_len);
 
-	/*
-	 * PG73FORMAT: write bitmap [+ padding] [+ oid] + data
-	 *
-	 * The 'data' doesn't include the common prefix or suffix.
-	 */
-	XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
-	if (prefixlen == 0)
-	{
-		XLogRegisterBufData(0,
-							((char *) newtup->t_data) + SizeofHeapTupleHeader,
-						  newtup->t_len - SizeofHeapTupleHeader - suffixlen);
-	}
-	else
-	{
 		/*
-		 * Have to write the null bitmap and data after the common prefix as
-		 * two separate rdata entries.
+		 * PG73FORMAT: write bitmap [+ padding] [+ oid] + data
+		 *
+		 * The 'data' doesn't include the common prefix or suffix.
 		 */
-		/* bitmap [+ padding] [+ oid] */
-		if (newtup->t_data->t_hoff - SizeofHeapTupleHeader > 0)
+		XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
+		if (prefixlen == 0)
 		{
 			XLogRegisterBufData(0,
 						   ((char *) newtup->t_data) + SizeofHeapTupleHeader,
-							 newtup->t_data->t_hoff - SizeofHeapTupleHeader);
+						  newtup->t_len - SizeofHeapTupleHeader - suffixlen);
 		}
+		else
+		{
+			/*
+			 * Have to write the null bitmap and data after the common prefix
+			 * as two separate rdata entries.
+			 */
+			/* bitmap [+ padding] [+ oid] */
+			if (newtup->t_data->t_hoff - SizeofHeapTupleHeader > 0)
+			{
+				XLogRegisterBufData(0,
+						   ((char *) newtup->t_data) + SizeofHeapTupleHeader,
+							 newtup->t_data->t_hoff - SizeofHeapTupleHeader);
+			}
 
-		/* data after common prefix */
-		XLogRegisterBufData(0,
+			/* data after common prefix */
+			XLogRegisterBufData(0,
 			  ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen,
 			 newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen);
+		}
 	}
 
+	/*
+	 * If the old and new tuple are on different pages, also register the old
+	 * page, so that a full-page image is created for it if necessary. We
+	 * don't need any extra information to replay changes to it.
+	 */
+	if (oldbuf != newbuf && BufferNeedsWAL(reln, oldbuf))
+		XLogRegisterBuffer(1, oldbuf, REGBUF_STANDARD);
+
 	/* We need to log a tuple identity */
 	if (need_tuple_data && old_key_tuple)
 	{
@@ -8578,8 +8606,13 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
 	 */
 
 	/* Deal with old tuple version */
-	oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
-									  &obuffer);
+	if (oldblk == newblk)
+		oldaction = XLogReadBufferForRedo(record, 0, &obuffer);
+	else if (XLogRecHasBlockRef(record, 1))
+		oldaction = XLogReadBufferForRedo(record, 1, &obuffer);
+	else
+		oldaction = BLK_DONE;
+
 	if (oldaction == BLK_NEEDS_REDO)
 	{
 		page = BufferGetPage(obuffer);
@@ -8633,6 +8666,8 @@ heap_xlog_update(XLogReaderState *record, bool hot_update)
 		PageInit(page, BufferGetPageSize(nbuffer), 0);
 		newaction = BLK_NEEDS_REDO;
 	}
+	else if (!XLogRecHasBlockRef(record, 0))
+		newaction = BLK_DONE;
 	else
 		newaction = XLogReadBufferForRedo(record, 0, &nbuffer);
 
@@ -9069,9 +9104,16 @@ heap2_redo(XLogReaderState *record)
  *	heap_sync		- sync a heap, for use when no WAL has been written
  *
  * This forces the heap contents (including TOAST heap if any) down to disk.
- * If we skipped using WAL, and WAL is otherwise needed, we must force the
- * relation down to disk before it's safe to commit the transaction.  This
- * requires writing out any dirty buffers and then doing a forced fsync.
+ * If we did any changes to the heap bypassing the buffer manager, we must
+ * force the relation down to disk before it's safe to commit the
+ * transaction, because the direct modifications will not be flushed by
+ * the next checkpoint.
+ *
+ * We used to also use this after batch operations like COPY and CLUSTER,
+ * if we skipped using WAL and WAL is otherwise needed, but there were
+ * corner-cases involving other WAL-logged operations to the same
+ * relation, where that was not enough. heap_register_sync() should be
+ * used for that purpose instead.
  *
  * Indexes are not touched.  (Currently, index operations associated with
  * the commands that use this are WAL-logged and so do not need fsync.
@@ -9181,3 +9223,33 @@ heap_mask(char *pagedata, BlockNumber blkno)
 		}
 	}
 }
+
+/*
+ *	heap_register_sync	- register a heap to be synced to disk at commit
+ *
+ * This can be used to skip WAL-logging changes on a relation file that has
+ * been created in the same transaction. This makes note of the current size of
+ * the relation, and ensures that when the relation is extended, any changes
+ * to the new blocks in the heap, in the same transaction, will not be
+ * WAL-logged. Instead, the heap contents are flushed to disk at commit,
+ * like heap_sync() does.
+ *
+ * This does the same for the TOAST heap, if any. Indexes are not affected.
+ */
+void
+heap_register_sync(Relation rel)
+{
+	/* non-WAL-logged tables never need fsync */
+	if (!RelationNeedsWAL(rel))
+		return;
+
+	RecordPendingSync(rel);
+	if (OidIsValid(rel->rd_rel->reltoastrelid))
+	{
+		Relation	toastrel;
+
+		toastrel = heap_open(rel->rd_rel->reltoastrelid, AccessShareLock);
+		RecordPendingSync(toastrel);
+		heap_close(toastrel, AccessShareLock);
+	}
+}
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index d69a266..4754278 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -20,6 +20,7 @@
 #include "access/htup_details.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
@@ -260,7 +261,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 		/*
 		 * Emit a WAL HEAP_CLEAN record showing what we did
 		 */
-		if (RelationNeedsWAL(relation))
+		if (BufferNeedsWAL(relation, buffer))
 		{
 			XLogRecPtr	recptr;
 
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index d7f65a5..6462f44 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -649,9 +649,7 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 	}
 	else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
 		heaptup = toast_insert_or_update(state->rs_new_rel, tup, NULL,
-										 HEAP_INSERT_SKIP_FSM |
-										 (state->rs_use_wal ?
-										  0 : HEAP_INSERT_SKIP_WAL));
+										 HEAP_INSERT_SKIP_FSM);
 	else
 		heaptup = tup;
 
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index e5616ce..933fa9c 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -88,6 +88,7 @@
 #include "access/heapam_xlog.h"
 #include "access/visibilitymap.h"
 #include "access/xlog.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
@@ -307,7 +308,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
 		map[mapByte] |= (flags << mapOffset);
 		MarkBufferDirty(vmBuf);
 
-		if (RelationNeedsWAL(rel))
+		if (BufferNeedsWAL(rel, heapBuf))
 		{
 			if (XLogRecPtrIsInvalid(recptr))
 			{
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 92b263a..361b50d 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2007,6 +2007,9 @@ CommitTransaction(void)
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
+	/* Flush updates to relations that we didn't WAL-log */
+	smgrDoPendingSyncs(true);
+
 	/*
 	 * Mark serializable transaction as complete for predicate locking
 	 * purposes.  This should be done as late as we can put it and still allow
@@ -2238,6 +2241,9 @@ PrepareTransaction(void)
 	/* close large objects before lower-level cleanup */
 	AtEOXact_LargeObject(true);
 
+	/* Flush updates to relations that we didn't WAL-log */
+	smgrDoPendingSyncs(true);
+
 	/*
 	 * Mark serializable transaction as complete for predicate locking
 	 * purposes.  This should be done as late as we can put it and still allow
@@ -2545,6 +2551,7 @@ AbortTransaction(void)
 	AtAbort_Notify();
 	AtEOXact_RelationMap(false);
 	AtAbort_Twophase();
+	smgrDoPendingSyncs(false);	/* abandon pending syncs */
 
 	/*
 	 * Advertise the fact that we aborted in pg_xact (assuming that we got as
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index f677916..1234325 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -29,6 +29,7 @@
 #include "catalog/storage_xlog.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
+#include "utils/hsearch.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -64,6 +65,49 @@ typedef struct PendingRelDelete
 static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
 
 /*
+ * We also track relation files (RelFileNode values) that have been created
+ * in the same transaction, and that have been modified without WAL-logging
+ * the action (an optimization possible with wal_level=minimal). When we are
+ * about to skip WAL-logging, a PendingRelSync entry is created, and
+ * 'sync_above' is set to the current size of the relation. Any operations
+ * on blocks < sync_above need to be WAL-logged as usual, but for operations
+ * on higher blocks, WAL-logging is skipped.
+ *
+ * NB: after WAL-logging has been skipped for a block, we must not WAL-log
+ * any subsequent actions on the same block either. Replaying the WAL record
+ * of the subsequent action might fail otherwise, as the "before" state of
+ * the block might not match, as the earlier actions were not WAL-logged.
+ * Likewise, after we have WAL-logged an operation for a block, we must
+ * WAL-log any subsequent operations on the same page as well. Replaying
+ * a possible full-page-image from the earlier WAL record would otherwise
+ * revert the page to the old state, even if we sync the relation at end
+ * of transaction.
+ *
+ * If a relation is truncated (without creating a new relfilenode), and we
+ * emit a WAL record of the truncation, we can't skip WAL-logging for any
+ * of the truncated blocks anymore, as replaying the truncation record will
+ * destroy all the data inserted after that. But if we have already decided
+ * to skip WAL-logging changes to a relation, and the relation is truncated,
+ * we don't need to WAL-log the truncation either.
+ *
+ * This mechanism is currently only used by heaps. Indexes are always
+ * WAL-logged. Also, this only applies for wal_level=minimal; with higher
+ * WAL levels we need the WAL for PITR/replication anyway.
+ */
+typedef struct PendingRelSync
+{
+	RelFileNode relnode;		/* relation created in same xact */
+	BlockNumber sync_above;		/* WAL-logging skipped for blocks >=
+								 * sync_above */
+	BlockNumber truncated_to;	/* truncation WAL record was written */
+}	PendingRelSync;
+
+/* Relations that need to be fsync'd at commit */
+static HTAB *pendingSyncs = NULL;
+
+static void createPendingSyncsHash(void);
+
+/*
  * RelationCreateStorage
  *		Create physical storage for a relation.
  *
@@ -116,6 +160,14 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence)
 	pending->nestLevel = GetCurrentTransactionNestLevel();
 	pending->next = pendingDeletes;
 	pendingDeletes = pending;
+
+	/* pending sync on this file is no longer needed */
+	if (pendingSyncs)
+	{
+		bool found;
+
+		hash_search(pendingSyncs, (void *) &rnode, HASH_REMOVE, &found);
+	}
 }
 
 /*
@@ -226,6 +278,8 @@ RelationPreserveStorage(RelFileNode rnode, bool atCommit)
 void
 RelationTruncate(Relation rel, BlockNumber nblocks)
 {
+	PendingRelSync *pending = NULL;
+	bool		found;
 	bool		fsm;
 	bool		vm;
 
@@ -260,37 +314,79 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
 	 */
 	if (RelationNeedsWAL(rel))
 	{
-		/*
-		 * Make an XLOG entry reporting the file truncation.
-		 */
-		XLogRecPtr	lsn;
-		xl_smgr_truncate xlrec;
-
-		xlrec.blkno = nblocks;
-		xlrec.rnode = rel->rd_node;
-		xlrec.flags = SMGR_TRUNCATE_ALL;
-
-		XLogBeginInsert();
-		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+		if (!rel->pending_sync)
+		{
+			if (!pendingSyncs)
+				createPendingSyncsHash();
+			pending = (PendingRelSync *) hash_search(pendingSyncs,
+												 (void *) &rel->rd_node,
+												 HASH_ENTER, &found);
+			if (!found)
+			{
+				pending->sync_above = InvalidBlockNumber;
+				pending->truncated_to = InvalidBlockNumber;
+			}
 
-		lsn = XLogInsert(RM_SMGR_ID,
-						 XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
+			rel->pending_sync = pending;
+		}
 
-		/*
-		 * Flush, because otherwise the truncation of the main relation might
-		 * hit the disk before the WAL record, and the truncation of the FSM
-		 * or visibility map. If we crashed during that window, we'd be left
-		 * with a truncated heap, but the FSM or visibility map would still
-		 * contain entries for the non-existent heap pages.
-		 */
-		if (fsm || vm)
-			XLogFlush(lsn);
+		if (rel->pending_sync->sync_above == InvalidBlockNumber ||
+			rel->pending_sync->sync_above < nblocks)
+		{
+			/*
+			 * Make an XLOG entry reporting the file truncation.
+			 */
+			XLogRecPtr		lsn;
+			xl_smgr_truncate xlrec;
+
+			xlrec.blkno = nblocks;
+			xlrec.rnode = rel->rd_node;
+			xlrec.flags = SMGR_TRUNCATE_ALL;
+
+			XLogBeginInsert();
+			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+			lsn = XLogInsert(RM_SMGR_ID,
+							 XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
+
+			elog(DEBUG2, "WAL-logged truncation of rel %u/%u/%u to %u blocks",
+				 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+				 nblocks);
+
+			/*
+			 * Flush, because otherwise the truncation of the main relation
+			 * might hit the disk before the WAL record, and the truncation of
+			 * the FSM or visibility map. If we crashed during that window,
+			 * we'd be left with a truncated heap, but the FSM or visibility
+			 * map would still contain entries for the non-existent heap
+			 * pages.
+			 */
+			if (fsm || vm)
+				XLogFlush(lsn);
+
+			rel->pending_sync->truncated_to = nblocks;
+		}
 	}
 
 	/* Do the real work */
 	smgrtruncate(rel->rd_smgr, MAIN_FORKNUM, nblocks);
 }
 
+/* create the hash table to track pending at-commit fsyncs */
+static void
+createPendingSyncsHash(void)
+{
+	/* First time through: initialize the hash table */
+	HASHCTL		ctl;
+
+	MemSet(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(RelFileNode);
+	ctl.entrysize = sizeof(PendingRelSync);
+	ctl.hash = tag_hash;
+	pendingSyncs = hash_create("pending relation sync table", 5,
+							   &ctl, HASH_ELEM | HASH_FUNCTION);
+}
+
 /*
  *	smgrDoPendingDeletes() -- Take care of relation deletes at end of xact.
  *
@@ -419,6 +514,156 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
 	return nrels;
 }
 
+
+/*
+ * Remember that the given relation needs to be sync'd at commit, because we
+ * are going to skip WAL-logging subsequent actions to it.
+ */
+void
+RecordPendingSync(Relation rel)
+{
+	bool found = true;
+	BlockNumber nblocks;
+
+	Assert(RelationNeedsWAL(rel));
+
+	if (!rel->pending_sync)
+	{
+		if (!pendingSyncs)
+			createPendingSyncsHash();
+
+		/* Look up or create an entry */
+		rel->pending_sync =
+			(PendingRelSync *) hash_search(pendingSyncs,
+										   (void *) &rel->rd_node,
+										   HASH_ENTER, &found);
+	}		
+
+	nblocks = RelationGetNumberOfBlocks(rel);
+	if (!found)
+	{
+		rel->pending_sync->truncated_to = InvalidBlockNumber;
+		rel->pending_sync->sync_above = nblocks;
+
+		elog(DEBUG2,
+			 "registering new pending sync for rel %u/%u/%u at block %u",
+			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+			 nblocks);
+
+	}
+	else if (rel->pending_sync->sync_above == InvalidBlockNumber)
+	{
+		elog(DEBUG2, "registering pending sync for rel %u/%u/%u at block %u",
+			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+			 nblocks);
+		rel->pending_sync->sync_above = nblocks;
+	}
+	else
+		elog(DEBUG2,
+			 "pending sync for rel %u/%u/%u was already registered at block %u (new %u)",
+			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+			 rel->pending_sync->sync_above, nblocks);
+}
+
+/*
+ * Do changes to given heap page need to be WAL-logged?
+ *
+ * This takes into account any previous RecordPendingSync() requests.
+ *
+ * Note that it is required to check this before creating any WAL records for
+ * heap pages - it is not merely an optimization! WAL-logging a record, when
+ * we have already skipped a previous WAL record for the same page could lead
+ * to failure at WAL replay, as the "before" state expected by the record
+ * might not match what's on disk. Also, if the heap was truncated earlier, we
+ * must WAL-log any changes to the once-truncated blocks, because replaying
+ * the truncation record will destroy them.
+ */
+bool
+BufferNeedsWAL(Relation rel, Buffer buf)
+{
+	BlockNumber blkno = InvalidBlockNumber;
+
+	if (!RelationNeedsWAL(rel))
+		return false;
+
+	/* WAL is needed if no pending syncs */
+	if (!rel->pending_sync)
+	{
+		bool found = false;
+
+		if (!pendingSyncs)
+			return true;
+
+		rel->pending_sync =
+			(PendingRelSync *) hash_search(pendingSyncs,
+										   (void *) &rel->rd_node,
+										   HASH_FIND, &found);
+		if (!found)
+			return true;
+	}
+		
+	blkno = BufferGetBlockNumber(buf);
+	if (rel->pending_sync->sync_above == InvalidBlockNumber ||
+		rel->pending_sync->sync_above > blkno)
+	{
+		elog(DEBUG2, "not skipping WAL-logging for rel %u/%u/%u block %u, because sync_above is %u",
+			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+			 blkno, rel->pending_sync->sync_above);
+		return true;
+	}
+
+	/*
+	 * We have emitted a truncation record for this block.
+	 */
+	if (rel->pending_sync->truncated_to != InvalidBlockNumber &&
+		rel->pending_sync->truncated_to <= blkno)
+	{
+		elog(DEBUG2, "not skipping WAL-logging for rel %u/%u/%u block %u, because it was truncated earlier in the same xact",
+			 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+			 blkno);
+		return true;
+	}
+
+	elog(DEBUG2, "skipping WAL-logging for rel %u/%u/%u block %u",
+		 rel->rd_node.spcNode, rel->rd_node.dbNode, rel->rd_node.relNode,
+		 blkno);
+
+	return false;
+}
+
+/*
+ * Sync to disk any relations that we skipped WAL-logging for earlier.
+ */
+void
+smgrDoPendingSyncs(bool isCommit)
+{
+	if (!pendingSyncs)
+		return;
+
+	if (isCommit)
+	{
+		HASH_SEQ_STATUS status;
+		PendingRelSync *pending;
+
+		hash_seq_init(&status, pendingSyncs);
+
+		while ((pending = hash_seq_search(&status)) != NULL)
+		{
+			if (pending->sync_above != InvalidBlockNumber)
+			{
+				FlushRelationBuffersWithoutRelCache(pending->relnode, false);
+				smgrimmedsync(smgropen(pending->relnode, InvalidBackendId), MAIN_FORKNUM);
+
+				elog(DEBUG2, "syncing rel %u/%u/%u", pending->relnode.spcNode,
+					 pending->relnode.dbNode, pending->relnode.relNode);
+			}
+		}
+	}
+
+	hash_destroy(pendingSyncs);
+	pendingSyncs = NULL;
+}
+
 /*
  *	PostPrepare_smgr -- Clean up after a successful PREPARE
  *
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index b5af2be..8aa7e7b 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2372,8 +2372,7 @@ CopyFrom(CopyState cstate)
 	 *	- data is being written to relfilenode created in this transaction
 	 * then we can skip writing WAL.  It's safe because if the transaction
 	 * doesn't commit, we'll discard the table (or the new relfilenode file).
-	 * If it does commit, we'll have done the heap_sync at the bottom of this
-	 * routine first.
+	 * If it does commit, the heap will be synced at commit.
 	 *
 	 * As mentioned in comments in utils/rel.h, the in-same-transaction test
 	 * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
@@ -2405,7 +2404,7 @@ CopyFrom(CopyState cstate)
 	{
 		hi_options |= HEAP_INSERT_SKIP_FSM;
 		if (!XLogIsNeeded())
-			hi_options |= HEAP_INSERT_SKIP_WAL;
+			heap_register_sync(cstate->rel);
 	}
 
 	/*
@@ -2782,11 +2781,11 @@ CopyFrom(CopyState cstate)
 	FreeExecutorState(estate);
 
 	/*
-	 * If we skipped writing WAL, then we need to sync the heap (but not
-	 * indexes since those use WAL anyway)
+	 * If we skipped writing WAL, then we will sync the heap at the end of
+	 * the transaction.  (We used to do it here, but it was later found
+	 * that, to be safe, we must also avoid WAL-logging any subsequent
+	 * actions on the pages we skipped WAL for.)  Indexes always use WAL.
 	 */
-	if (hi_options & HEAP_INSERT_SKIP_WAL)
-		heap_sync(cstate->rel);
 
 	return processed;
 }
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 06425cc..408495e 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -567,8 +567,9 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 * We can skip WAL-logging the insertions, unless PITR or streaming
 	 * replication is in use. We can skip the FSM in any case.
 	 */
-	myState->hi_options = HEAP_INSERT_SKIP_FSM |
-		(XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL);
+	if (!XLogIsNeeded())
+		heap_register_sync(intoRelationDesc);
+	myState->hi_options = HEAP_INSERT_SKIP_FSM;
 	myState->bistate = GetBulkInsertState();
 
 	/* Not using WAL requires smgr_targblock be initially invalid */
@@ -617,9 +618,7 @@ intorel_shutdown(DestReceiver *self)
 
 	FreeBulkInsertState(myState->bistate);
 
-	/* If we skipped using WAL, must heap_sync before commit */
-	if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
-		heap_sync(myState->rel);
+	/* If we skipped using WAL, we will sync the relation at commit */
 
 	/* close rel, but keep lock until commit */
 	heap_close(myState->rel, NoLock);
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 9ffd91e..8b127e3 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -477,7 +477,7 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	 */
 	myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
 	if (!XLogIsNeeded())
-		myState->hi_options |= HEAP_INSERT_SKIP_WAL;
+		heap_register_sync(transientrel);
 	myState->bistate = GetBulkInsertState();
 
 	/* Not using WAL requires smgr_targblock be initially invalid */
@@ -520,9 +520,7 @@ transientrel_shutdown(DestReceiver *self)
 
 	FreeBulkInsertState(myState->bistate);
 
-	/* If we skipped using WAL, must heap_sync before commit */
-	if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
-		heap_sync(myState->transientrel);
+	/* If we skipped using WAL, we will sync the relation at commit */
 
 	/* close transientrel, but keep lock until commit */
 	heap_close(myState->transientrel, NoLock);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index abb262b..ae69954 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -4327,8 +4327,9 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 		bistate = GetBulkInsertState();
 
 		hi_options = HEAP_INSERT_SKIP_FSM;
+
 		if (!XLogIsNeeded())
-			hi_options |= HEAP_INSERT_SKIP_WAL;
+			heap_register_sync(newrel);
 	}
 	else
 	{
@@ -4589,8 +4590,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 		FreeBulkInsertState(bistate);
 
 		/* If we skipped writing WAL, then we need to sync the heap. */
-		if (hi_options & HEAP_INSERT_SKIP_WAL)
-			heap_sync(newrel);
 
 		heap_close(newrel, NoLock);
 	}
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 5b43a66..f3dcf6e 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -893,7 +893,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 				 * page has been previously WAL-logged, and if not, do that
 				 * now.
 				 */
-				if (RelationNeedsWAL(onerel) &&
+				if (BufferNeedsWAL(onerel, buf) &&
 					PageGetLSN(page) == InvalidXLogRecPtr)
 					log_newpage_buffer(buf, true);
 
@@ -1120,7 +1120,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
 			}
 
 			/* Now WAL-log freezing if necessary */
-			if (RelationNeedsWAL(onerel))
+			if (BufferNeedsWAL(onerel, buf))
 			{
 				XLogRecPtr	recptr;
 
@@ -1480,7 +1480,7 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
 	MarkBufferDirty(buffer);
 
 	/* XLOG stuff */
-	if (RelationNeedsWAL(onerel))
+	if (BufferNeedsWAL(onerel, buffer))
 	{
 		XLogRecPtr	recptr;
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 2109cbf..e991e9f 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -451,6 +451,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
 			BufferAccessStrategy strategy,
 			bool *foundPtr);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
+static void FlushRelationBuffers_common(SMgrRelation smgr, bool islocal);
 static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 static int	rnode_comparator(const void *p1, const void *p2);
@@ -3147,20 +3148,41 @@ PrintPinnedBufs(void)
 void
 FlushRelationBuffers(Relation rel)
 {
-	int			i;
-	BufferDesc *bufHdr;
-
 	/* Open rel at the smgr level if not already done */
 	RelationOpenSmgr(rel);
 
-	if (RelationUsesLocalBuffers(rel))
+	FlushRelationBuffers_common(rel->rd_smgr, RelationUsesLocalBuffers(rel));
+}
+
+/*
+ * Like FlushRelationBuffers(), but the relation is specified by a
+ * RelFileNode
+ */
+void
+FlushRelationBuffersWithoutRelCache(RelFileNode rnode, bool islocal)
+{
+	FlushRelationBuffers_common(smgropen(rnode, InvalidBackendId), islocal);
+}
+
+/*
+ * Code shared between functions FlushRelationBuffers() and
+ * FlushRelationBuffersWithoutRelCache().
+ */
+static void
+FlushRelationBuffers_common(SMgrRelation smgr, bool islocal)
+{
+	RelFileNode rnode = smgr->smgr_rnode.node;
+	int			i;
+	BufferDesc *bufHdr;
+
+	if (islocal)
 	{
 		for (i = 0; i < NLocBuffer; i++)
 		{
 			uint32		buf_state;
 
 			bufHdr = GetLocalBufferDescriptor(i);
-			if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
+			if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
 				((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
 				 (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
 			{
@@ -3177,7 +3199,7 @@ FlushRelationBuffers(Relation rel)
 
 				PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
 
-				smgrwrite(rel->rd_smgr,
+				smgrwrite(smgr,
 						  bufHdr->tag.forkNum,
 						  bufHdr->tag.blockNum,
 						  localpage,
@@ -3207,18 +3229,18 @@ FlushRelationBuffers(Relation rel)
 		 * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
 		 * and saves some cycles.
 		 */
-		if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+		if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode))
 			continue;
 
 		ReservePrivateRefCountEntry();
 
 		buf_state = LockBufHdr(bufHdr);
-		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
+		if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
 			(buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
 		{
 			PinBuffer_Locked(bufHdr);
 			LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
-			FlushBuffer(bufHdr, rel->rd_smgr);
+			FlushBuffer(bufHdr, smgr);
 			LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
 			UnpinBuffer(bufHdr, true);
 		}
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index ddb9485..61ff7eb 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -72,6 +72,7 @@
 #include "optimizer/var.h"
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rowsecurity.h"
+#include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
 #include "utils/array.h"
@@ -418,6 +419,9 @@ AllocateRelationDesc(Form_pg_class relp)
 	/* which we mark as a reference-counted tupdesc */
 	relation->rd_att->tdrefcount = 1;
 
+	/* pending_sync is set as required later */
+	relation->pending_sync = NULL;
+
 	MemoryContextSwitchTo(oldcxt);
 
 	return relation;
@@ -3353,6 +3357,8 @@ RelationBuildLocalRelation(const char *relname,
 	else
 		rel->rd_rel->relfilenode = relfilenode;
 
+	rel->pending_sync = NULL;
+
 	RelationInitLockInfo(rel);	/* see lmgr.c */
 
 	RelationInitPhysicalAddr(rel);
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 7e85510..3967641 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -25,10 +25,9 @@
 
 
 /* "options" flag bits for heap_insert */
-#define HEAP_INSERT_SKIP_WAL	0x0001
-#define HEAP_INSERT_SKIP_FSM	0x0002
-#define HEAP_INSERT_FROZEN		0x0004
-#define HEAP_INSERT_SPECULATIVE 0x0008
+#define HEAP_INSERT_SKIP_FSM	0x0001
+#define HEAP_INSERT_FROZEN		0x0002
+#define HEAP_INSERT_SPECULATIVE 0x0004
 
 typedef struct BulkInsertStateData *BulkInsertState;
 
@@ -178,6 +177,7 @@ extern void simple_heap_delete(Relation relation, ItemPointer tid);
 extern void simple_heap_update(Relation relation, ItemPointer otid,
 				   HeapTuple tup);
 
+extern void heap_register_sync(Relation relation);
 extern void heap_sync(Relation relation);
 extern void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot);
 
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index fea96de..e8e49f1 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -29,6 +29,9 @@ extern void RelationTruncate(Relation rel, BlockNumber nblocks);
  */
 extern void smgrDoPendingDeletes(bool isCommit);
 extern int	smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr);
+extern void smgrDoPendingSyncs(bool isCommit);
+extern void RecordPendingSync(Relation rel);
+extern bool BufferNeedsWAL(Relation rel, Buffer buf);
 extern void AtSubCommit_smgr(void);
 extern void AtSubAbort_smgr(void);
 extern void PostPrepare_smgr(void);
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 07a32d6..6ec2d26 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -190,6 +190,8 @@ extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation,
 								ForkNumber forkNum);
 extern void FlushOneBuffer(Buffer buffer);
 extern void FlushRelationBuffers(Relation rel);
+extern void FlushRelationBuffersWithoutRelCache(RelFileNode rnode,
+									bool islocal);
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(RelFileNodeBackend rnode,
 					   ForkNumber forkNum, BlockNumber firstDelBlock);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index ab875bb..f802cc1 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -216,6 +216,8 @@ typedef struct RelationData
 
 	/* use "struct" here to avoid needing to include pgstat.h: */
 	struct PgStat_TableStatus *pgstat_info;		/* statistics collection area */
+
+	struct PendingRelSync *pending_sync;
 } RelationData;
 
 
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
deleted file mode 100644
index ccd5943..0000000
--- a/src/test/recovery/t/001_stream_rep.pl
+++ /dev/null
@@ -1,230 +0,0 @@
-# Minimal test testing streaming replication
-use strict;
-use warnings;
-use PostgresNode;
-use TestLib;
-use Test::More tests => 28;
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1);
-$node_master->start;
-my $backup_name = 'my_backup';
-
-# Take backup
-$node_master->backup($backup_name);
-
-# Create streaming standby linking to master
-my $node_standby_1 = get_new_node('standby_1');
-$node_standby_1->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_1->start;
-
-# Take backup of standby 1 (not mandatory, but useful to check if
-# pg_basebackup works on a standby).
-$node_standby_1->backup($backup_name);
-
-# Take a second backup of the standby while the master is offline.
-$node_master->stop;
-$node_standby_1->backup('my_backup_2');
-$node_master->start;
-
-# Create second standby node linking to standby 1
-my $node_standby_2 = get_new_node('standby_2');
-$node_standby_2->init_from_backup($node_standby_1, $backup_name,
-	has_streaming => 1);
-$node_standby_2->start;
-
-# Create some content on master and check its presence in standby 1
-$node_master->safe_psql('postgres',
-	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
-
-# Wait for standbys to catch up
-$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('insert'));
-$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('replay'));
-
-my $result =
-  $node_standby_1->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-print "standby 1: $result\n";
-is($result, qq(1002), 'check streamed content on standby 1');
-
-$result =
-  $node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-print "standby 2: $result\n";
-is($result, qq(1002), 'check streamed content on standby 2');
-
-# Check that only READ-only queries can run on standbys
-is($node_standby_1->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
-	3, 'read-only queries on standby 1');
-is($node_standby_2->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
-	3, 'read-only queries on standby 2');
-
-# Tests for connection parameter target_session_attrs
-note "testing connection parameter \"target_session_attrs\"";
-
-# Routine designed to run tests on the connection parameter
-# target_session_attrs with multiple nodes.
-sub test_target_session_attrs
-{
-	my $node1 = shift;
-	my $node2 = shift;
-	my $target_node = shift;
-	my $mode = shift;
-	my $status = shift;
-
-	my $node1_host = $node1->host;
-	my $node1_port = $node1->port;
-	my $node1_name = $node1->name;
-	my $node2_host = $node2->host;
-	my $node2_port = $node2->port;
-	my $node2_name = $node2->name;
-
-	my $target_name = $target_node->name;
-
-	# Build connection string for connection attempt.
-	my $connstr = "host=$node1_host,$node2_host ";
-	$connstr .= "port=$node1_port,$node2_port ";
-	$connstr .= "target_session_attrs=$mode";
-
-	# The client used for the connection does not matter, only the backend
-	# point does.
-	my ($ret, $stdout, $stderr) =
-		$node1->psql('postgres', 'SHOW port;', extra_params => ['-d', $connstr]);
-	is($status == $ret && $stdout eq $target_node->port, 1,
-	   "connect to node $target_name if mode \"$mode\" and $node1_name,$node2_name listed");
-}
-
-# Connect to master in "read-write" mode with master,standby1 list.
-test_target_session_attrs($node_master, $node_standby_1, $node_master,
-						  "read-write", 0);
-# Connect to master in "read-write" mode with standby1,master list.
-test_target_session_attrs($node_standby_1, $node_master, $node_master,
-						  "read-write", 0);
-# Connect to master in "any" mode with master,standby1 list.
-test_target_session_attrs($node_master, $node_standby_1, $node_master,
-						  "any", 0);
-# Connect to standby1 in "any" mode with standby1,master list.
-test_target_session_attrs($node_standby_1, $node_master, $node_standby_1,
-						  "any", 0);
-
-note "switching to physical replication slot";
-# Switch to using a physical replication slot. We can do this without a new
-# backup since physical slots can go backwards if needed. Do so on both
-# standbys. Since we're going to be testing things that affect the slot state,
-# also increase the standby feedback interval to ensure timely updates.
-my ($slotname_1, $slotname_2) = ('standby_1', 'standby_2');
-$node_master->append_conf('postgresql.conf', "max_replication_slots = 4\n");
-$node_master->restart;
-is($node_master->psql('postgres', qq[SELECT pg_create_physical_replication_slot('$slotname_1');]), 0, 'physical slot created on master');
-$node_standby_1->append_conf('recovery.conf', "primary_slot_name = $slotname_1\n");
-$node_standby_1->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n");
-$node_standby_1->append_conf('postgresql.conf', "max_replication_slots = 4\n");
-$node_standby_1->restart;
-is($node_standby_1->psql('postgres', qq[SELECT pg_create_physical_replication_slot('$slotname_2');]), 0, 'physical slot created on intermediate replica');
-$node_standby_2->append_conf('recovery.conf', "primary_slot_name = $slotname_2\n");
-$node_standby_2->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n");
-$node_standby_2->restart;
-
-sub get_slot_xmins
-{
-	my ($node, $slotname) = @_;
-	my $slotinfo = $node->slot($slotname);
-	return ($slotinfo->{'xmin'}, $slotinfo->{'catalog_xmin'});
-}
-
-# There's no hot standby feedback and there are no logical slots on either peer
-# so xmin and catalog_xmin should be null on both slots.
-my ($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
-is($xmin, '', 'non-cascaded slot xmin null with no hs_feedback');
-is($catalog_xmin, '', 'non-cascaded slot xmin null with no hs_feedback');
-
-($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
-is($xmin, '', 'cascaded slot xmin null with no hs_feedback');
-is($catalog_xmin, '', 'cascaded slot xmin null with no hs_feedback');
-
-# Replication still works?
-$node_master->safe_psql('postgres', 'CREATE TABLE replayed(val integer);');
-
-sub replay_check
-{
-	my $newval = $node_master->safe_psql('postgres', 'INSERT INTO replayed(val) SELECT coalesce(max(val),0) + 1 AS newval FROM replayed RETURNING val');
-	$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('insert'));
-	$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('replay'));
-	$node_standby_1->safe_psql('postgres', qq[SELECT 1 FROM replayed WHERE val = $newval])
-		or die "standby_1 didn't replay master value $newval";
-	$node_standby_2->safe_psql('postgres', qq[SELECT 1 FROM replayed WHERE val = $newval])
-		or die "standby_2 didn't replay standby_1 value $newval";
-}
-
-replay_check();
-
-note "enabling hot_standby_feedback";
-# Enable hs_feedback. The slot should gain an xmin. We set the status interval
-# so we'll see the results promptly.
-$node_standby_1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;');
-$node_standby_1->reload;
-$node_standby_2->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;');
-$node_standby_2->reload;
-replay_check();
-sleep(2);
-
-($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
-isnt($xmin, '', 'non-cascaded slot xmin non-null with hs feedback');
-is($catalog_xmin, '', 'non-cascaded slot xmin still null with hs_feedback');
-
-($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
-isnt($xmin, '', 'cascaded slot xmin non-null with hs feedback');
-is($catalog_xmin, '', 'cascaded slot xmin still null with hs_feedback');
-
-note "doing some work to advance xmin";
-for my $i (10000..11000) {
-	$node_master->safe_psql('postgres', qq[INSERT INTO tab_int VALUES ($i);]);
-}
-$node_master->safe_psql('postgres', 'VACUUM;');
-$node_master->safe_psql('postgres', 'CHECKPOINT;');
-
-my ($xmin2, $catalog_xmin2) = get_slot_xmins($node_master, $slotname_1);
-note "new xmin $xmin2, old xmin $xmin";
-isnt($xmin2, $xmin, 'non-cascaded slot xmin with hs feedback has changed');
-is($catalog_xmin2, '', 'non-cascaded slot xmin still null with hs_feedback unchanged');
-
-($xmin2, $catalog_xmin2) = get_slot_xmins($node_standby_1, $slotname_2);
-note "new xmin $xmin2, old xmin $xmin";
-isnt($xmin2, $xmin, 'cascaded slot xmin with hs feedback has changed');
-is($catalog_xmin2, '', 'cascaded slot xmin still null with hs_feedback unchanged');
-
-note "disabling hot_standby_feedback";
-# Disable hs_feedback. Xmin should be cleared.
-$node_standby_1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
-$node_standby_1->reload;
-$node_standby_2->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
-$node_standby_2->reload;
-replay_check();
-sleep(2);
-
-($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
-is($xmin, '', 'non-cascaded slot xmin null with hs feedback reset');
-is($catalog_xmin, '', 'non-cascaded slot xmin still null with hs_feedback reset');
-
-($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
-is($xmin, '', 'cascaded slot xmin null with hs feedback reset');
-is($catalog_xmin, '', 'cascaded slot xmin still null with hs_feedback reset');
-
-note "re-enabling hot_standby_feedback and disabling while stopped";
-$node_standby_2->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;');
-$node_standby_2->reload;
-
-$node_master->safe_psql('postgres', qq[INSERT INTO tab_int VALUES (11000);]);
-replay_check();
-
-$node_standby_2->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
-$node_standby_2->stop;
-
-($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
-isnt($xmin, '', 'cascaded slot xmin non-null with postgres shut down');
-
-# Xmin from a previous run should be cleared on startup.
-$node_standby_2->start;
-
-($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
-is($xmin, '', 'cascaded slot xmin reset after startup with hs feedback reset');
diff --git a/src/test/recovery/t/002_archiving.pl b/src/test/recovery/t/002_archiving.pl
deleted file mode 100644
index 83b43bf..0000000
--- a/src/test/recovery/t/002_archiving.pl
+++ /dev/null
@@ -1,53 +0,0 @@
-# test for archiving with hot standby
-use strict;
-use warnings;
-use PostgresNode;
-use TestLib;
-use Test::More tests => 1;
-use File::Copy;
-
-# Initialize master node, doing archives
-my $node_master = get_new_node('master');
-$node_master->init(
-	has_archiving    => 1,
-	allows_streaming => 1);
-my $backup_name = 'my_backup';
-
-# Start it
-$node_master->start;
-
-# Take backup for slave
-$node_master->backup($backup_name);
-
-# Initialize standby node from backup, fetching WAL from archives
-my $node_standby = get_new_node('standby');
-$node_standby->init_from_backup($node_master, $backup_name,
-	has_restoring => 1);
-$node_standby->append_conf(
-	'postgresql.conf', qq(
-wal_retrieve_retry_interval = '100ms'
-));
-$node_standby->start;
-
-# Create some content on master
-$node_master->safe_psql('postgres',
-	"CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
-my $current_lsn =
-  $node_master->safe_psql('postgres', "SELECT pg_current_wal_location();");
-
-# Force archiving of WAL file to make it present on master
-$node_master->safe_psql('postgres', "SELECT pg_switch_wal()");
-
-# Add some more content, it should not be present on standby
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(1001,2000))");
-
-# Wait until necessary replay has been done on standby
-my $caughtup_query =
-  "SELECT '$current_lsn'::pg_lsn <= pg_last_wal_replay_location()";
-$node_standby->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby to catch up";
-
-my $result =
-  $node_standby->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-is($result, qq(1000), 'check content from archives');
diff --git a/src/test/recovery/t/003_recovery_targets.pl b/src/test/recovery/t/003_recovery_targets.pl
deleted file mode 100644
index b7b0caa..0000000
--- a/src/test/recovery/t/003_recovery_targets.pl
+++ /dev/null
@@ -1,146 +0,0 @@
-# Test for recovery targets: name, timestamp, XID
-use strict;
-use warnings;
-use PostgresNode;
-use TestLib;
-use Test::More tests => 9;
-
-# Create and test a standby from given backup, with a certain
-# recovery target.
-sub test_recovery_standby
-{
-	my $test_name       = shift;
-	my $node_name       = shift;
-	my $node_master     = shift;
-	my $recovery_params = shift;
-	my $num_rows        = shift;
-	my $until_lsn       = shift;
-
-	my $node_standby = get_new_node($node_name);
-	$node_standby->init_from_backup($node_master, 'my_backup',
-		has_restoring => 1);
-
-	foreach my $param_item (@$recovery_params)
-	{
-		$node_standby->append_conf(
-			'recovery.conf',
-			qq($param_item
-));
-	}
-
-	$node_standby->start;
-
-	# Wait until standby has replayed enough data
-	my $caughtup_query =
-	  "SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_location()";
-	$node_standby->poll_query_until('postgres', $caughtup_query)
-	  or die "Timed out while waiting for standby to catch up";
-
-	# Create some content on master and check its presence in standby
-	my $result =
-	  $node_standby->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-	is($result, qq($num_rows), "check standby content for $test_name");
-
-	# Stop standby node
-	$node_standby->teardown_node;
-}
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(has_archiving => 1, allows_streaming => 1);
-
-# Start it
-$node_master->start;
-
-# Create data before taking the backup, aimed at testing
-# recovery_target = 'immediate'
-$node_master->safe_psql('postgres',
-	"CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
-my $lsn1 =
-  $node_master->safe_psql('postgres', "SELECT pg_current_wal_location();");
-
-# Take backup from which all operations will be run
-$node_master->backup('my_backup');
-
-# Insert some data with used as a replay reference, with a recovery
-# target TXID.
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(1001,2000))");
-my $ret = $node_master->safe_psql('postgres',
-	"SELECT pg_current_wal_location(), txid_current();");
-my ($lsn2, $recovery_txid) = split /\|/, $ret;
-
-# More data, with recovery target timestamp
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(2001,3000))");
-$ret = $node_master->safe_psql('postgres',
-	"SELECT pg_current_wal_location(), now();");
-my ($lsn3, $recovery_time) = split /\|/, $ret;
-
-# Even more data, this time with a recovery target name
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(3001,4000))");
-my $recovery_name = "my_target";
-my $lsn4 =
-  $node_master->safe_psql('postgres', "SELECT pg_current_wal_location();");
-$node_master->safe_psql('postgres',
-	"SELECT pg_create_restore_point('$recovery_name');");
-
-# And now for a recovery target LSN
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(4001,5000))");
-my $recovery_lsn = $node_master->safe_psql('postgres', "SELECT pg_current_wal_location()");
-my $lsn5 =
-  $node_master->safe_psql('postgres', "SELECT pg_current_wal_location();");
-
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(5001,6000))");
-
-# Force archiving of WAL file
-$node_master->safe_psql('postgres', "SELECT pg_switch_wal()");
-
-# Test recovery targets
-my @recovery_params = ("recovery_target = 'immediate'");
-test_recovery_standby('immediate target',
-	'standby_1', $node_master, \@recovery_params, "1000", $lsn1);
-@recovery_params = ("recovery_target_xid = '$recovery_txid'");
-test_recovery_standby('XID', 'standby_2', $node_master, \@recovery_params,
-	"2000", $lsn2);
-@recovery_params = ("recovery_target_time = '$recovery_time'");
-test_recovery_standby('time', 'standby_3', $node_master, \@recovery_params,
-	"3000", $lsn3);
-@recovery_params = ("recovery_target_name = '$recovery_name'");
-test_recovery_standby('name', 'standby_4', $node_master, \@recovery_params,
-	"4000", $lsn4);
-@recovery_params = ("recovery_target_lsn = '$recovery_lsn'");
-test_recovery_standby('LSN', 'standby_5', $node_master, \@recovery_params,
-	"5000", $lsn5);
-
-# Multiple targets
-# The last entry takes priority (note that an array preserves the order of
-# its items, unlike a hash).
-@recovery_params = (
-	"recovery_target_name = '$recovery_name'",
-	"recovery_target_xid  = '$recovery_txid'",
-	"recovery_target_time = '$recovery_time'");
-test_recovery_standby('name + XID + time',
-	'standby_6', $node_master, \@recovery_params, "3000", $lsn3);
-@recovery_params = (
-	"recovery_target_time = '$recovery_time'",
-	"recovery_target_name = '$recovery_name'",
-	"recovery_target_xid  = '$recovery_txid'");
-test_recovery_standby('time + name + XID',
-	'standby_7', $node_master, \@recovery_params, "2000", $lsn2);
-@recovery_params = (
-	"recovery_target_xid  = '$recovery_txid'",
-	"recovery_target_time = '$recovery_time'",
-	"recovery_target_name = '$recovery_name'");
-test_recovery_standby('XID + time + name',
-	'standby_8', $node_master, \@recovery_params, "4000", $lsn4);
-@recovery_params = (
-	"recovery_target_xid  = '$recovery_txid'",
-	"recovery_target_time = '$recovery_time'",
-	"recovery_target_name = '$recovery_name'",
-	"recovery_target_lsn = '$recovery_lsn'",);
-test_recovery_standby('XID + time + name + LSN',
-	'standby_9', $node_master, \@recovery_params, "5000", $lsn5);
diff --git a/src/test/recovery/t/004_timeline_switch.pl b/src/test/recovery/t/004_timeline_switch.pl
deleted file mode 100644
index 7c6587a..0000000
--- a/src/test/recovery/t/004_timeline_switch.pl
+++ /dev/null
@@ -1,62 +0,0 @@
-# Test for timeline switch
-# Ensure that a cascading standby is able to follow a newly-promoted standby
-# on a new timeline.
-use strict;
-use warnings;
-use File::Path qw(rmtree);
-use PostgresNode;
-use TestLib;
-use Test::More tests => 1;
-
-$ENV{PGDATABASE} = 'postgres';
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1);
-$node_master->start;
-
-# Take backup
-my $backup_name = 'my_backup';
-$node_master->backup($backup_name);
-
-# Create two standbys linking to it
-my $node_standby_1 = get_new_node('standby_1');
-$node_standby_1->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_1->start;
-my $node_standby_2 = get_new_node('standby_2');
-$node_standby_2->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_2->start;
-
-# Create some content on master
-$node_master->safe_psql('postgres',
-	"CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
-
-# Wait until standby 1 has replayed enough data
-$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('write'));
-
-# Stop and remove master, and promote standby 1, switching it to a new timeline
-$node_master->teardown_node;
-$node_standby_1->promote;
-
-# Switch standby 2 to replay from standby 1
-rmtree($node_standby_2->data_dir . '/recovery.conf');
-my $connstr_1 = $node_standby_1->connstr;
-$node_standby_2->append_conf(
-	'recovery.conf', qq(
-primary_conninfo='$connstr_1 application_name=@{[$node_standby_2->name]}'
-standby_mode=on
-recovery_target_timeline='latest'
-));
-$node_standby_2->restart;
-
-# Insert some data in standby 1 and check its presence in standby 2
-# to ensure that the timeline switch has been done.
-$node_standby_1->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(1001,2000))");
-$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('write'));
-
-my $result =
-  $node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-is($result, qq(2000), 'check content of standby 2');
diff --git a/src/test/recovery/t/005_replay_delay.pl b/src/test/recovery/t/005_replay_delay.pl
deleted file mode 100644
index cd9e8f5..0000000
--- a/src/test/recovery/t/005_replay_delay.pl
+++ /dev/null
@@ -1,69 +0,0 @@
-# Checks for recovery_min_apply_delay
-use strict;
-use warnings;
-
-use PostgresNode;
-use TestLib;
-use Test::More tests => 1;
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1);
-$node_master->start;
-
-# And some content
-$node_master->safe_psql('postgres',
-	"CREATE TABLE tab_int AS SELECT generate_series(1, 10) AS a");
-
-# Take backup
-my $backup_name = 'my_backup';
-$node_master->backup($backup_name);
-
-# Create streaming standby from backup
-my $node_standby = get_new_node('standby');
-my $delay        = 3;
-$node_standby->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby->append_conf(
-	'recovery.conf', qq(
-recovery_min_apply_delay = '${delay}s'
-));
-$node_standby->start;
-
-# Make new content on master and check its presence in standby depending
-# on the delay applied above. Before doing the insertion, get the
-# current timestamp that will be used as a comparison base. Even on slow
-# machines, this allows the test to behave predictably when comparing the
-# delay between the insertion time on master and the replay time on standby.
-my $master_insert_time = time();
-$node_master->safe_psql('postgres',
-	"INSERT INTO tab_int VALUES (generate_series(11, 20))");
-
-# Now wait for replay to complete on standby. We're done waiting when the
-# slave has replayed up to the previously saved master LSN.
-my $until_lsn =
-  $node_master->safe_psql('postgres', "SELECT pg_current_wal_location()");
-
-my $remaining = 90;
-while ($remaining-- > 0)
-{
-
-	# Done waiting?
-	my $replay_status = $node_standby->safe_psql('postgres',
-		"SELECT (pg_last_wal_replay_location() - '$until_lsn'::pg_lsn) >= 0"
-	);
-	last if $replay_status eq 't';
-
-	# No, sleep some more.
-	my $sleep = $master_insert_time + $delay - time();
-	$sleep = 1 if $sleep < 1;
-	sleep $sleep;
-}
-
-die "Maximum number of attempts reached ($remaining remain)"
-  if $remaining < 0;
-
-# This test is successful if and only if the LSN has been applied with at least
-# the configured apply delay.
-ok(time() - $master_insert_time >= $delay,
-	"standby applies WAL only after replication delay");
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
deleted file mode 100644
index bf9b50a..0000000
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ /dev/null
@@ -1,104 +0,0 @@
-# Testing of logical decoding using SQL interface and/or pg_recvlogical
-#
-# Most logical decoding tests are in contrib/test_decoding. This module
-# is for work that doesn't fit well there, like where server restarts
-# are required.
-use strict;
-use warnings;
-use PostgresNode;
-use TestLib;
-use Test::More tests => 16;
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1);
-$node_master->append_conf(
-		'postgresql.conf', qq(
-wal_level = logical
-));
-$node_master->start;
-my $backup_name = 'master_backup';
-
-$node_master->safe_psql('postgres', qq[CREATE TABLE decoding_test(x integer, y text);]);
-
-$node_master->safe_psql('postgres', qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]);
-
-$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
-
-# Basic decoding works
-my($result) = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
-is(scalar(my @foobar = split /^/m, $result), 12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
-
-# If we immediately crash the server we might lose the progress we just made
-# and replay the same changes again. But a clean shutdown should never repeat
-# the same changes when we use the SQL decoding interface.
-$node_master->restart('fast');
-
-# There are no new writes, so the result should be empty.
-$result = $node_master->safe_psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
-chomp($result);
-is($result, '', 'Decoding after fast restart repeats no rows');
-
-# Insert some rows and verify that we get the same results from pg_recvlogical
-# and the SQL interface.
-$node_master->safe_psql('postgres', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]);
-
-my $expected = q{BEGIN
-table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
-table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
-table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
-table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
-COMMIT};
-
-my $stdout_sql = $node_master->safe_psql('postgres', qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]);
-is($stdout_sql, $expected, 'got expected output from SQL decoding session');
-
-my $endpos = $node_master->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;");
-print "waiting to replay $endpos\n";
-
-my $stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
-chomp($stdout_recv);
-is($stdout_recv, $expected, 'got same expected output from pg_recvlogical decoding session');
-
-$stdout_recv = $node_master->pg_recvlogical_upto('postgres', 'test_slot', $endpos, 10, 'include-xids' => '0', 'skip-empty-xacts' => '1');
-chomp($stdout_recv);
-is($stdout_recv, '', 'pg_recvlogical acknowledged changes, nothing pending on slot');
-
-$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
-
-is($node_master->psql('otherdb', "SELECT location FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY location DESC LIMIT 1;"), 3,
-	'replaying logical slot from another database fails');
-
-$node_master->safe_psql('otherdb', qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]);
-
-# make sure you can't drop a slot while active
-my $pg_recvlogical = IPC::Run::start(['pg_recvlogical', '-d', $node_master->connstr('otherdb'), '-S', 'otherdb_slot', '-f', '-', '--start']);
-$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)");
-is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 3,
-	'dropping a DB with inactive logical slots fails');
-$pg_recvlogical->kill_kill;
-is($node_master->slot('otherdb_slot')->{'slot_name'}, undef,
-	'logical slot still exists');
-
-$node_master->poll_query_until('otherdb', "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)");
-is($node_master->psql('postgres', 'DROP DATABASE otherdb'), 0,
-	'dropping a DB with inactive logical slots succeeds');
-is($node_master->slot('otherdb_slot')->{'slot_name'}, undef,
-	'logical slot was actually dropped with DB');
-
-# Restarting a node with wal_level = logical that has existing
-# slots must succeed, but decoding from those slots must fail.
-$node_master->safe_psql('postgres', 'ALTER SYSTEM SET wal_level = replica');
-is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'logical', 'wal_level is still logical before restart');
-$node_master->restart;
-is($node_master->safe_psql('postgres', 'SHOW wal_level'), 'replica', 'wal_level is replica');
-isnt($node_master->slot('test_slot')->{'catalog_xmin'}, '0',
-	'restored slot catalog_xmin is nonzero');
-is($node_master->psql('postgres', qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]), 3,
-	'reading from slot with wal_level < logical fails');
-is($node_master->psql('postgres', q[SELECT pg_drop_replication_slot('test_slot')]), 0,
-	'can drop logical slot while wal_level = replica');
-is($node_master->slot('test_slot')->{'catalog_xmin'}, '', 'slot was dropped');
-
-# done with the node
-$node_master->stop;
diff --git a/src/test/recovery/t/007_sync_rep.pl b/src/test/recovery/t/007_sync_rep.pl
deleted file mode 100644
index e11b428..0000000
--- a/src/test/recovery/t/007_sync_rep.pl
+++ /dev/null
@@ -1,205 +0,0 @@
-# Minimal test testing synchronous replication sync_state transition
-use strict;
-use warnings;
-use PostgresNode;
-use TestLib;
-use Test::More tests => 11;
-
-# Query checking sync_priority and sync_state of each standby
-my $check_sql =
-"SELECT application_name, sync_priority, sync_state FROM pg_stat_replication ORDER BY application_name;";
-
-# Check that sync_state of each standby is expected.
-# If $setting is given, synchronous_standby_names is set to it and
-# the configuration file is reloaded before the test.
-sub test_sync_state
-{
-	my ($self, $expected, $msg, $setting) = @_;
-
-	if (defined($setting))
-	{
-		$self->psql('postgres',
-			"ALTER SYSTEM SET synchronous_standby_names = '$setting';");
-		$self->reload;
-	}
-
-	my $timeout_max = 30;
-	my $timeout     = 0;
-	my $result;
-
-	# A reload may take some time to take effect on busy machines,
-	# hence use a loop with a timeout to give some room for the test
-	# to pass.
-	while ($timeout < $timeout_max)
-	{
-		$result = $self->safe_psql('postgres', $check_sql);
-
-		last if ($result eq $expected);
-
-		$timeout++;
-		sleep 1;
-	}
-
-	is($result, $expected, $msg);
-}
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1);
-$node_master->start;
-my $backup_name = 'master_backup';
-
-# Take backup
-$node_master->backup($backup_name);
-
-# Create standby1 linking to master
-my $node_standby_1 = get_new_node('standby1');
-$node_standby_1->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_1->start;
-
-# Create standby2 linking to master
-my $node_standby_2 = get_new_node('standby2');
-$node_standby_2->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_2->start;
-
-# Create standby3 linking to master
-my $node_standby_3 = get_new_node('standby3');
-$node_standby_3->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_3->start;
-
-# Check that sync_state is determined correctly when
-# synchronous_standby_names is specified in old syntax.
-test_sync_state(
-	$node_master, qq(standby1|1|sync
-standby2|2|potential
-standby3|0|async),
-	'old syntax of synchronous_standby_names',
-	'standby1,standby2');
-
-# Check that all the standbys are considered as either sync or
-# potential when * is specified in synchronous_standby_names.
-# Note that standby1 is chosen as sync standby because
-# it's stored at the head of the WalSnd array, which manages
-# all the standbys, even though they all have the same priority.
-test_sync_state(
-	$node_master, qq(standby1|1|sync
-standby2|1|potential
-standby3|1|potential),
-	'asterisk in synchronous_standby_names',
-	'*');
-
-# Stop and start standbys to rearrange the order of standbys
-# in WalSnd array. Now, if standbys have the same priority,
-# standby2 is selected preferentially and standby3 is next.
-$node_standby_1->stop;
-$node_standby_2->stop;
-$node_standby_3->stop;
-
-$node_standby_2->start;
-$node_standby_3->start;
-
-# Specify 2 as the number of sync standbys.
-# Check that two standbys are in 'sync' state.
-test_sync_state(
-	$node_master, qq(standby2|2|sync
-standby3|3|sync),
-	'2 synchronous standbys',
-	'2(standby1,standby2,standby3)');
-
-# Start standby1
-$node_standby_1->start;
-
-# Create standby4 linking to master
-my $node_standby_4 = get_new_node('standby4');
-$node_standby_4->init_from_backup($node_master, $backup_name,
-	has_streaming => 1);
-$node_standby_4->start;
-
-# Check that standby1 and standby2, whose names appear earlier in
-# synchronous_standby_names, are considered sync. Also check that
-# standby3, which appears later in the list, is marked as potential,
-# and standby4 is in 'async' state because it's not in the list.
-test_sync_state(
-	$node_master, qq(standby1|1|sync
-standby2|2|sync
-standby3|3|potential
-standby4|0|async),
-	'2 sync, 1 potential, and 1 async');
-
-# Check that sync_state of each standby is determined correctly
-# when num_sync exceeds the number of names of potential sync standbys
-# specified in synchronous_standby_names.
-test_sync_state(
-	$node_master, qq(standby1|0|async
-standby2|4|sync
-standby3|3|sync
-standby4|1|sync),
-	'num_sync exceeds the num of potential sync standbys',
-	'6(standby4,standby0,standby3,standby2)');
-
-# The setting that * comes before another standby name is acceptable
-# but does not make sense in most cases. Check that sync_state is
-# chosen properly even in case of that setting.
-# The priority of standby2 should be 2 because it matches * first.
-test_sync_state(
-	$node_master, qq(standby1|1|sync
-standby2|2|sync
-standby3|2|potential
-standby4|2|potential),
-	'asterisk comes before another standby name',
-	'2(standby1,*,standby2)');
-
-# Check that the setting of '2(*)' chooses standby2 and standby3, which are
-# stored earlier in the WalSnd array, as the sync standbys.
-test_sync_state(
-	$node_master, qq(standby1|1|potential
-standby2|1|sync
-standby3|1|sync
-standby4|1|potential),
-	'multiple standbys having the same priority are chosen as sync',
-	'2(*)');
-
-# Stop Standby3 which is considered in 'sync' state.
-$node_standby_3->stop;
-
-# Check that the state of standby1, which is stored earlier in the WalSnd
-# array than standby4, transitions from potential to sync.
-test_sync_state(
-	$node_master, qq(standby1|1|sync
-standby2|1|sync
-standby4|1|potential),
-	'potential standby found earlier in array is promoted to sync');
-
-# Check that standby1 and standby2 are chosen as sync standbys
-# based on their priorities.
-test_sync_state(
-$node_master, qq(standby1|1|sync
-standby2|2|sync
-standby4|0|async),
-'priority-based sync replication specified by FIRST keyword',
-'FIRST 2(standby1, standby2)');
-
-# Check that all the listed standbys are considered as candidates
-# for sync standbys in a quorum-based sync replication.
-test_sync_state(
-$node_master, qq(standby1|1|quorum
-standby2|2|quorum
-standby4|0|async),
-'2 quorum and 1 async',
-'ANY 2(standby1, standby2)');
-
-# Start Standby3 which will be considered in 'quorum' state.
-$node_standby_3->start;
-
-# Check that the setting of 'ANY 2(*)' chooses all standbys as
-# candidates for quorum sync standbys.
-test_sync_state(
-$node_master, qq(standby1|1|quorum
-standby2|1|quorum
-standby3|1|quorum
-standby4|1|quorum),
-'all standbys are considered as candidates for quorum sync standbys',
-'ANY 2(*)');
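
(For quick reference, the synchronous_standby_names forms exercised by this
test are, taken directly from the cases above:

    synchronous_standby_names = 'standby1,standby2'              # old syntax: priorities 1 and 2, one sync standby
    synchronous_standby_names = '*'                              # every connected standby is a candidate
    synchronous_standby_names = '2(standby1,standby2,standby3)'  # two priority-based sync standbys
    synchronous_standby_names = 'FIRST 2(standby1, standby2)'    # same thing with the explicit FIRST keyword
    synchronous_standby_names = 'ANY 2(standby1, standby2)'      # quorum: any two of the listed standbys

The test applies each of these through ALTER SYSTEM followed by a reload, so
no restart is needed.)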
diff --git a/src/test/recovery/t/008_fsm_truncation.pl b/src/test/recovery/t/008_fsm_truncation.pl
deleted file mode 100644
index 8aa8a4f..0000000
--- a/src/test/recovery/t/008_fsm_truncation.pl
+++ /dev/null
@@ -1,92 +0,0 @@
-# Test WAL replay of FSM changes.
-#
-# FSM changes don't normally need to be WAL-logged, except for truncation.
-# The FSM mustn't return a page that doesn't exist (anymore).
-use strict;
-use warnings;
-
-use PostgresNode;
-use TestLib;
-use Test::More tests => 1;
-
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1);
-
-$node_master->append_conf('postgresql.conf', qq{
-fsync = on
-wal_log_hints = on
-max_prepared_transactions = 5
-autovacuum = off
-});
-
-# Create a master node and its standby, initializing both with some data
-# at the same time.
-$node_master->start;
-
-$node_master->backup('master_backup');
-my $node_standby = get_new_node('standby');
-$node_standby->init_from_backup($node_master, 'master_backup',
-	has_streaming => 1);
-$node_standby->start;
-
-$node_master->psql('postgres', qq{
-create table testtab (a int, b char(100));
-insert into testtab select generate_series(1,1000), 'foo';
-insert into testtab select generate_series(1,1000), 'foo';
-delete from testtab where ctid > '(8,0)';
-});
-
-# Take a lock on the table to prevent following vacuum from truncating it
-$node_master->psql('postgres', qq{
-begin;
-lock table testtab in row share mode;
-prepare transaction 'p1';
-});
-
-# Vacuum, update FSM without truncation
-$node_master->psql('postgres', 'vacuum verbose testtab');
-
-# Force a checkpoint
-$node_master->psql('postgres', 'checkpoint');
-
-# Now do some more inserts/deletes, and another vacuum to ensure full-page
-# writes are done
-$node_master->psql('postgres', qq{
-insert into testtab select generate_series(1,1000), 'foo';
-delete from testtab where ctid > '(8,0)';
-vacuum verbose testtab;
-});
-
-# Ensure all buffers are now clean on the standby
-$node_standby->psql('postgres', 'checkpoint');
-
-# Release the lock, vacuum again which should lead to truncation
-$node_master->psql('postgres', qq{
-rollback prepared 'p1';
-vacuum verbose testtab;
-});
-
-$node_master->psql('postgres', 'checkpoint');
-my $until_lsn =
-	$node_master->safe_psql('postgres', "SELECT pg_current_wal_location();");
-
-# Wait long enough for standby to receive and apply all WAL
-my $caughtup_query =
-	"SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_location()";
-$node_standby->poll_query_until('postgres', $caughtup_query)
-	or die "Timed out while waiting for standby to catch up";
-
-# Promote the standby
-$node_standby->promote;
-$node_standby->poll_query_until('postgres',
-	"SELECT NOT pg_is_in_recovery()")
-  or die "Timed out while waiting for promotion of standby";
-$node_standby->psql('postgres', 'checkpoint');
-
-# Restart to discard in-memory copy of FSM
-$node_standby->restart;
-
-# Insert should work on standby
-is($node_standby->psql('postgres',
-   qq{insert into testtab select generate_series(1,1000), 'foo';}),
-   0, 'INSERT succeeds with truncated relation FSM');
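
(A side note, not part of the patch: the "wait until the standby catches up"
poll above, which several of these tests spell out with poll_query_until(),
can also be written with the wait_for_catchup() helper already used in
004_timeline_switch.pl, as long as the standby is connected via streaming
replication. A minimal sketch, reusing the variables of the test above:

    $node_master->wait_for_catchup($node_standby, 'replay', $until_lsn);

This blocks until pg_stat_replication on the master shows the standby has
replayed at least up to $until_lsn; archive-only standbys have no walsender
entry there, so they keep the explicit pg_last_wal_replay_location() poll.)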
diff --git a/src/test/recovery/t/009_twophase.pl b/src/test/recovery/t/009_twophase.pl
deleted file mode 100644
index be7f00b..0000000
--- a/src/test/recovery/t/009_twophase.pl
+++ /dev/null
@@ -1,322 +0,0 @@
-# Tests dedicated to two-phase commit in recovery
-use strict;
-use warnings;
-
-use PostgresNode;
-use TestLib;
-use Test::More tests => 13;
-
-# Setup master node
-my $node_master = get_new_node("master");
-$node_master->init(allows_streaming => 1);
-$node_master->append_conf('postgresql.conf', qq(
-	max_prepared_transactions = 10
-	log_checkpoints = true
-));
-$node_master->start;
-$node_master->backup('master_backup');
-$node_master->psql('postgres', "CREATE TABLE t_009_tbl (id int)");
-
-# Setup slave node
-my $node_slave = get_new_node('slave');
-$node_slave->init_from_backup($node_master, 'master_backup', has_streaming => 1);
-$node_slave->start;
-
-# Switch to synchronous replication
-$node_master->append_conf('postgresql.conf', qq(
-	synchronous_standby_names = '*'
-));
-$node_master->psql('postgres', "SELECT pg_reload_conf()");
-
-my $psql_out = '';
-my $psql_rc = '';
-
-###############################################################################
-# Check that we can commit and abort transaction after soft restart.
-# Here checkpoint happens before shutdown and no WAL replay will occur at next
-# startup. In this case postgres re-creates shared-memory state from twophase
-# files.
-###############################################################################
-
-$node_master->psql('postgres', "
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (42);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (43);
-	PREPARE TRANSACTION 'xact_009_1';
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (142);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (143);
-	PREPARE TRANSACTION 'xact_009_2';");
-$node_master->stop;
-$node_master->start;
-
-$psql_rc = $node_master->psql('postgres', "COMMIT PREPARED 'xact_009_1'");
-is($psql_rc, '0', 'Commit prepared transaction after restart');
-
-$psql_rc = $node_master->psql('postgres', "ROLLBACK PREPARED 'xact_009_2'");
-is($psql_rc, '0', 'Rollback prepared transaction after restart');
-
-###############################################################################
-# Check that we can commit and abort after a hard restart.
-# At next startup, WAL replay will re-create shared memory state for prepared
-# transaction using dedicated WAL records.
-###############################################################################
-
-$node_master->psql('postgres', "
-	CHECKPOINT;
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (42);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (43);
-	PREPARE TRANSACTION 'xact_009_1';
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (142);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (143);
-	PREPARE TRANSACTION 'xact_009_2';");
-$node_master->teardown_node;
-$node_master->start;
-
-$psql_rc = $node_master->psql('postgres', "COMMIT PREPARED 'xact_009_1'");
-is($psql_rc, '0', 'Commit prepared transaction after teardown');
-
-$psql_rc = $node_master->psql('postgres', "ROLLBACK PREPARED 'xact_009_2'");
-is($psql_rc, '0', 'Rollback prepared transaction after teardown');
-
-###############################################################################
-# Check that WAL replay can handle several transactions with same GID name.
-###############################################################################
-
-$node_master->psql('postgres', "
-	CHECKPOINT;
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (42);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (43);
-	PREPARE TRANSACTION 'xact_009_1';
-	COMMIT PREPARED 'xact_009_1';
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (42);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (43);
-	PREPARE TRANSACTION 'xact_009_1';");
-$node_master->teardown_node;
-$node_master->start;
-
-$psql_rc = $node_master->psql('postgres', "COMMIT PREPARED 'xact_009_1'");
-is($psql_rc, '0', 'Replay several transactions with same GID');
-
-###############################################################################
-# Check that WAL replay cleans up its shared memory state and releases locks
-# while replaying transaction commits.
-###############################################################################
-
-$node_master->psql('postgres', "
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (42);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (43);
-	PREPARE TRANSACTION 'xact_009_1';
-	COMMIT PREPARED 'xact_009_1';");
-$node_master->teardown_node;
-$node_master->start;
-$psql_rc = $node_master->psql('postgres', "
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (42);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (43);
-	-- This prepare can fail due to conflicting GID or locks conflicts if
-	-- replay did not fully cleanup its state on previous commit.
-	PREPARE TRANSACTION 'xact_009_1';");
-is($psql_rc, '0', "Cleanup of shared memory state for 2PC commit");
-
-$node_master->psql('postgres', "COMMIT PREPARED 'xact_009_1'");
-
-###############################################################################
-# Check that WAL replay will cleanup its shared memory state on running slave.
-###############################################################################
-
-$node_master->psql('postgres', "
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (42);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (43);
-	PREPARE TRANSACTION 'xact_009_1';
-	COMMIT PREPARED 'xact_009_1';");
-$node_slave->psql('postgres', "SELECT count(*) FROM pg_prepared_xacts",
-	  stdout => \$psql_out);
-is($psql_out, '0',
-   "Cleanup of shared memory state on running standby without checkpoint");
-
-###############################################################################
-# Same as in previous case, but let's force checkpoint on slave between
-# prepare and commit to use on-disk twophase files.
-###############################################################################
-
-$node_master->psql('postgres', "
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (42);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (43);
-	PREPARE TRANSACTION 'xact_009_1';");
-$node_slave->psql('postgres', "CHECKPOINT");
-$node_master->psql('postgres', "COMMIT PREPARED 'xact_009_1'");
-$node_slave->psql('postgres', "SELECT count(*) FROM pg_prepared_xacts",
-	  stdout => \$psql_out);
-is($psql_out, '0',
-   "Cleanup of shared memory state on running standby after checkpoint");
-
-###############################################################################
-# Check that prepared transactions can be committed on promoted slave.
-###############################################################################
-
-$node_master->psql('postgres', "
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (42);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (43);
-	PREPARE TRANSACTION 'xact_009_1';");
-$node_master->teardown_node;
-$node_slave->promote;
-$node_slave->poll_query_until('postgres',
-	"SELECT NOT pg_is_in_recovery()")
-  or die "Timed out while waiting for promotion of standby";
-
-$psql_rc = $node_slave->psql('postgres', "COMMIT PREPARED 'xact_009_1'");
-is($psql_rc, '0', "Restore of prepared transaction on promoted slave");
-
-# change roles
-($node_master, $node_slave) = ($node_slave, $node_master);
-$node_slave->enable_streaming($node_master);
-$node_slave->append_conf('recovery.conf', qq(
-recovery_target_timeline='latest'
-));
-$node_slave->start;
-
-###############################################################################
-# Check that prepared transactions are replayed after soft restart of standby
-# while master is down. Since standby knows that master is down it uses a
-# different code path on startup to ensure that the status of transactions is
-# consistent.
-###############################################################################
-
-$node_master->psql('postgres', "
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (42);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (43);
-	PREPARE TRANSACTION 'xact_009_1';");
-$node_master->stop;
-$node_slave->restart;
-$node_slave->promote;
-$node_slave->poll_query_until('postgres',
-	"SELECT NOT pg_is_in_recovery()")
-  or die "Timed out while waiting for promotion of standby";
-
-$node_slave->psql('postgres', "SELECT count(*) FROM pg_prepared_xacts",
-	  stdout => \$psql_out);
-is($psql_out, '1',
-   "Restore prepared transactions from files with master down");
-
-# restore state
-($node_master, $node_slave) = ($node_slave, $node_master);
-$node_slave->enable_streaming($node_master);
-$node_slave->append_conf('recovery.conf', qq(
-recovery_target_timeline='latest'
-));
-$node_slave->start;
-$node_master->psql('postgres', "COMMIT PREPARED 'xact_009_1'");
-
-###############################################################################
-# Check that prepared transactions are correctly replayed after slave hard
-# restart while master is down.
-###############################################################################
-
-$node_master->psql('postgres', "
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (242);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (243);
-	PREPARE TRANSACTION 'xact_009_1';
-	");
-$node_master->stop;
-$node_slave->teardown_node;
-$node_slave->start;
-$node_slave->promote;
-$node_slave->poll_query_until('postgres',
-	"SELECT NOT pg_is_in_recovery()")
-  or die "Timed out while waiting for promotion of standby";
-
-$node_slave->psql('postgres', "SELECT count(*) FROM pg_prepared_xacts",
-	  stdout => \$psql_out);
-is($psql_out, '1',
-   "Restore prepared transactions from records with master down");
-
-# restore state
-($node_master, $node_slave) = ($node_slave, $node_master);
-$node_slave->enable_streaming($node_master);
-$node_slave->append_conf('recovery.conf', qq(
-recovery_target_timeline='latest'
-));
-$node_slave->start;
-$node_master->psql('postgres', "COMMIT PREPARED 'xact_009_1'");
-
-
-###############################################################################
-# Check for a lock conflict between a prepared transaction containing DDL and
-# replay of the XLOG_STANDBY_LOCK WAL record.
-###############################################################################
-
-$node_master->psql('postgres', "
-	BEGIN;
-	CREATE TABLE t_009_tbl2 (id int);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl2 VALUES (42);
-	PREPARE TRANSACTION 'xact_009_1';
-	-- checkpoint will issue XLOG_STANDBY_LOCK that can conflict with lock
-	-- held by 'create table' statement
-	CHECKPOINT;
-	COMMIT PREPARED 'xact_009_1';");
-
-$node_slave->psql('postgres', "SELECT count(*) FROM pg_prepared_xacts",
-	  stdout => \$psql_out);
-is($psql_out, '0', "Replay prepared transaction with DDL");
-
-
-###############################################################################
-# Check that replay will correctly set SUBTRANS and properly advance nextXid
-# so that it won't conflict with savepoint xids.
-###############################################################################
-
-$node_master->psql('postgres', "
-	BEGIN;
-	DELETE FROM t_009_tbl;
-	INSERT INTO t_009_tbl VALUES (43);
-	SAVEPOINT s1;
-	INSERT INTO t_009_tbl VALUES (43);
-	SAVEPOINT s2;
-	INSERT INTO t_009_tbl VALUES (43);
-	SAVEPOINT s3;
-	INSERT INTO t_009_tbl VALUES (43);
-	SAVEPOINT s4;
-	INSERT INTO t_009_tbl VALUES (43);
-	SAVEPOINT s5;
-	INSERT INTO t_009_tbl VALUES (43);
-	PREPARE TRANSACTION 'xact_009_1';
-	CHECKPOINT;");
-
-$node_master->stop;
-$node_master->start;
-$node_master->psql('postgres', "
-	-- here we can get xid of previous savepoint if nextXid
-	-- wasn't properly advanced
-	BEGIN;
-	INSERT INTO t_009_tbl VALUES (142);
-	ROLLBACK;
-	COMMIT PREPARED 'xact_009_1';");
-
-$node_master->psql('postgres', "SELECT count(*) FROM t_009_tbl",
-	  stdout => \$psql_out);
-is($psql_out, '6', "Check nextXid handling for prepared subtransactions");
diff --git a/src/test/recovery/t/010_logical_decoding_timelines.pl b/src/test/recovery/t/010_logical_decoding_timelines.pl
deleted file mode 100644
index cdddb4d..0000000
--- a/src/test/recovery/t/010_logical_decoding_timelines.pl
+++ /dev/null
@@ -1,184 +0,0 @@
-# Demonstrate that logical decoding can follow timeline switches.
-#
-# Logical replication slots can follow timeline switches but it's
-# normally not possible to have a logical slot on a replica where
-# promotion and a timeline switch can occur. The only ways
-# we can create that circumstance are:
-#
-# * By doing a filesystem-level copy of the DB, since pg_basebackup
-#   excludes pg_replslot but we can copy it directly; or
-#
-# * by creating a slot directly at the C level on the replica and
-#   advancing it as we go using the low level APIs. It can't be done
-#   from SQL since logical decoding isn't allowed on replicas.
-#
-# This module uses the first approach to show that timeline following
-# on a logical slot works.
-#
-# (For convenience, it also tests some recovery-related operations
-# on logical slots).
-#
-use strict;
-use warnings;
-
-use PostgresNode;
-use TestLib;
-use Test::More tests => 13;
-use RecursiveCopy;
-use File::Copy;
-use IPC::Run ();
-use Scalar::Util qw(blessed);
-
-my ($stdout, $stderr, $ret);
-
-# Initialize master node
-my $node_master = get_new_node('master');
-$node_master->init(allows_streaming => 1, has_archiving => 1);
-$node_master->append_conf('postgresql.conf', q[
-wal_level = 'logical'
-max_replication_slots = 3
-max_wal_senders = 2
-log_min_messages = 'debug2'
-hot_standby_feedback = on
-wal_receiver_status_interval = 1
-]);
-$node_master->dump_info;
-$node_master->start;
-
-note "testing logical timeline following with a filesystem-level copy";
-
-$node_master->safe_psql('postgres',
-"SELECT pg_create_logical_replication_slot('before_basebackup', 'test_decoding');"
-);
-$node_master->safe_psql('postgres', "CREATE TABLE decoding(blah text);");
-$node_master->safe_psql('postgres',
-	"INSERT INTO decoding(blah) VALUES ('beforebb');");
-
-# We also want to verify that DROP DATABASE on a standby with a logical
-# slot works. This isn't strictly related to timeline following, but
-# the only way to get a logical slot on a standby right now is to use
-# the same physical copy trick, so:
-$node_master->safe_psql('postgres', 'CREATE DATABASE dropme;');
-$node_master->safe_psql('dropme',
-"SELECT pg_create_logical_replication_slot('dropme_slot', 'test_decoding');"
-);
-
-$node_master->safe_psql('postgres', 'CHECKPOINT;');
-
-my $backup_name = 'b1';
-$node_master->backup_fs_hot($backup_name);
-
-$node_master->safe_psql('postgres',
-	q[SELECT pg_create_physical_replication_slot('phys_slot');]);
-
-my $node_replica = get_new_node('replica');
-$node_replica->init_from_backup(
-	$node_master, $backup_name,
-	has_streaming => 1,
-	has_restoring => 1);
-$node_replica->append_conf(
-	'recovery.conf', q[primary_slot_name = 'phys_slot']);
-
-$node_replica->start;
-
-# If we drop 'dropme' on the master, the standby should drop the
-# db and associated slot.
-is($node_master->psql('postgres', 'DROP DATABASE dropme'), 0,
-	'dropped DB with logical slot OK on master');
-$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('insert'));
-is($node_replica->safe_psql('postgres', q[SELECT 1 FROM pg_database WHERE datname = 'dropme']), '',
-	'dropped DB dropme on standby');
-is($node_master->slot('dropme_slot')->{'slot_name'}, undef,
-	'logical slot was actually dropped on standby');
-
-# Back to testing failover...
-$node_master->safe_psql('postgres',
-"SELECT pg_create_logical_replication_slot('after_basebackup', 'test_decoding');"
-);
-$node_master->safe_psql('postgres',
-	"INSERT INTO decoding(blah) VALUES ('afterbb');");
-$node_master->safe_psql('postgres', 'CHECKPOINT;');
-
-# Verify that only the before_basebackup slot is on the replica
-$stdout = $node_replica->safe_psql('postgres',
-	'SELECT slot_name FROM pg_replication_slots ORDER BY slot_name');
-is($stdout, 'before_basebackup',
-	'Expected to find only slot before_basebackup on replica');
-
-# Examine the physical slot the replica uses to stream changes
-# from the master to make sure its hot_standby_feedback
-# has locked in a catalog_xmin on the physical slot, and that
-# any xmin is >= the catalog_xmin
-$node_master->poll_query_until('postgres', q[
-	SELECT catalog_xmin IS NOT NULL
-	FROM pg_replication_slots
-	WHERE slot_name = 'phys_slot'
-	]);
-my $phys_slot = $node_master->slot('phys_slot');
-isnt($phys_slot->{'xmin'}, '',
-	'xmin assigned on physical slot of master');
-isnt($phys_slot->{'catalog_xmin'}, '',
-	'catalog_xmin assigned on physical slot of master');
-# Ignore wrap-around here, we're on a new cluster:
-cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
-	   'xmin on physical slot must not be lower than catalog_xmin');
-
-$node_master->safe_psql('postgres', 'CHECKPOINT');
-
-# Boom, crash
-$node_master->stop('immediate');
-
-$node_replica->promote;
-print "waiting for replica to come up\n";
-$node_replica->poll_query_until('postgres',
-	"SELECT NOT pg_is_in_recovery();");
-
-$node_replica->safe_psql('postgres',
-	"INSERT INTO decoding(blah) VALUES ('after failover');");
-
-# Shouldn't be able to read from slot created after base backup
-($ret, $stdout, $stderr) = $node_replica->psql('postgres',
-"SELECT data FROM pg_logical_slot_peek_changes('after_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');"
-);
-is($ret, 3, 'replaying from after_basebackup slot fails');
-like(
-	$stderr,
-	qr/replication slot "after_basebackup" does not exist/,
-	'after_basebackup slot missing');
-
-# Should be able to read from slot created before base backup
-($ret, $stdout, $stderr) = $node_replica->psql(
-	'postgres',
-"SELECT data FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');",
-	timeout => 30);
-is($ret, 0, 'replay from slot before_basebackup succeeds');
-
-my $final_expected_output_bb = q(BEGIN
-table public.decoding: INSERT: blah[text]:'beforebb'
-COMMIT
-BEGIN
-table public.decoding: INSERT: blah[text]:'afterbb'
-COMMIT
-BEGIN
-table public.decoding: INSERT: blah[text]:'after failover'
-COMMIT);
-is($stdout, $final_expected_output_bb, 'decoded expected data from slot before_basebackup');
-is($stderr, '', 'replay from slot before_basebackup produces no stderr');
-
-# So far we've peeked the slots, so when we fetch the same info over
-# pg_recvlogical we should get complete results. First, find out the commit lsn
-# of the last transaction. There's no max(pg_lsn), so:
-
-my $endpos = $node_replica->safe_psql('postgres', "SELECT location FROM pg_logical_slot_peek_changes('before_basebackup', NULL, NULL) ORDER BY location DESC LIMIT 1;");
-
-# now use the walsender protocol to peek the slot changes and make sure we see
-# the same results.
-
-$stdout = $node_replica->pg_recvlogical_upto('postgres', 'before_basebackup',
-	$endpos, 30, 'include-xids' => '0', 'skip-empty-xacts' => '1');
-
-# walsender likes to add a newline
-chomp($stdout);
-is($stdout, $final_expected_output_bb, 'got same output from walsender via pg_recvlogical on before_basebackup');
-
-$node_replica->teardown_node();
diff --git a/src/test/recovery/t/011_crash_recovery.pl b/src/test/recovery/t/011_crash_recovery.pl
deleted file mode 100644
index 3c3718e..0000000
--- a/src/test/recovery/t/011_crash_recovery.pl
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Tests relating to PostgreSQL crash recovery and redo
-#
-use strict;
-use warnings;
-use PostgresNode;
-use TestLib;
-use Test::More tests => 3;
-
-my $node = get_new_node('master');
-$node->init(allows_streaming => 1);
-$node->start;
-
-my ($stdin, $stdout, $stderr) = ('', '', '');
-
-# Ensure that txid_status reports 'aborted' for xacts
-# that were in-progress during crash. To do that, we need
-# an xact to be in-progress when we crash and we need to know
-# its xid.
-my $tx = IPC::Run::start(
-	['psql', '-X', '-qAt', '-v', 'ON_ERROR_STOP=1', '-f', '-', '-d', $node->connstr('postgres')],
-	'<', \$stdin, '>', \$stdout, '2>', \$stderr);
-$stdin .= q[
-BEGIN;
-CREATE TABLE mine(x integer);
-SELECT txid_current();
-];
-$tx->pump until $stdout =~ /[[:digit:]]+[\r\n]$/;
-
-# Status should be in-progress
-my $xid = $stdout;
-chomp($xid);
-
-is($node->safe_psql('postgres', qq[SELECT txid_status('$xid');]), 'in progress', 'own xid is in progress');
-
-# Crash and restart the postmaster
-$node->stop('immediate');
-$node->start;
-
-# Make sure we really got a new xid
-cmp_ok($node->safe_psql('postgres', 'SELECT txid_current()'), '>', $xid,
-	'new xid after restart is greater');
-# and make sure we show the in-progress xact as aborted
-is($node->safe_psql('postgres', qq[SELECT txid_status('$xid');]), 'aborted', 'xid is aborted after crash');
-
-$tx->kill_kill;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
