commit 48edc162d325e9240b7fdc8db472999e67c4a431
Author: Alexander Korotkov <akorotkov@postgresql.org>
Date:   Sun Sep 1 10:36:59 2019 +0300

    Write visibility map during CLUSTER/VACUUM FULL
    
    Previously, the visibility map was not rebuilt when a table was
    rewritten by CLUSTER or VACUUM FULL, so every page of the new heap
    started out with its all-visible and all-frozen bits clear.  Teach
    rewriteheap.c to track the visibility status of each page it fills,
    write the corresponding visibility map pages alongside the heap
    pages, and extend heap_sync() so that callers can request an fsync
    of the visibility map fork as well.

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e9544822bf9..4446c6de9ba 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8935,7 +8935,7 @@ heap2_redo(XLogReaderState *record)
  * to be done here.)
  */
 void
-heap_sync(Relation rel)
+heap_sync(Relation rel, bool sync_vm)
 {
 	/* non-WAL-logged tables never need fsync */
 	if (!RelationNeedsWAL(rel))
@@ -8945,6 +8945,8 @@ heap_sync(Relation rel)
 	FlushRelationBuffers(rel);
 	/* FlushRelationBuffers will have opened rd_smgr */
 	smgrimmedsync(rel->rd_smgr, MAIN_FORKNUM);
+	if (sync_vm)
+		smgrimmedsync(rel->rd_smgr, VISIBILITYMAP_FORKNUM);
 
 	/* FSM is not critical, don't bother syncing it */
 
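
A note for readers (not part of the patch): heap_sync() now optionally
flushes the visibility map fork alongside the main fork.  The following
standalone POSIX C sketch illustrates that durability contract only; the
file names are hypothetical stand-ins for the relation's main and _vm
fork files, and sync_fork() is a rough analogue of smgrimmedsync().

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* rough analogue of smgrimmedsync(): force one fork's file to disk */
static void
sync_fork(const char *path)
{
	int			fd = open(path, O_WRONLY | O_CREAT, 0600);

	if (fd < 0 || fsync(fd) != 0)
	{
		perror(path);
		exit(1);
	}
	close(fd);
}

int
main(void)
{
	sync_fork("16384");			/* main fork: always synced */
	sync_fork("16384_vm");		/* VM fork: synced only when sync_vm */
	return 0;
}

heapam_finish_bulk_insert() passes sync_vm = false because a plain bulk
load never writes VM pages, while end_heap_rewrite() passes true.
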
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 2dd8821facd..0429919ec86 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -566,7 +566,7 @@ heapam_finish_bulk_insert(Relation relation, int options)
 	 * indexes since those use WAL anyway / don't go through tableam)
 	 */
 	if (options & HEAP_INSERT_SKIP_WAL)
-		heap_sync(relation);
+		heap_sync(relation, false);
 }
 
 
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 0172a139576..45a85307e90 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -112,6 +112,7 @@
 #include "access/heaptoast.h"
 #include "access/rewriteheap.h"
 #include "access/transam.h"
+#include "access/visibilitymap.h"
 #include "access/xact.h"
 #include "access/xloginsert.h"
 
@@ -161,6 +162,11 @@ typedef struct RewriteStateData
 	HTAB	   *rs_old_new_tid_map; /* unmatched B tuples */
 	HTAB	   *rs_logical_mappings;	/* logical remapping files */
 	uint32		rs_num_rewrite_mappings;	/* # in memory mappings */
+	Page		rs_vm_buffer;	/* visibility map page */
+	BlockNumber rs_vm_blockno;	/* block number of the page in rs_vm_buffer */
+	bool		rs_vm_buffer_valid;	/* T if rs_vm_buffer is valid */
+	bool		rs_all_visible;	/* all-visible flag for rs_buffer */
+	bool		rs_all_frozen;	/* all-frozen flag for rs_buffer */
 }			RewriteStateData;
 
 /*
@@ -222,6 +228,9 @@ typedef struct RewriteMappingDataEntry
 
 
 /* prototypes for internal functions */
+static void rewrite_flush_vm_page(RewriteState state);
+static void rewrite_set_vm_flags(RewriteState state);
+static void rewrite_update_vm_flags(RewriteState state, HeapTuple tuple);
 static void raw_heap_insert(RewriteState state, HeapTuple tup);
 
 /* internal logical remapping prototypes */
@@ -276,6 +285,11 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 	state->rs_freeze_xid = freeze_xid;
 	state->rs_cutoff_multi = cutoff_multi;
 	state->rs_cxt = rw_cxt;
+	state->rs_vm_buffer = (Page) palloc(BLCKSZ);
+	state->rs_vm_blockno = HEAPBLK_TO_MAPBLOCK(state->rs_blockno);
+	state->rs_vm_buffer_valid = false;
+	state->rs_all_visible = true;
+	state->rs_all_frozen = true;
 
 	/* Initialize hash tables used to track update chains */
 	memset(&hash_ctl, 0, sizeof(hash_ctl));
@@ -297,6 +311,10 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 					&hash_ctl,
 					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
+	RelationOpenSmgr(state->rs_new_rel);
+	if (!smgrexists(state->rs_new_rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+		smgrcreate(state->rs_new_rel->rd_smgr, VISIBILITYMAP_FORKNUM, false);
+
 	MemoryContextSwitchTo(old_cxt);
 
 	logical_begin_heap_rewrite(state);
@@ -330,6 +348,9 @@ end_heap_rewrite(RewriteState state)
 	/* Write the last page, if any */
 	if (state->rs_buffer_valid)
 	{
+		if (state->rs_all_visible)
+			PageSetAllVisible(state->rs_buffer);
+
 		if (state->rs_use_wal)
 			log_newpage(&state->rs_new_rel->rd_node,
 						MAIN_FORKNUM,
@@ -342,8 +363,14 @@ end_heap_rewrite(RewriteState state)
 
 		smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM, state->rs_blockno,
 				   (char *) state->rs_buffer, true);
+
+		rewrite_set_vm_flags(state);
 	}
 
+	/* Write out the last VM page, if any */
+	if (state->rs_vm_buffer_valid)
+		rewrite_flush_vm_page(state);
+
 	/*
 	 * If the rel is WAL-logged, must fsync before commit.  We use heap_sync
 	 * to ensure that the toast table gets fsync'd too.
@@ -356,7 +383,7 @@ end_heap_rewrite(RewriteState state)
 	 * wrote before the checkpoint.
 	 */
 	if (RelationNeedsWAL(state->rs_new_rel))
-		heap_sync(state->rs_new_rel);
+		heap_sync(state->rs_new_rel, true);
 
 	logical_end_heap_rewrite(state);
 
@@ -364,6 +391,98 @@ end_heap_rewrite(RewriteState state)
 	MemoryContextDelete(state->rs_cxt);
 }
 
+/* Write out the pending VM page and mark rs_vm_buffer invalid */
+static void
+rewrite_flush_vm_page(RewriteState state)
+{
+	Assert(state->rs_vm_buffer_valid);
+
+	if (state->rs_use_wal)
+		log_newpage(&state->rs_new_rel->rd_node,
+					VISIBILITYMAP_FORKNUM,
+					state->rs_vm_blockno,
+					state->rs_vm_buffer,
+					false);	/* bitmap lies past pd_lower: not a std page */
+	RelationOpenSmgr(state->rs_new_rel);
+
+	PageSetChecksumInplace(state->rs_vm_buffer, state->rs_vm_blockno);
+
+	smgrextend(state->rs_new_rel->rd_smgr, VISIBILITYMAP_FORKNUM,
+			   state->rs_vm_blockno, (char *) state->rs_vm_buffer, true);
+
+	state->rs_vm_buffer_valid = false;
+}
+
+/* Set the VM bits for the heap page just written (state->rs_blockno) */
+static void
+rewrite_set_vm_flags(RewriteState state)
+{
+	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(state->rs_blockno);
+	uint32		mapByte = HEAPBLK_TO_MAPBYTE(state->rs_blockno);
+	uint8		mapOffset = HEAPBLK_TO_OFFSET(state->rs_blockno);
+	char	   *map;
+	uint8		flags;
+
+	if (mapBlock != state->rs_vm_blockno && state->rs_vm_buffer_valid)
+		rewrite_flush_vm_page(state);
+
+	if (!state->rs_vm_buffer_valid)
+	{
+		PageInit(state->rs_vm_buffer, BLCKSZ, 0);
+		state->rs_vm_blockno = mapBlock;
+		state->rs_vm_buffer_valid = true;
+	}
+
+	flags = (state->rs_all_visible ? VISIBILITYMAP_ALL_VISIBLE : 0) |
+			(state->rs_all_frozen ? VISIBILITYMAP_ALL_FROZEN : 0);
+
+	map = PageGetContents(state->rs_vm_buffer);
+	map[mapByte] |= (flags << mapOffset);
+}
+
+/*
+ * Update the rs_all_visible and rs_all_frozen flags according to the given
+ * tuple.  We use a simplified check, relying on HeapTupleSatisfiesVacuum()
+ * having already set the tuple's hint bits.
+ */
+static void
+rewrite_update_vm_flags(RewriteState state, HeapTuple tuple)
+{
+	TransactionId	xmin;
+
+	if (!state->rs_all_visible)
+		return;
+
+	if (!HeapTupleHeaderXminCommitted(tuple->t_data))
+	{
+		state->rs_all_visible = false;
+		state->rs_all_frozen = false;
+		return;
+	}
+
+	xmin = HeapTupleHeaderGetXmin(tuple->t_data);
+	if (!TransactionIdPrecedes(xmin, state->rs_oldest_xmin))
+	{
+		state->rs_all_visible = false;
+		state->rs_all_frozen = false;
+		return;
+	}
+
+	if (!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID) &&
+		!HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask))
+	{
+		state->rs_all_visible = false;
+		state->rs_all_frozen = false;
+		return;
+	}
+
+	if (!state->rs_all_frozen)
+		return;
+
+	if (heap_tuple_needs_eventual_freeze(tuple->t_data))
+		state->rs_all_frozen = false;
+}
+
 /*
  * Add a tuple to the new heap.
  *
@@ -490,6 +609,7 @@ rewrite_heap_tuple(RewriteState state,
 
 		/* Insert the tuple and find out where it's put in new_heap */
 		raw_heap_insert(state, new_tuple);
+		rewrite_update_vm_flags(state, new_tuple);
 		new_tid = new_tuple->t_self;
 
 		logical_rewrite_heap_tuple(state, old_tid, new_tuple);
@@ -694,6 +814,9 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 		{
 			/* Doesn't fit, so write out the existing page */
 
+			if (state->rs_all_visible)
+				PageSetAllVisible(page);
+
 			/* XLOG stuff */
 			if (state->rs_use_wal)
 				log_newpage(&state->rs_new_rel->rd_node,
@@ -715,6 +838,12 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
 			smgrextend(state->rs_new_rel->rd_smgr, MAIN_FORKNUM,
 					   state->rs_blockno, (char *) page, true);
 
+			rewrite_set_vm_flags(state);
+
+			/* Start tracking visibility afresh for the next heap page */
+			state->rs_all_visible = true;
+			state->rs_all_frozen = true;
+
 			state->rs_blockno++;
 			state->rs_buffer_valid = false;
 		}
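
For illustration only: the decision rewrite_update_vm_flags() makes for
each inserted tuple can be restated as a standalone function.  This is a
hedged sketch, not backend code; the four booleans stand in for the
hint-bit, xmin, and xmax tests performed on a real HeapTuple, and every
name here is illustrative.

#include <stdbool.h>
#include <stdio.h>

typedef struct
{
	bool		xmin_committed;			/* HeapTupleHeaderXminCommitted() */
	bool		xmin_before_oldest;		/* TransactionIdPrecedes(xmin, oldest_xmin) */
	bool		xmax_invalid_or_lock;	/* HEAP_XMAX_INVALID or locker-only xmax */
	bool		needs_eventual_freeze;	/* heap_tuple_needs_eventual_freeze() */
} TupleVisibility;

static void
update_vm_flags(const TupleVisibility *tup, bool *all_visible, bool *all_frozen)
{
	if (!*all_visible)
		return;					/* page is already disqualified */

	if (!tup->xmin_committed ||
		!tup->xmin_before_oldest ||
		!tup->xmax_invalid_or_lock)
	{
		/* some snapshot might not see this tuple: clear both flags */
		*all_visible = false;
		*all_frozen = false;
		return;
	}

	if (*all_frozen && tup->needs_eventual_freeze)
		*all_frozen = false;	/* visible to everyone, but not yet frozen */
}

int
main(void)
{
	bool		all_visible = true;
	bool		all_frozen = true;
	TupleVisibility tup = {true, true, true, true};

	update_vm_flags(&tup, &all_visible, &all_frozen);
	/* prints all_visible=1 all_frozen=0 */
	printf("all_visible=%d all_frozen=%d\n", all_visible, all_frozen);
	return 0;
}

Because the check only ever clears the flags, a single non-qualifying
tuple disqualifies the whole page, mirroring the conclusion lazy VACUUM
would reach for that page.
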
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index a08922b0798..657cac31457 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -98,24 +98,6 @@
 
 /*#define TRACE_VISIBILITYMAP */
 
-/*
- * Size of the bitmap on each visibility map page, in bytes. There's no
- * extra headers, so the whole page minus the standard page header is
- * used for the bitmap.
- */
-#define MAPSIZE (BLCKSZ - MAXALIGN(SizeOfPageHeaderData))
-
-/* Number of heap blocks we can represent in one byte */
-#define HEAPBLOCKS_PER_BYTE (BITS_PER_BYTE / BITS_PER_HEAPBLOCK)
-
-/* Number of heap blocks we can represent in one visibility map page. */
-#define HEAPBLOCKS_PER_PAGE (MAPSIZE * HEAPBLOCKS_PER_BYTE)
-
-/* Mapping from heap block number to the right bit in the visibility map */
-#define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
-#define HEAPBLK_TO_MAPBYTE(x) (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE)
-#define HEAPBLK_TO_OFFSET(x) (((x) % HEAPBLOCKS_PER_BYTE) * BITS_PER_HEAPBLOCK)
-
 /* Masks for counting subsets of bits in the visibility map. */
 #define VISIBLE_MASK64	UINT64CONST(0x5555555555555555) /* The lower bit of each
 														 * bit pair */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 858bcb6bc96..bb0c610d676 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -166,7 +166,7 @@ extern void simple_heap_delete(Relation relation, ItemPointer tid);
 extern void simple_heap_update(Relation relation, ItemPointer otid,
 							   HeapTuple tup);
 
-extern void heap_sync(Relation relation);
+extern void heap_sync(Relation relation, bool sync_vm);
 
 extern TransactionId heap_compute_xid_horizon_for_tuples(Relation rel,
 														 ItemPointerData *items,
diff --git a/src/include/access/visibilitymap.h b/src/include/access/visibilitymap.h
index 2d8804351ac..524abb4f2ca 100644
--- a/src/include/access/visibilitymap.h
+++ b/src/include/access/visibilitymap.h
@@ -34,6 +34,24 @@
 #define VM_ALL_FROZEN(r, b, v) \
 	((visibilitymap_get_status((r), (b), (v)) & VISIBILITYMAP_ALL_FROZEN) != 0)
 
+/*
+ * Size of the bitmap on each visibility map page, in bytes.  There are no
+ * extra headers, so the whole page minus the standard page header is
+ * used for the bitmap.
+ */
+#define MAPSIZE (BLCKSZ - MAXALIGN(SizeOfPageHeaderData))
+
+/* Number of heap blocks we can represent in one byte */
+#define HEAPBLOCKS_PER_BYTE (BITS_PER_BYTE / BITS_PER_HEAPBLOCK)
+
+/* Number of heap blocks we can represent in one visibility map page. */
+#define HEAPBLOCKS_PER_PAGE (MAPSIZE * HEAPBLOCKS_PER_BYTE)
+
+/* Mapping from heap block number to the right bit in the visibility map */
+#define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
+#define HEAPBLK_TO_MAPBYTE(x) (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE)
+#define HEAPBLK_TO_OFFSET(x) (((x) % HEAPBLOCKS_PER_BYTE) * BITS_PER_HEAPBLOCK)
+
 extern bool visibilitymap_clear(Relation rel, BlockNumber heapBlk,
 								Buffer vmbuf, uint8 flags);
 extern void visibilitymap_pin(Relation rel, BlockNumber heapBlk,
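
To make the mapping concrete, here is a self-contained sketch of the
arithmetic these macros perform.  The default 8 kB BLCKSZ, the 24-byte
MAXALIGNed page header, and BITS_PER_HEAPBLOCK = 2 are hard-coded as
assumptions so the example compiles outside the backend.

#include <stdio.h>
#include <string.h>

#define BLCKSZ				8192	/* default PostgreSQL block size */
#define PAGE_HEADER_SIZE	24		/* MAXALIGN(SizeOfPageHeaderData) */
#define BITS_PER_HEAPBLOCK	2		/* all-visible and all-frozen bits */
#define BITS_PER_BYTE		8

#define MAPSIZE (BLCKSZ - PAGE_HEADER_SIZE)
#define HEAPBLOCKS_PER_BYTE (BITS_PER_BYTE / BITS_PER_HEAPBLOCK)
#define HEAPBLOCKS_PER_PAGE (MAPSIZE * HEAPBLOCKS_PER_BYTE)

#define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
#define HEAPBLK_TO_MAPBYTE(x) (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE)
#define HEAPBLK_TO_OFFSET(x) (((x) % HEAPBLOCKS_PER_BYTE) * BITS_PER_HEAPBLOCK)

int
main(void)
{
	unsigned char map[MAPSIZE];
	unsigned	heapblk = 32673;	/* lands on the second VM page */
	unsigned char flags = 0x03;		/* ALL_VISIBLE | ALL_FROZEN */

	memset(map, 0, sizeof(map));

	/* the same operation rewrite_set_vm_flags() performs on its page */
	map[HEAPBLK_TO_MAPBYTE(heapblk)] |= flags << HEAPBLK_TO_OFFSET(heapblk);

	printf("heap block %u -> VM block %u, byte %u, bit offset %u\n",
		   heapblk,
		   HEAPBLK_TO_MAPBLOCK(heapblk),
		   HEAPBLK_TO_MAPBYTE(heapblk),
		   HEAPBLK_TO_OFFSET(heapblk));
	/* prints: heap block 32673 -> VM block 1, byte 0, bit offset 2 */
	return 0;
}

With these values one VM page covers 32672 heap blocks, i.e. about
255 MB of heap per 8 kB of visibility map.
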
