Heikki Linnakangas wrote:
Here's an updated version, ...

And here it is, for real...

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
*** src/backend/access/heap/Makefile
--- src/backend/access/heap/Makefile
***************
*** 12,17 **** subdir = src/backend/access/heap
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o
  
  include $(top_srcdir)/src/backend/common.mk
--- 12,17 ----
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o visibilitymap.o
  
  include $(top_srcdir)/src/backend/common.mk
*** src/backend/access/heap/heapam.c
--- src/backend/access/heap/heapam.c
***************
*** 47,52 ****
--- 47,53 ----
  #include "access/transam.h"
  #include "access/tuptoaster.h"
  #include "access/valid.h"
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlogutils.h"
  #include "catalog/catalog.h"
***************
*** 195,200 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
--- 196,202 ----
  	int			ntup;
  	OffsetNumber lineoff;
  	ItemId		lpp;
+ 	bool		all_visible;
  
  	Assert(page < scan->rs_nblocks);
  
***************
*** 233,252 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
  	lines = PageGetMaxOffsetNumber(dp);
  	ntup = 0;
  
  	for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
  		 lineoff <= lines;
  		 lineoff++, lpp++)
  	{
  		if (ItemIdIsNormal(lpp))
  		{
- 			HeapTupleData loctup;
  			bool		valid;
  
! 			loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
! 			loctup.t_len = ItemIdGetLength(lpp);
! 			ItemPointerSet(&(loctup.t_self), page, lineoff);
  
! 			valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
  			if (valid)
  				scan->rs_vistuples[ntup++] = lineoff;
  		}
--- 235,266 ----
  	lines = PageGetMaxOffsetNumber(dp);
  	ntup = 0;
  
+ 	/*
+ 	 * If the all-visible flag indicates that all tuples on the page are
+ 	 * visible to everyone, we can skip the per-tuple visibility tests.
+ 	 */
+ 	all_visible = PageIsAllVisible(dp);
+ 
  	for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
  		 lineoff <= lines;
  		 lineoff++, lpp++)
  	{
  		if (ItemIdIsNormal(lpp))
  		{
  			bool		valid;
  
! 			if (all_visible)
! 				valid = true;
! 			else
! 			{
! 				HeapTupleData loctup;
! 
! 				loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
! 				loctup.t_len = ItemIdGetLength(lpp);
! 				ItemPointerSet(&(loctup.t_self), page, lineoff);
  
! 				valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
! 			}
  			if (valid)
  				scan->rs_vistuples[ntup++] = lineoff;
  		}
***************
*** 1860,1865 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 1874,1880 ----
  	TransactionId xid = GetCurrentTransactionId();
  	HeapTuple	heaptup;
  	Buffer		buffer;
+ 	bool		all_visible_cleared = false;
  
  	if (relation->rd_rel->relhasoids)
  	{
***************
*** 1920,1925 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 1935,1946 ----
  
  	RelationPutHeapTuple(relation, buffer, heaptup);
  
+ 	if (PageIsAllVisible(BufferGetPage(buffer)))
+ 	{
+ 		all_visible_cleared = true;
+ 		PageClearAllVisible(BufferGetPage(buffer));
+ 	}
+ 
  	/*
  	 * XXX Should we set PageSetPrunable on this page ?
  	 *
***************
*** 1943,1948 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 1964,1970 ----
  		Page		page = BufferGetPage(buffer);
  		uint8		info = XLOG_HEAP_INSERT;
  
+ 		xlrec.all_visible_cleared = all_visible_cleared;
  		xlrec.target.node = relation->rd_node;
  		xlrec.target.tid = heaptup->t_self;
  		rdata[0].data = (char *) &xlrec;
***************
*** 1994,1999 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 2016,2026 ----
  
  	UnlockReleaseBuffer(buffer);
  
+ 	/* Clear the bit in the visibility map if necessary */
+ 	if (all_visible_cleared)
+ 		visibilitymap_clear(relation, 
+ 							ItemPointerGetBlockNumber(&(heaptup->t_self)));
+ 
  	/*
  	 * If tuple is cachable, mark it for invalidation from the caches in case
  	 * we abort.  Note it is OK to do this after releasing the buffer, because
***************
*** 2070,2075 **** heap_delete(Relation relation, ItemPointer tid,
--- 2097,2103 ----
  	Buffer		buffer;
  	bool		have_tuple_lock = false;
  	bool		iscombo;
+ 	bool		all_visible_cleared = false;
  
  	Assert(ItemPointerIsValid(tid));
  
***************
*** 2216,2221 **** l1:
--- 2244,2255 ----
  	 */
  	PageSetPrunable(page, xid);
  
+ 	if (PageIsAllVisible(page))
+ 	{
+ 		all_visible_cleared = true;
+ 		PageClearAllVisible(page);
+ 	}
+ 
  	/* store transaction information of xact deleting the tuple */
  	tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
  							   HEAP_XMAX_INVALID |
***************
*** 2237,2242 **** l1:
--- 2271,2277 ----
  		XLogRecPtr	recptr;
  		XLogRecData rdata[2];
  
+ 		xlrec.all_visible_cleared = all_visible_cleared;
  		xlrec.target.node = relation->rd_node;
  		xlrec.target.tid = tp.t_self;
  		rdata[0].data = (char *) &xlrec;
***************
*** 2281,2286 **** l1:
--- 2316,2325 ----
  	 */
  	CacheInvalidateHeapTuple(relation, &tp);
  
+ 	/* Clear the bit in the visibility map if necessary */
+ 	if (all_visible_cleared)
+ 		visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+ 
  	/* Now we can release the buffer */
  	ReleaseBuffer(buffer);
  
***************
*** 2388,2393 **** heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
--- 2427,2434 ----
  	bool		have_tuple_lock = false;
  	bool		iscombo;
  	bool		use_hot_update = false;
+ 	bool		all_visible_cleared = false;
+ 	bool		all_visible_cleared_new = false;
  
  	Assert(ItemPointerIsValid(otid));
  
***************
*** 2763,2768 **** l2:
--- 2804,2815 ----
  		MarkBufferDirty(newbuf);
  	MarkBufferDirty(buffer);
  
+ 	/*
+ 	 * Note: we mustn't clear PD_ALL_VISIBLE flags before writing the WAL
+ 	 * record, because log_heap_update looks at those flags to set the
+ 	 * corresponding flags in the WAL record.
+ 	 */
+ 
  	/* XLOG stuff */
  	if (!relation->rd_istemp)
  	{
***************
*** 2778,2783 **** l2:
--- 2825,2842 ----
  		PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
  	}
  
+ 	/* Clear PD_ALL_VISIBLE flags */
+ 	if (PageIsAllVisible(BufferGetPage(buffer)))
+ 	{
+ 		all_visible_cleared = true;
+ 		PageClearAllVisible(BufferGetPage(buffer));
+ 	}
+ 	if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf)))
+ 	{
+ 		all_visible_cleared_new = true;
+ 		PageClearAllVisible(BufferGetPage(newbuf));
+ 	}
+ 
  	END_CRIT_SECTION();
  
  	if (newbuf != buffer)
***************
*** 2791,2796 **** l2:
--- 2850,2861 ----
  	 */
  	CacheInvalidateHeapTuple(relation, &oldtup);
  
+ 	/* Clear bits in visibility map */
+ 	if (all_visible_cleared)
+ 		visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+ 	if (all_visible_cleared_new)
+ 		visibilitymap_clear(relation, BufferGetBlockNumber(newbuf));
+ 
  	/* Now we can release the buffer(s) */
  	if (newbuf != buffer)
  		ReleaseBuffer(newbuf);
***************
*** 3412,3417 **** l3:
--- 3477,3487 ----
  	LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
  
  	/*
+ 	 * Don't update the visibility map here. Locking a tuple doesn't
+ 	 * change visibility info.
+ 	 */
+ 
+ 	/*
  	 * Now that we have successfully marked the tuple as locked, we can
  	 * release the lmgr tuple lock, if we had it.
  	 */
***************
*** 3916,3922 **** log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
--- 3986,3994 ----
  
  	xlrec.target.node = reln->rd_node;
  	xlrec.target.tid = from;
+ 	xlrec.all_visible_cleared = PageIsAllVisible(BufferGetPage(oldbuf));
  	xlrec.newtid = newtup->t_self;
+ 	xlrec.new_all_visible_cleared = PageIsAllVisible(BufferGetPage(newbuf));
  
  	rdata[0].data = (char *) &xlrec;
  	rdata[0].len = SizeOfHeapUpdate;
***************
*** 4185,4197 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
  	OffsetNumber offnum;
  	ItemId		lp = NULL;
  	HeapTupleHeader htup;
  
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
! 	buffer = XLogReadBuffer(xlrec->target.node,
! 							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
! 							false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
--- 4257,4281 ----
  	OffsetNumber offnum;
  	ItemId		lp = NULL;
  	HeapTupleHeader htup;
+ 	BlockNumber	blkno;
+ 
+ 	blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+ 
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, blkno);
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
  
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
! 	buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
  	if (!BufferIsValid(buffer))
  		return;
  	page = (Page) BufferGetPage(buffer);
***************
*** 4223,4228 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
--- 4307,4315 ----
  	/* Mark the page as a candidate for pruning */
  	PageSetPrunable(page, record->xl_xid);
  
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	/* Make sure there is no forward chain link in t_ctid */
  	htup->t_ctid = xlrec->target.tid;
  	PageSetLSN(page, lsn);
***************
*** 4249,4259 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
  	Size		freespace;
  	BlockNumber	blkno;
  
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
- 	blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
- 
  	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
  	{
  		buffer = XLogReadBuffer(xlrec->target.node, blkno, true);
--- 4336,4357 ----
  	Size		freespace;
  	BlockNumber	blkno;
  
+ 	blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+ 
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, blkno);
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
  	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
  	{
  		buffer = XLogReadBuffer(xlrec->target.node, blkno, true);
***************
*** 4307,4312 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
--- 4405,4414 ----
  
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
+ 
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	MarkBufferDirty(buffer);
  	UnlockReleaseBuffer(buffer);
  
***************
*** 4347,4352 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4449,4466 ----
  	uint32		newlen;
  	Size		freespace;
  
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln,
+ 							ItemPointerGetBlockNumber(&xlrec->target.tid));
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  	{
  		if (samepage)
***************
*** 4411,4416 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4525,4533 ----
  	/* Mark the page as a candidate for pruning */
  	PageSetPrunable(page, record->xl_xid);
  
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	/*
  	 * this test is ugly, but necessary to avoid thinking that insert change
  	 * is already applied
***************
*** 4426,4431 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4543,4559 ----
  
  newt:;
  
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->new_all_visible_cleared)
+ 	{
+ 		Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->newtid));
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_2)
  		return;
  
***************
*** 4504,4509 **** newsame:;
--- 4632,4640 ----
  	if (offnum == InvalidOffsetNumber)
  		elog(PANIC, "heap_update_redo: failed to add tuple");
  
+ 	if (xlrec->new_all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */
  
  	PageSetLSN(page, lsn);
*** /dev/null
--- src/backend/access/heap/visibilitymap.c
***************
*** 0 ****
--- 1,478 ----
+ /*-------------------------------------------------------------------------
+  *
+  * visibilitymap.c
+  *	  bitmap for tracking visibility of heap tuples
+  *
+  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *	  $PostgreSQL$
+  *
+  * INTERFACE ROUTINES
+  *		visibilitymap_clear	- clear a bit in the visibility map
+  *		visibilitymap_pin	- pin a map page for setting a bit
+  *		visibilitymap_set	- set a bit in a previously pinned page
+  *		visibilitymap_test	- test if a bit is set
+  *
+  * NOTES
+  *
+  * The visibility map is a bitmap with one bit per heap page. A set bit means
+  * that all tuples on the page are visible to all transactions, and therefore
+  * the page doesn't need to be vacuumed. The map is conservative in the sense
+  * that we make sure that whenever a bit is set, we know the condition is
+  * true, but if a bit is not set, it might or might not be true.
+  *
+  * There's no explicit WAL logging in the functions in this file. The callers
+  * must make sure that whenever a bit is cleared, the bit is cleared on WAL
+  * replay of the updating operation as well. Setting bits during recovery
+  * isn't necessary for correctness.
+  *
+  * Currently, the visibility map is only used as a hint, to speed up VACUUM.
+  * A corrupted visibility map won't cause data corruption, although it can
+  * make VACUUM skip pages that need vacuuming, until the next anti-wraparound
+  * vacuum. The visibility map is not used for anti-wraparound vacuums, because
+  * an anti-wraparound vacuum needs to freeze tuples and observe the latest xid
+  * present in the table, even on pages that don't have any dead tuples.
+  *
+  * Although the visibility map is just a hint at the moment, the PD_ALL_VISIBLE
+  * flag on heap pages *must* be correct.
+  *
+  * LOCKING
+  *
+  * In heapam.c, whenever a page is modified so that not all tuples on the
+  * page are visible to everyone anymore, the corresponding bit in the
+  * visibility map is cleared. The bit in the visibility map is cleared
+  * after releasing the lock on the heap page, to avoid holding the lock
+  * over possible I/O to read in the visibility map page.
+  *
+  * To set a bit, you need to hold a lock on the heap page. That prevents
+  * the race condition where VACUUM sees that all tuples on the page are
+  * visible to everyone, but another backend modifies the page before VACUUM
+  * sets the bit in the visibility map.
+  *
+  * When a bit is set, the LSN of the visibility map page is updated to make
+  * sure that the visibility map update doesn't get written to disk before the
+  * WAL record of the changes that made it possible to set the bit is flushed.
+  * But when a bit is cleared, we don't have to do that because it's always OK
+  * to clear a bit in the map from a correctness point of view.
+  *
+  * TODO
+  *
+  * It would be nice to use the visibility map to skip visibility checks in
+  * index scans.
+  *
+  * Currently, the visibility map is not 100% correct all the time.
+  * During updates, the bit in the visibility map is cleared after releasing
+  * the lock on the heap page. During the window between releasing the lock
+  * and clearing the bit in the visibility map, the bit is still set, but the
+  * new insertion or deletion is not yet visible to other backends.
+  *
+  * That might actually be OK for the index scans, though. The newly inserted
+  * tuple wouldn't have an index pointer yet, so all tuples reachable from an
+  * index would still be visible to all other backends, and deletions wouldn't
+  * be visible to other backends yet.
+  *
+  * There's another hole in the way the PD_ALL_VISIBLE flag is set. When
+  * vacuum observes that all tuples are visible to all, it sets the flag on
+  * the heap page, and also sets the bit in the visibility map. If we then
+  * crash, and only the visibility map page was flushed to disk, we'll have
+  * a bit set in the visibility map, but the corresponding flag on the heap
+  * page is not set. If the heap page is then updated, the updater won't
+  * know to clear the bit in the visibility map.
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "access/visibilitymap.h"
+ #include "storage/bufmgr.h"
+ #include "storage/bufpage.h"
+ #include "storage/lmgr.h"
+ #include "storage/smgr.h"
+ #include "utils/inval.h"
+ 
+ /*#define TRACE_VISIBILITYMAP */
+ 
+ /*
+  * Size of the bitmap on each visibility map page, in bytes. There are no
+  * extra headers, so the whole page except for the standard page header
+  * is used for the bitmap.
+  */
+ #define MAPSIZE (BLCKSZ - SizeOfPageHeaderData)
+ 
+ /* Number of bits allocated for each heap block. */
+ #define BITS_PER_HEAPBLOCK 1
+ 
+ /* Number of heap blocks we can represent in one byte. */
+ #define HEAPBLOCKS_PER_BYTE 8
+ 
+ /* Number of heap blocks we can represent in one visibility map page. */
+ #define HEAPBLOCKS_PER_PAGE (MAPSIZE * HEAPBLOCKS_PER_BYTE)
+ 
+ /* Mapping from heap block number to the right bit in the visibility map */
+ #define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
+ #define HEAPBLK_TO_MAPBYTE(x) (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE)
+ #define HEAPBLK_TO_MAPBIT(x) ((x) % HEAPBLOCKS_PER_BYTE)
+ 
+ /* prototypes for internal routines */
+ static Buffer vm_readbuf(Relation rel, BlockNumber blkno, bool extend);
+ static void vm_extend(Relation rel, BlockNumber nvmblocks);
+ 
+ 
+ /*
+  *	visibilitymap_clear - clear a bit in visibility map
+  *
+  * Clear a bit in the visibility map, marking that not all tuples are
+  * visible to all transactions anymore.
+  */
+ void
+ visibilitymap_clear(Relation rel, BlockNumber heapBlk)
+ {
+ 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ 	int			mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+ 	int			mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+ 	uint8		mask = 1 << mapBit;
+ 	Buffer		mapBuffer;
+ 	char	   *map;
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(DEBUG1, "vm_clear %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+ 
+ 	mapBuffer = vm_readbuf(rel, mapBlock, false);
+ 	if (!BufferIsValid(mapBuffer))
+ 		return; /* nothing to do */
+ 
+ 	LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+ 	map = PageGetContents(BufferGetPage(mapBuffer));
+ 
+ 	if (map[mapByte] & mask)
+ 	{
+ 		map[mapByte] &= ~mask;
+ 
+ 		MarkBufferDirty(mapBuffer);
+ 	}
+ 
+ 	UnlockReleaseBuffer(mapBuffer);
+ }
+ 
+ /*
+  *	visibilitymap_pin - pin a map page for setting a bit
+  *
+  * Setting a bit in the visibility map is a two-phase operation. First, call
+  * visibilitymap_pin, to pin the visibility map page containing the bit for
+  * the heap page. Because that can require I/O to read the map page, you
+  * shouldn't hold a lock on the heap page while doing that. Then, call
+  * visibilitymap_set to actually set the bit.
+  *
+  * On entry, *buf should be InvalidBuffer or a valid buffer returned by
+  * an earlier call to visibilitymap_pin or visibilitymap_test on the same
+  * relation. On return, *buf is a valid buffer with the map page containing
+  * the bit for heapBlk.
+  *
+  * If the page doesn't exist in the map file yet, it is extended.
+  */
+ void
+ visibilitymap_pin(Relation rel, BlockNumber heapBlk, Buffer *buf)
+ {
+ 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ 
+ 	/* Reuse the old pinned buffer if possible */
+ 	if (BufferIsValid(*buf))
+ 	{
+ 		if (BufferGetBlockNumber(*buf) == mapBlock)
+ 			return;
+ 
+ 		ReleaseBuffer(*buf);
+ 	}
+ 	*buf = vm_readbuf(rel, mapBlock, true);
+ }
+ 
+ /*
+  *	visibilitymap_set - set a bit on a previously pinned page
+  *
+  * recptr is the LSN of the heap page. The LSN of the visibility map page is
+  * advanced to that, to make sure that the visibility map doesn't get flushed
+  * to disk before the update to the heap page that made all tuples visible.
+  *
+  * This is an opportunistic function. It does nothing, unless *buf
+  * contains the bit for heapBlk. Call visibilitymap_pin first to pin
+  * the right map page. This function doesn't do any I/O.
+  */
+ void
+ visibilitymap_set(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
+ 				  Buffer *buf)
+ {
+ 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ 	uint32		mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+ 	uint8		mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+ 	Page		page;
+ 	char	   *map;
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(DEBUG1, "vm_set %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+ 
+ 	/* Check that we have the right page pinned */
+ 	if (!BufferIsValid(*buf) || BufferGetBlockNumber(*buf) != mapBlock)
+ 		return;
+ 
+ 	page = BufferGetPage(*buf);
+ 	map = PageGetContents(page);
+ 	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
+ 
+ 	if (!(map[mapByte] & (1 << mapBit)))
+ 	{
+ 		map[mapByte] |= (1 << mapBit);
+ 
+ 		if (XLByteLT(PageGetLSN(page), recptr))
+ 			PageSetLSN(page, recptr);
+ 		PageSetTLI(page, ThisTimeLineID);
+ 		MarkBufferDirty(*buf);
+ 	}
+ 
+ 	LockBuffer(*buf, BUFFER_LOCK_UNLOCK);
+ }
+ 
+ /*
+  *	visibilitymap_test - test if a bit is set
+  *
+  * Are all tuples on heapBlk visible to all, according to the visibility map?
+  *
+  * On entry, *buf should be InvalidBuffer or a valid buffer returned by an
+  * earlier call to visibilitymap_pin or visibilitymap_test on the same
+  * relation. On return, *buf is a valid buffer with the map page containing
+  * the bit for heapBlk, or InvalidBuffer. The caller is responsible for
+  * releasing *buf after it's done testing and setting bits.
+  */
+ bool
+ visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *buf)
+ {
+ 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ 	uint32		mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+ 	uint8		mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+ 	bool		result;
+ 	char	   *map;
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(DEBUG1, "vm_test %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+ 
+ 	/* Reuse the old pinned buffer if possible */
+ 	if (BufferIsValid(*buf))
+ 	{
+ 		if (BufferGetBlockNumber(*buf) != mapBlock)
+ 		{
+ 			ReleaseBuffer(*buf);
+ 			*buf = InvalidBuffer;
+ 		}
+ 	}
+ 
+ 	if (!BufferIsValid(*buf))
+ 	{
+ 		*buf = vm_readbuf(rel, mapBlock, false);
+ 		if (!BufferIsValid(*buf))
+ 			return false;
+ 	}
+ 
+ 	map = PageGetContents(BufferGetPage(*buf));
+ 
+ 	/*
+ 	 * We don't need to lock the page, as we're only looking at a single bit.
+ 	 */
+ 	result = (map[mapByte] & (1 << mapBit)) ? true : false;
+ 
+ 	return result;
+ }
+ 
+ /*
+  *	visibilitymap_truncate - truncate the visibility map
+  */
+ void
+ visibilitymap_truncate(Relation rel, BlockNumber nheapblocks)
+ {
+ 	BlockNumber newnblocks;
+ 	/* last remaining block, byte, and bit */
+ 	BlockNumber truncBlock = HEAPBLK_TO_MAPBLOCK(nheapblocks);
+ 	uint32		truncByte  = HEAPBLK_TO_MAPBYTE(nheapblocks);
+ 	uint8		truncBit   = HEAPBLK_TO_MAPBIT(nheapblocks);
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(DEBUG1, "vm_truncate %s %d", RelationGetRelationName(rel), nheapblocks);
+ #endif
+ 
+ 	/*
+ 	 * If no visibility map has been created yet for this relation, there's
+ 	 * nothing to truncate.
+ 	 */
+ 	if (!smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+ 		return;
+ 
+ 	/*
+ 	 * Unless the new size is exactly at a visibility map page boundary, the
+ 	 * tail bits in the last remaining map page, representing truncated heap
+ 	 * blocks, need to be cleared. This is not only tidy, but also necessary
+ 	 * because we don't get a chance to clear the bits if the heap is
+ 	 * extended again.
+ 	 */
+ 	if (truncByte != 0 || truncBit != 0)
+ 	{
+ 		Buffer mapBuffer;
+ 		Page page;
+ 		char *map;
+ 
+ 		newnblocks = truncBlock + 1;
+ 
+ 		mapBuffer = vm_readbuf(rel, truncBlock, false);
+ 		if (!BufferIsValid(mapBuffer))
+ 		{
+ 			/* nothing to do, the file was already smaller */
+ 			return;
+ 		}
+ 
+ 		page = BufferGetPage(mapBuffer);
+ 		map = PageGetContents(page);
+ 
+ 		LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+ 
+ 		/* Clear out the unwanted bytes. */
+ 		MemSet(&map[truncByte + 1], 0, MAPSIZE - (truncByte + 1));
+ 
+ 		/*
+ 		 * Mask out the unwanted bits of the last remaining byte.
+ 		 *
+ 		 * ((1 << 0) - 1) = 00000000
+ 		 * ((1 << 1) - 1) = 00000001
+ 		 * ...
+ 		 * ((1 << 6) - 1) = 00111111
+ 		 * ((1 << 7) - 1) = 01111111
+ 		 */
+ 		map[truncByte] &= (1 << truncBit) - 1;
+ 
+ 		MarkBufferDirty(mapBuffer);
+ 		UnlockReleaseBuffer(mapBuffer);
+ 	}
+ 	else
+ 		newnblocks = truncBlock;
+ 
+ 	if (smgrnblocks(rel->rd_smgr, VISIBILITYMAP_FORKNUM) < newnblocks)
+ 	{
+ 		/* nothing to do, the file was already smaller than requested size */
+ 		return;
+ 	}
+ 
+ 	smgrtruncate(rel->rd_smgr, VISIBILITYMAP_FORKNUM, newnblocks,
+ 				 rel->rd_istemp);
+ 
+ 	/*
+ 	 * Need to invalidate the relcache entry, because rd_vm_nblocks
+ 	 * seen by other backends is no longer valid.
+ 	 */
+ 	if (!InRecovery)
+ 		CacheInvalidateRelcache(rel);
+ 
+ 	rel->rd_vm_nblocks = newnblocks;
+ }
+ 
+ /*
+  * Read a visibility map page.
+  *
+  * If the page doesn't exist, InvalidBuffer is returned, unless 'extend' is
+  * true, in which case the visibility map file is extended to include it.
+  */
+ static Buffer
+ vm_readbuf(Relation rel, BlockNumber blkno, bool extend)
+ {
+ 	Buffer buf;
+ 
+ 	RelationOpenSmgr(rel);
+ 
+ 	/*
+ 	 * The current size of the visibility map fork is kept in relcache, to
+ 	 * avoid reading beyond EOF. If we haven't cached the size of the map yet,
+ 	 * do that first.
+ 	 */
+ 	if (rel->rd_vm_nblocks == InvalidBlockNumber)
+ 	{
+ 		if (smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+ 			rel->rd_vm_nblocks = smgrnblocks(rel->rd_smgr,
+ 											 VISIBILITYMAP_FORKNUM);
+ 		else
+ 			rel->rd_vm_nblocks = 0;
+ 	}
+ 
+ 	/* Handle requests beyond EOF */
+ 	if (blkno >= rel->rd_vm_nblocks)
+ 	{
+ 		if (extend)
+ 			vm_extend(rel, blkno + 1);
+ 		else
+ 			return InvalidBuffer;
+ 	}
+ 
+ 	/*
+ 	 * Use ZERO_ON_ERROR mode, and initialize the page if necessary. It's
+ 	 * always safe to clear bits, so it's better to clear corrupt pages than
+ 	 * error out.
+ 	 */
+ 	buf = ReadBufferExtended(rel, VISIBILITYMAP_FORKNUM, blkno,
+ 							 RBM_ZERO_ON_ERROR, NULL);
+ 	if (PageIsNew(BufferGetPage(buf)))
+ 		PageInit(BufferGetPage(buf), BLCKSZ, 0);
+ 	return buf;
+ }
+ 
+ /*
+  * Ensure that the visibility map fork is at least vm_nblocks long, extending
+  * it if necessary with zeroed pages.
+  */
+ static void
+ vm_extend(Relation rel, BlockNumber vm_nblocks)
+ {
+ 	BlockNumber vm_nblocks_now;
+ 	Page pg;
+ 
+ 	pg = (Page) palloc(BLCKSZ);
+ 	PageInit(pg, BLCKSZ, 0);
+ 
+ 	/*
+ 	 * We use the relation extension lock to lock out other backends trying
+ 	 * to extend the visibility map at the same time. It also locks out
+ 	 * extension of the main fork, unnecessarily, but extending the
+ 	 * visibility map happens seldom enough that it doesn't seem worthwhile to
+ 	 * have a separate lock tag type for it.
+ 	 *
+ 	 * Note that another backend might have extended or created the
+ 	 * relation before we get the lock.
+ 	 */
+ 	LockRelationForExtension(rel, ExclusiveLock);
+ 
+ 	/* Create the file first if it doesn't exist */
+ 	if ((rel->rd_vm_nblocks == 0 || rel->rd_vm_nblocks == InvalidBlockNumber)
+ 		&& !smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM))
+ 	{
+ 		smgrcreate(rel->rd_smgr, VISIBILITYMAP_FORKNUM, false);
+ 		vm_nblocks_now = 0;
+ 	}
+ 	else
+ 		vm_nblocks_now = smgrnblocks(rel->rd_smgr, VISIBILITYMAP_FORKNUM);
+ 
+ 	while (vm_nblocks_now < vm_nblocks)
+ 	{
+ 		smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
+ 				   (char *) pg, rel->rd_istemp);
+ 		vm_nblocks_now++;
+ 	}
+ 
+ 	UnlockRelationForExtension(rel, ExclusiveLock);
+ 
+ 	pfree(pg);
+ 
+ 	/* Update the relcache with the up-to-date size */
+ 	if (!InRecovery)
+ 		CacheInvalidateRelcache(rel);
+ 	rel->rd_vm_nblocks = vm_nblocks_now;
+ }
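(Not part of the patch, just to make the addressing arithmetic above concrete:
a standalone sketch of the HEAPBLK_TO_* mapping. The 8192-byte block size and
24-byte page header are assumed values for illustration; with them, one map
page covers 65344 heap blocks, or roughly 510 MB of heap.)

#include <stdio.h>

#define BLCKSZ                  8192
#define SIZE_OF_PAGE_HEADER     24      /* assumed SizeOfPageHeaderData */
#define MAPSIZE                 (BLCKSZ - SIZE_OF_PAGE_HEADER)
#define HEAPBLOCKS_PER_BYTE     8
#define HEAPBLOCKS_PER_PAGE     (MAPSIZE * HEAPBLOCKS_PER_BYTE)

int
main(void)
{
	unsigned	heapBlk = 100000;

	/* Same mapping as the HEAPBLK_TO_MAPBLOCK/MAPBYTE/MAPBIT macros */
	unsigned	mapBlock = heapBlk / HEAPBLOCKS_PER_PAGE;
	unsigned	mapByte = (heapBlk % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE;
	unsigned	mapBit = heapBlk % HEAPBLOCKS_PER_BYTE;

	/* Prints: heap block 100000 -> map block 1, byte 4332, bit 0 */
	printf("heap block %u -> map block %u, byte %u, bit %u\n",
		   heapBlk, mapBlock, mapByte, mapBit);
	return 0;
}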
*** src/backend/access/transam/xlogutils.c
--- src/backend/access/transam/xlogutils.c
***************
*** 377,382 **** CreateFakeRelcacheEntry(RelFileNode rnode)
--- 377,383 ----
  
  	rel->rd_targblock = InvalidBlockNumber;
  	rel->rd_fsm_nblocks = InvalidBlockNumber;
+ 	rel->rd_vm_nblocks = InvalidBlockNumber;
  	rel->rd_smgr = NULL;
  
  	return rel;
*** src/backend/catalog/catalog.c
--- src/backend/catalog/catalog.c
***************
*** 54,60 ****
   */
  const char *forkNames[] = {
  	"main", /* MAIN_FORKNUM */
! 	"fsm"   /* FSM_FORKNUM */
  };
  
  /*
--- 54,61 ----
   */
  const char *forkNames[] = {
  	"main", /* MAIN_FORKNUM */
! 	"fsm",   /* FSM_FORKNUM */
! 	"vm"   /* VISIBILITYMAP_FORKNUM */
  };
  
  /*
*** src/backend/catalog/storage.c
--- src/backend/catalog/storage.c
***************
*** 19,24 ****
--- 19,25 ----
  
  #include "postgres.h"
  
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlogutils.h"
  #include "catalog/catalog.h"
***************
*** 175,180 **** void
--- 176,182 ----
  RelationTruncate(Relation rel, BlockNumber nblocks)
  {
  	bool fsm;
+ 	bool vm;
  
  	/* Open it at the smgr level if not already done */
  	RelationOpenSmgr(rel);
***************
*** 187,192 **** RelationTruncate(Relation rel, BlockNumber nblocks)
--- 189,199 ----
  	if (fsm)
  		FreeSpaceMapTruncateRel(rel, nblocks);
  
+ 	/* Truncate the visibility map too if it exists. */
+ 	vm = smgrexists(rel->rd_smgr, VISIBILITYMAP_FORKNUM);
+ 	if (vm)
+ 		visibilitymap_truncate(rel, nblocks);
+ 
  	/*
  	 * We WAL-log the truncation before actually truncating, which
  	 * means trouble if the truncation fails. If we then crash, the WAL
***************
*** 217,228 **** RelationTruncate(Relation rel, BlockNumber nblocks)
  
  		/*
  		 * Flush, because otherwise the truncation of the main relation
! 		 * might hit the disk before the WAL record of truncating the
! 		 * FSM is flushed. If we crashed during that window, we'd be
! 		 * left with a truncated heap, but the FSM would still contain
! 		 * entries for the non-existent heap pages.
  		 */
! 		if (fsm)
  			XLogFlush(lsn);
  	}
  
--- 224,235 ----
  
  		/*
  		 * Flush, because otherwise the truncation of the main relation
! 		 * might hit the disk before the WAL record and before the truncation
! 		 * of the FSM or visibility map. If we crashed during that window, we'd
! 		 * be left with a truncated heap, but the FSM or visibility map would
! 		 * still contain entries for the non-existent heap pages.
  		 */
! 		if (fsm || vm)
  			XLogFlush(lsn);
  	}
  
*** src/backend/commands/vacuum.c
--- src/backend/commands/vacuum.c
***************
*** 26,31 ****
--- 26,32 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/transam.h"
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlog.h"
  #include "catalog/namespace.h"
***************
*** 2902,2907 **** move_chain_tuple(Relation rel,
--- 2903,2914 ----
  	Size		tuple_len = old_tup->t_len;
  
  	/*
+ 	 * Clear the bits in the visibility map.
+ 	 */
+ 	visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
+ 	visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
+ 
+ 	/*
  	 * make a modifiable copy of the source tuple.
  	 */
  	heap_copytuple_with_tuple(old_tup, &newtup);
***************
*** 3005,3010 **** move_chain_tuple(Relation rel,
--- 3012,3021 ----
  
  	END_CRIT_SECTION();
  
+ 	PageClearAllVisible(BufferGetPage(old_buf));
+ 	if (dst_buf != old_buf)
+ 		PageClearAllVisible(BufferGetPage(dst_buf));
+ 
  	LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
  	if (dst_buf != old_buf)
  		LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
***************
*** 3107,3112 **** move_plain_tuple(Relation rel,
--- 3118,3140 ----
  
  	END_CRIT_SECTION();
  
+ 	/*
+ 	 * Clear the PD_ALL_VISIBLE flags on the pages, and the bits in the
+ 	 * visibility map. Normally we'd release the locks on the heap pages
+ 	 * before updating the visibility map, but it doesn't really matter here
+ 	 * because we're holding an AccessExclusiveLock on the relation anyway.
+ 	 */
+ 	if (PageIsAllVisible(dst_page))
+ 	{
+ 		PageClearAllVisible(dst_page);
+ 		visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
+ 	}
+ 	if (PageIsAllVisible(old_page))
+ 	{
+ 		PageClearAllVisible(old_page);
+ 		visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
+ 	}
+ 
  	dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
  	LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
  	LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
*** src/backend/commands/vacuumlazy.c
--- src/backend/commands/vacuumlazy.c
***************
*** 40,45 ****
--- 40,46 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/transam.h"
+ #include "access/visibilitymap.h"
  #include "catalog/storage.h"
  #include "commands/dbcommands.h"
  #include "commands/vacuum.h"
***************
*** 88,93 **** typedef struct LVRelStats
--- 89,95 ----
  	int			max_dead_tuples;	/* # slots allocated in array */
  	ItemPointer dead_tuples;	/* array of ItemPointerData */
  	int			num_index_scans;
+ 	bool		scanned_all;	/* have we scanned all pages (this far)? */
  } LVRelStats;
  
  
***************
*** 102,108 **** static BufferAccessStrategy vac_strategy;
  
  /* non-export function prototypes */
  static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes);
  static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
  static void lazy_vacuum_index(Relation indrel,
  				  IndexBulkDeleteResult **stats,
--- 104,110 ----
  
  /* non-export function prototypes */
  static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes, bool scan_all);
  static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
  static void lazy_vacuum_index(Relation indrel,
  				  IndexBulkDeleteResult **stats,
***************
*** 141,146 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
--- 143,149 ----
  	BlockNumber possibly_freeable;
  	PGRUsage	ru0;
  	TimestampTz starttime = 0;
+ 	bool		scan_all;
  
  	pg_rusage_init(&ru0);
  
***************
*** 161,173 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
  	vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats));
  
  	vacrelstats->num_index_scans = 0;
  
  	/* Open all indexes of the relation */
  	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
  	vacrelstats->hasindex = (nindexes > 0);
  
  	/* Do the vacuuming */
! 	lazy_scan_heap(onerel, vacrelstats, Irel, nindexes);
  
  	/* Done with indexes */
  	vac_close_indexes(nindexes, Irel, NoLock);
--- 164,183 ----
  	vacrelstats = (LVRelStats *) palloc0(sizeof(LVRelStats));
  
  	vacrelstats->num_index_scans = 0;
+ 	vacrelstats->scanned_all = true;
  
  	/* Open all indexes of the relation */
  	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
  	vacrelstats->hasindex = (nindexes > 0);
  
+ 	/* Should we use the visibility map or scan all pages? */
+ 	if (vacstmt->freeze_min_age != -1)
+ 		scan_all = true;
+ 	else
+ 		scan_all = false;
+  
  	/* Do the vacuuming */
! 	lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, scan_all);
  
  	/* Done with indexes */
  	vac_close_indexes(nindexes, Irel, NoLock);
***************
*** 186,195 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
  	/* Vacuum the Free Space Map */
  	FreeSpaceMapVacuum(onerel);
  
! 	/* Update statistics in pg_class */
  	vac_update_relstats(onerel,
  						vacrelstats->rel_pages, vacrelstats->rel_tuples,
! 						vacrelstats->hasindex, FreezeLimit);
  
  	/* report results to the stats collector, too */
  	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
--- 196,209 ----
  	/* Vacuum the Free Space Map */
  	FreeSpaceMapVacuum(onerel);
  
! 	/*
! 	 * Update statistics in pg_class. We can only advance relfrozenxid if we
! 	 * didn't skip any pages.
! 	 */
  	vac_update_relstats(onerel,
  						vacrelstats->rel_pages, vacrelstats->rel_tuples,
! 						vacrelstats->hasindex,
! 						vacrelstats->scanned_all ? FreezeLimit : InvalidTransactionId);
  
  	/* report results to the stats collector, too */
  	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
***************
*** 230,242 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
   */
  static void
  lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes)
  {
  	BlockNumber nblocks,
  				blkno;
  	HeapTupleData tuple;
  	char	   *relname;
  	BlockNumber empty_pages,
  				vacuumed_pages;
  	double		num_tuples,
  				tups_vacuumed,
--- 244,257 ----
   */
  static void
  lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes, bool scan_all)
  {
  	BlockNumber nblocks,
  				blkno;
  	HeapTupleData tuple;
  	char	   *relname;
  	BlockNumber empty_pages,
+ 				scanned_pages,
  				vacuumed_pages;
  	double		num_tuples,
  				tups_vacuumed,
***************
*** 245,250 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 260,266 ----
  	IndexBulkDeleteResult **indstats;
  	int			i;
  	PGRUsage	ru0;
+ 	Buffer		vmbuffer = InvalidBuffer;
  
  	pg_rusage_init(&ru0);
  
***************
*** 254,260 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
  					get_namespace_name(RelationGetNamespace(onerel)),
  					relname)));
  
! 	empty_pages = vacuumed_pages = 0;
  	num_tuples = tups_vacuumed = nkeep = nunused = 0;
  
  	indstats = (IndexBulkDeleteResult **)
--- 270,276 ----
  					get_namespace_name(RelationGetNamespace(onerel)),
  					relname)));
  
! 	empty_pages = vacuumed_pages = scanned_pages = 0;
  	num_tuples = tups_vacuumed = nkeep = nunused = 0;
  
  	indstats = (IndexBulkDeleteResult **)
***************
*** 278,286 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 294,321 ----
  		OffsetNumber frozen[MaxOffsetNumber];
  		int			nfrozen;
  		Size		freespace;
+ 		bool		all_visible_according_to_vm = false;
+ 		bool		all_visible;
+ 
+ 		/*
+ 		 * Skip pages that don't require vacuuming according to the
+ 		 * visibility map.
+ 		 */
+ 		if (!scan_all)
+ 		{
+ 			all_visible_according_to_vm =
+ 				visibilitymap_test(onerel, blkno, &vmbuffer);
+ 			if (all_visible_according_to_vm)
+ 			{
+ 				vacrelstats->scanned_all = false;
+ 				continue;
+ 			}
+ 		}
  
  		vacuum_delay_point();
  
+ 		scanned_pages++;
+ 
  		/*
  		 * If we are close to overrunning the available space for dead-tuple
  		 * TIDs, pause and do a cycle of vacuuming before we tackle this page.
***************
*** 354,360 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
  		{
  			empty_pages++;
  			freespace = PageGetHeapFreeSpace(page);
! 			UnlockReleaseBuffer(buf);
  			RecordPageWithFreeSpace(onerel, blkno, freespace);
  			continue;
  		}
--- 389,414 ----
  		{
  			empty_pages++;
  			freespace = PageGetHeapFreeSpace(page);
! 
! 			if (!PageIsAllVisible(page))
! 			{
! 				SetBufferCommitInfoNeedsSave(buf);
! 				PageSetAllVisible(page);
! 			}
! 
! 			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
! 
! 			/* Update the visibility map */
! 			if (!all_visible_according_to_vm)
! 			{
! 				visibilitymap_pin(onerel, blkno, &vmbuffer);
! 				LockBuffer(buf, BUFFER_LOCK_SHARE);
! 				if (PageIsAllVisible(page))
! 					visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
! 				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
! 			}
! 
! 			ReleaseBuffer(buf);
  			RecordPageWithFreeSpace(onerel, blkno, freespace);
  			continue;
  		}
***************
*** 371,376 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 425,431 ----
  		 * Now scan the page to collect vacuumable items and check for tuples
  		 * requiring freezing.
  		 */
+ 		all_visible = true;
  		nfrozen = 0;
  		hastup = false;
  		prev_dead_count = vacrelstats->num_dead_tuples;
***************
*** 408,413 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 463,469 ----
  			if (ItemIdIsDead(itemid))
  			{
  				lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+ 				all_visible = false;
  				continue;
  			}
  
***************
*** 442,447 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 498,504 ----
  						nkeep += 1;
  					else
  						tupgone = true; /* we can delete the tuple */
+ 					all_visible = false;
  					break;
  				case HEAPTUPLE_LIVE:
  					/* Tuple is good --- but let's do some validity checks */
***************
*** 449,454 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 506,540 ----
  						!OidIsValid(HeapTupleGetOid(&tuple)))
  						elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
  							 relname, blkno, offnum);
+ 
+ 					/*
+ 					 * Is the tuple definitely visible to all transactions?
+ 					 *
+ 					 * NB: Like with per-tuple hint bits, we can't set the
+ 					 * flag if the inserter committed asynchronously. See
+ 					 * SetHintBits for more info. That's why we check that
+ 					 * the HEAP_XMIN_COMMITTED hint bit is set.
+ 					 */
+ 					if (all_visible)
+ 					{
+ 						TransactionId xmin;
+ 
+ 						if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
+ 						{
+ 							all_visible = false;
+ 							break;
+ 						}
+ 						/*
+ 						 * The inserter definitely committed. But is it
+ 						 * old enough that everyone sees it as committed?
+ 						 */
+ 						xmin = HeapTupleHeaderGetXmin(tuple.t_data);
+ 						if (!TransactionIdPrecedes(xmin, OldestXmin))
+ 						{
+ 							all_visible = false;
+ 							break;
+ 						}
+ 					}
  					break;
  				case HEAPTUPLE_RECENTLY_DEAD:
  
***************
*** 457,468 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 543,557 ----
  					 * from relation.
  					 */
  					nkeep += 1;
+ 					all_visible = false;
  					break;
  				case HEAPTUPLE_INSERT_IN_PROGRESS:
  					/* This is an expected case during concurrent vacuum */
+ 					all_visible = false;
  					break;
  				case HEAPTUPLE_DELETE_IN_PROGRESS:
  					/* This is an expected case during concurrent vacuum */
+ 					all_visible = false;
  					break;
  				default:
  					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
***************
*** 525,536 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
  
  		freespace = PageGetHeapFreeSpace(page);
  
  		/* Remember the location of the last page with nonremovable tuples */
  		if (hastup)
  			vacrelstats->nonempty_pages = blkno + 1;
  
- 		UnlockReleaseBuffer(buf);
- 
  		/*
  		 * If we remembered any tuples for deletion, then the page will be
  		 * visited again by lazy_vacuum_heap, which will compute and record
--- 614,656 ----
  
  		freespace = PageGetHeapFreeSpace(page);
  
+ 		/* Update the all-visible flag on the page */
+ 		if (!PageIsAllVisible(page) && all_visible)
+ 		{
+ 			SetBufferCommitInfoNeedsSave(buf);
+ 			PageSetAllVisible(page);
+ 		}
+ 		else if (PageIsAllVisible(page) && !all_visible)
+ 		{
+ 			elog(WARNING, "PD_ALL_VISIBLE flag was incorrectly set");
+ 			SetBufferCommitInfoNeedsSave(buf);
+ 			PageClearAllVisible(page);
+ 
+ 			/*
+ 			 * XXX: Normally, we would drop the lock on the heap page before
+ 			 * updating the visibility map.
+ 			 */
+ 			visibilitymap_clear(onerel, blkno);
+ 		}
+ 
+ 		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ 
+ 		/* Update the visibility map */
+ 		if (!all_visible_according_to_vm && all_visible)
+ 		{
+ 			visibilitymap_pin(onerel, blkno, &vmbuffer);
+ 			LockBuffer(buf, BUFFER_LOCK_SHARE);
+ 			if (PageIsAllVisible(page))
+ 				visibilitymap_set(onerel, blkno, PageGetLSN(page), &vmbuffer);
+ 			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+ 		}
+ 
+ 		ReleaseBuffer(buf);
+ 
  		/* Remember the location of the last page with nonremovable tuples */
  		if (hastup)
  			vacrelstats->nonempty_pages = blkno + 1;
  
  		/*
  		 * If we remembered any tuples for deletion, then the page will be
  		 * visited again by lazy_vacuum_heap, which will compute and record
***************
*** 560,565 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 680,692 ----
  		vacrelstats->num_index_scans++;
  	}
  
+ 	/* Release the pin on the visibility map page */
+ 	if (BufferIsValid(vmbuffer))
+ 	{
+ 		ReleaseBuffer(vmbuffer);
+ 		vmbuffer = InvalidBuffer;
+ 	}
+ 
  	/* Do post-vacuum cleanup and statistics update for each index */
  	for (i = 0; i < nindexes; i++)
  		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
***************
*** 572,580 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
  						tups_vacuumed, vacuumed_pages)));
  
  	ereport(elevel,
! 			(errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
  					RelationGetRelationName(onerel),
! 					tups_vacuumed, num_tuples, nblocks),
  			 errdetail("%.0f dead row versions cannot be removed yet.\n"
  					   "There were %.0f unused item pointers.\n"
  					   "%u pages are entirely empty.\n"
--- 699,707 ----
  						tups_vacuumed, vacuumed_pages)));
  
  	ereport(elevel,
! 			(errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u out of %u pages",
  					RelationGetRelationName(onerel),
! 					tups_vacuumed, num_tuples, scanned_pages, nblocks),
  			 errdetail("%.0f dead row versions cannot be removed yet.\n"
  					   "There were %.0f unused item pointers.\n"
  					   "%u pages are entirely empty.\n"
***************
*** 623,628 **** lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
--- 750,764 ----
  		LockBufferForCleanup(buf);
  		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);
  
+ 		/*
+ 		 * Before we let the page go, prune it. The primary reason is to
+ 		 * update the visibility map in the common special case that we just
+ 		 * vacuumed away the last tuple on the page that wasn't visible to
+ 		 * everyone.
+ 		 */
+ 		vacrelstats->tuples_deleted +=
+ 			heap_page_prune(onerel, buf, OldestXmin, false, false);
+ 
  		/* Now that we've compacted the page, record its available space */
  		page = BufferGetPage(buf);
  		freespace = PageGetHeapFreeSpace(page);
*** src/backend/utils/cache/relcache.c
--- src/backend/utils/cache/relcache.c
***************
*** 305,310 **** AllocateRelationDesc(Relation relation, Form_pg_class relp)
--- 305,311 ----
  	MemSet(relation, 0, sizeof(RelationData));
  	relation->rd_targblock = InvalidBlockNumber;
  	relation->rd_fsm_nblocks = InvalidBlockNumber;
+ 	relation->rd_vm_nblocks = InvalidBlockNumber;
  
  	/* make sure relation is marked as having no open file yet */
  	relation->rd_smgr = NULL;
***************
*** 1377,1382 **** formrdesc(const char *relationName, Oid relationReltype,
--- 1378,1384 ----
  	relation = (Relation) palloc0(sizeof(RelationData));
  	relation->rd_targblock = InvalidBlockNumber;
  	relation->rd_fsm_nblocks = InvalidBlockNumber;
+ 	relation->rd_vm_nblocks = InvalidBlockNumber;
  
  	/* make sure relation is marked as having no open file yet */
  	relation->rd_smgr = NULL;
***************
*** 1665,1673 **** RelationReloadIndexInfo(Relation relation)
  	heap_freetuple(pg_class_tuple);
  	/* We must recalculate physical address in case it changed */
  	RelationInitPhysicalAddr(relation);
! 	/* Must reset targblock and fsm_nblocks in case rel was truncated */
  	relation->rd_targblock = InvalidBlockNumber;
  	relation->rd_fsm_nblocks = InvalidBlockNumber;
  	/* Must free any AM cached data, too */
  	if (relation->rd_amcache)
  		pfree(relation->rd_amcache);
--- 1667,1679 ----
  	heap_freetuple(pg_class_tuple);
  	/* We must recalculate physical address in case it changed */
  	RelationInitPhysicalAddr(relation);
! 	/*
! 	 * Must reset targblock, fsm_nblocks and vm_nblocks in case rel was
! 	 * truncated
! 	 */
  	relation->rd_targblock = InvalidBlockNumber;
  	relation->rd_fsm_nblocks = InvalidBlockNumber;
+ 	relation->rd_vm_nblocks = InvalidBlockNumber;
  	/* Must free any AM cached data, too */
  	if (relation->rd_amcache)
  		pfree(relation->rd_amcache);
***************
*** 1751,1756 **** RelationClearRelation(Relation relation, bool rebuild)
--- 1757,1763 ----
  	{
  		relation->rd_targblock = InvalidBlockNumber;
  		relation->rd_fsm_nblocks = InvalidBlockNumber;
+ 		relation->rd_vm_nblocks = InvalidBlockNumber;
  		if (relation->rd_rel->relkind == RELKIND_INDEX)
  		{
  			relation->rd_isvalid = false;		/* needs to be revalidated */
***************
*** 2346,2351 **** RelationBuildLocalRelation(const char *relname,
--- 2353,2359 ----
  
  	rel->rd_targblock = InvalidBlockNumber;
  	rel->rd_fsm_nblocks = InvalidBlockNumber;
+ 	rel->rd_vm_nblocks = InvalidBlockNumber;
  
  	/* make sure relation is marked as having no open file yet */
  	rel->rd_smgr = NULL;
***************
*** 3603,3608 **** load_relcache_init_file(void)
--- 3611,3617 ----
  		rel->rd_smgr = NULL;
  		rel->rd_targblock = InvalidBlockNumber;
  		rel->rd_fsm_nblocks = InvalidBlockNumber;
+ 		rel->rd_vm_nblocks = InvalidBlockNumber;
  		if (rel->rd_isnailed)
  			rel->rd_refcnt = 1;
  		else
*** src/include/access/heapam.h
--- src/include/access/heapam.h
***************
*** 153,158 **** extern void heap_page_prune_execute(Buffer buffer,
--- 153,159 ----
  						OffsetNumber *nowunused, int nunused,
  						bool redirect_move);
  extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
+ extern void heap_page_update_all_visible(Buffer buffer);
  
  /* in heap/syncscan.c */
  extern void ss_report_location(Relation rel, BlockNumber location);
*** src/include/access/htup.h
--- src/include/access/htup.h
***************
*** 601,609 **** typedef struct xl_heaptid
  typedef struct xl_heap_delete
  {
  	xl_heaptid	target;			/* deleted tuple id */
  } xl_heap_delete;
  
! #define SizeOfHeapDelete	(offsetof(xl_heap_delete, target) + SizeOfHeapTid)
  
  /*
   * We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
--- 601,610 ----
  typedef struct xl_heap_delete
  {
  	xl_heaptid	target;			/* deleted tuple id */
+ 	bool all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
  } xl_heap_delete;
  
! #define SizeOfHeapDelete	(offsetof(xl_heap_delete, all_visible_cleared) + sizeof(bool))
  
  /*
   * We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
***************
*** 626,646 **** typedef struct xl_heap_header
  typedef struct xl_heap_insert
  {
  	xl_heaptid	target;			/* inserted tuple id */
  	/* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_insert;
  
! #define SizeOfHeapInsert	(offsetof(xl_heap_insert, target) + SizeOfHeapTid)
  
  /* This is what we need to know about update|move|hot_update */
  typedef struct xl_heap_update
  {
  	xl_heaptid	target;			/* deleted tuple id */
  	ItemPointerData newtid;		/* new inserted tuple id */
  	/* NEW TUPLE xl_heap_header (PLUS xmax & xmin IF MOVE OP) */
  	/* and TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_update;
  
! #define SizeOfHeapUpdate	(offsetof(xl_heap_update, newtid) + SizeOfIptrData)
  
  /*
   * This is what we need to know about vacuum page cleanup/redirect
--- 627,650 ----
  typedef struct xl_heap_insert
  {
  	xl_heaptid	target;			/* inserted tuple id */
+ 	bool all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
  	/* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_insert;
  
! #define SizeOfHeapInsert	(offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
  
  /* This is what we need to know about update|move|hot_update */
  typedef struct xl_heap_update
  {
  	xl_heaptid	target;			/* deleted tuple id */
  	ItemPointerData newtid;		/* new inserted tuple id */
+ 	bool all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
+ 	bool new_all_visible_cleared; /* same for the page of newtid */
  	/* NEW TUPLE xl_heap_header (PLUS xmax & xmin IF MOVE OP) */
  	/* and TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_update;
  
! #define SizeOfHeapUpdate	(offsetof(xl_heap_update, new_all_visible_cleared) + sizeof(bool))
  
  /*
   * This is what we need to know about vacuum page cleanup/redirect
*** /dev/null
--- src/include/access/visibilitymap.h
***************
*** 0 ****
--- 1,30 ----
+ /*-------------------------------------------------------------------------
+  *
+  * visibilitymap.h
+  *      visibility map interface
+  *
+  *
+  * Portions Copyright (c) 2007, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef VISIBILITYMAP_H
+ #define VISIBILITYMAP_H
+ 
+ #include "utils/rel.h"
+ #include "storage/buf.h"
+ #include "storage/itemptr.h"
+ #include "access/xlogdefs.h"
+ 
+ extern void visibilitymap_clear(Relation rel, BlockNumber heapBlk);
+ extern void visibilitymap_pin(Relation rel, BlockNumber heapBlk,
+ 							  Buffer *vmbuf);
+ extern void visibilitymap_set(Relation rel, BlockNumber heapBlk,
+ 							  XLogRecPtr recptr, Buffer *vmbuf);
+ extern bool visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *vmbuf);
+ extern void visibilitymap_truncate(Relation rel, BlockNumber nheapblocks);
+ 
+ #endif   /* VISIBILITYMAP_H */
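(A usage sketch, not part of the patch: setting a bit is the two-phase
operation described in visibilitymap.c, so the map page is pinned before the
heap page is locked, because pinning may require I/O, and the bit is set only
while the heap page lock is held and PD_ALL_VISIBLE is still set. This mirrors
the lazy_scan_heap changes in simplified form; the helper name and the
assumption that the caller already holds a pin, but no lock, on the heap
buffer are only for illustration.)

#include "postgres.h"

#include "access/visibilitymap.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"

static void
mark_page_all_visible(Relation rel, BlockNumber blkno, Buffer heapbuf)
{
	Buffer		vmbuffer = InvalidBuffer;
	Page		page = BufferGetPage(heapbuf);

	/* Phase 1: pin the right map page; no heap lock held, so I/O is OK */
	visibilitymap_pin(rel, blkno, &vmbuffer);

	/* Phase 2: lock the heap page and set the bit if the flag still holds */
	LockBuffer(heapbuf, BUFFER_LOCK_SHARE);
	if (PageIsAllVisible(page))
		visibilitymap_set(rel, blkno, PageGetLSN(page), &vmbuffer);
	LockBuffer(heapbuf, BUFFER_LOCK_UNLOCK);

	/* Release the map page pin; the caller keeps its pin on the heap page */
	if (BufferIsValid(vmbuffer))
		ReleaseBuffer(vmbuffer);
}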
*** src/include/storage/bufpage.h
--- src/include/storage/bufpage.h
***************
*** 152,159 **** typedef PageHeaderData *PageHeader;
  #define PD_HAS_FREE_LINES	0x0001		/* are there any unused line pointers? */
  #define PD_PAGE_FULL		0x0002		/* not enough free space for new
  										 * tuple? */
  
! #define PD_VALID_FLAG_BITS	0x0003		/* OR of all valid pd_flags bits */
  
  /*
   * Page layout version number 0 is for pre-7.3 Postgres releases.
--- 152,161 ----
  #define PD_HAS_FREE_LINES	0x0001		/* are there any unused line pointers? */
  #define PD_PAGE_FULL		0x0002		/* not enough free space for new
  										 * tuple? */
+ #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
+ 										 * everyone */
  
! #define PD_VALID_FLAG_BITS	0x0007		/* OR of all valid pd_flags bits */
  
  /*
   * Page layout version number 0 is for pre-7.3 Postgres releases.
***************
*** 336,341 **** typedef PageHeaderData *PageHeader;
--- 338,350 ----
  #define PageClearFull(page) \
  	(((PageHeader) (page))->pd_flags &= ~PD_PAGE_FULL)
  
+ #define PageIsAllVisible(page) \
+ 	(((PageHeader) (page))->pd_flags & PD_ALL_VISIBLE)
+ #define PageSetAllVisible(page) \
+ 	(((PageHeader) (page))->pd_flags |= PD_ALL_VISIBLE)
+ #define PageClearAllVisible(page) \
+ 	(((PageHeader) (page))->pd_flags &= ~PD_ALL_VISIBLE)
+ 
  #define PageIsPrunable(page, oldestxmin) \
  ( \
  	AssertMacro(TransactionIdIsNormal(oldestxmin)), \
*** src/include/storage/relfilenode.h
--- src/include/storage/relfilenode.h
***************
*** 24,37 **** typedef enum ForkNumber
  {
  	InvalidForkNumber = -1,
  	MAIN_FORKNUM = 0,
! 	FSM_FORKNUM
  	/*
  	 * NOTE: if you add a new fork, change MAX_FORKNUM below and update the
  	 * forkNames array in catalog.c
  	 */
  } ForkNumber;
  
! #define MAX_FORKNUM		FSM_FORKNUM
  
  /*
   * RelFileNode must provide all that we need to know to physically access
--- 24,38 ----
  {
  	InvalidForkNumber = -1,
  	MAIN_FORKNUM = 0,
! 	FSM_FORKNUM,
! 	VISIBILITYMAP_FORKNUM
  	/*
  	 * NOTE: if you add a new fork, change MAX_FORKNUM below and update the
  	 * forkNames array in catalog.c
  	 */
  } ForkNumber;
  
! #define MAX_FORKNUM		VISIBILITYMAP_FORKNUM
  
  /*
   * RelFileNode must provide all that we need to know to physically access
*** src/include/utils/rel.h
--- src/include/utils/rel.h
***************
*** 195,202 **** typedef struct RelationData
  	List	   *rd_indpred;		/* index predicate tree, if any */
  	void	   *rd_amcache;		/* available for use by index AM */
  
! 	/* size of the FSM, or InvalidBlockNumber if not known yet */
  	BlockNumber	rd_fsm_nblocks;
  
  	/* use "struct" here to avoid needing to include pgstat.h: */
  	struct PgStat_TableStatus *pgstat_info;		/* statistics collection area */
--- 195,206 ----
  	List	   *rd_indpred;		/* index predicate tree, if any */
  	void	   *rd_amcache;		/* available for use by index AM */
  
! 	/*
! 	 * sizes of the free space and visibility map forks, or InvalidBlockNumber
! 	 * if not known yet
! 	 */
  	BlockNumber	rd_fsm_nblocks;
+ 	BlockNumber	rd_vm_nblocks;
  
  	/* use "struct" here to avoid needing to include pgstat.h: */
  	struct PgStat_TableStatus *pgstat_info;		/* statistics collection area */