This patch is related to the "Reduce pinning in btree indexes"
patch submitted here:

http://www.postgresql.org/message-id/721615179.3351449.1423959585771.javamail.ya...@mail.yahoo.com

That message describes how the two patches evolved and how they
relate, so I won't duplicate it here.

Unlike the other patch, this one is more at the "proof of concept"
phase, because it requires support in the heap and each index AM to
work correctly; so far I have only had time to cover the heap and
btree indexes.  In spite of that, I have thrown the worst test
cases I could think of at it (and only succeeded in uncovering a
bug which was already out there in production), and it has shown
its value in a two-day test simulating a 300-user load with
complex real-world applications (although the only indexes it used
were btree indexes).  Without the patches the database growth was
39GB per day; with the patches it was 28.5GB per day.  (The test
does involve more inserts than deletes, so some growth is
expected.)  At the end of the tests, pgstattuple reported eight
times as many dead tuples in the database without the patches.
More importantly, without the patches the CPU load started at 60%
and showed linear growth to 92% over the course of the first day;
with the patches it stayed at a stable 60% throughout the test.

What this patch does is add a GUC called old_snapshot_threshold.
It defaults to -1, which leaves behavior matching unpatched code.
With a setting of zero or above, tuples may be vacuumed away once
the number of transaction IDs specified by the GUC has been
consumed, even if a sufficiently old snapshot could otherwise
still see them.  The patch also saves the current insertion LSN
into every snapshot when it is created.  When reading from the
heap or from any index, if the snapshot has become vulnerable to
showing incorrect data because the threshold has been crossed
since it was taken, reading any page whose LSN is past the
snapshot's LSN raises a "snapshot too old" error.  Since the check
is LSN-based, the new logic is not used for any relation which is
not WAL-logged.
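
To make the read-side check concrete, here is the test performed
when a page is read, written out as a commented function; in the
patch itself this is the TestForOldSnapshot() macro added to
snapmgr.h (see the diff below).

static inline void
TestForOldSnapshotSketch(Snapshot snapshot, Relation relation, Page page)
{
	if (old_snapshot_threshold >= 0 &&	/* feature enabled? */
		snapshot != NULL &&				/* NULL when positioning for insert/delete */
		snapshot->satisfies == HeapTupleSatisfiesMVCC &&	/* MVCC snapshots only */
		!XLogRecPtrIsInvalid(snapshot->lsn) &&
		PageGetLSN(page) > snapshot->lsn &&	/* page changed after snapshot? */
		NormalTransactionIdFollows(
			TransactionIdLimitedForOldSnapshots(snapshot->xmin, relation),
			snapshot->xmin))			/* could cleanup really have run early? */
		ereport(ERROR,
				(errcode(ERRCODE_SNAPSHOT_TOO_OLD),
				 errmsg("snapshot too old")));
}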

Note that if you don't read data from a page which has been
modified after your snapshot was taken, the threshold doesn't
matter.
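
The cleanup side works by pushing the xmin horizon used by vacuum
and page pruning forward once the threshold has been crossed.  A
commented sketch of that calculation follows; it is essentially the
TransactionIdLimitedForOldSnapshots() function from the snapmgr.c
hunk below, with the assertion omitted.

TransactionId
TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
									Relation relation)
{
	/* Only limit WAL-logged, non-catalog relations, and only if enabled. */
	if (TransactionIdIsNormal(recentXmin)
		&& old_snapshot_threshold >= 0
		&& RelationNeedsWAL(relation)
		&& !IsCatalogRelation(relation)
		&& !RelationIsAccessibleInLogicalDecoding(relation))
	{
		TransactionId xlimit;

		/* Back up from the newest completed xid by the threshold. */
		xlimit = ShmemVariableCache->latestCompletedXid;
		xlimit -= old_snapshot_threshold;
		TransactionIdAdvance(xlimit);	/* step past any special xids */

		/* Use the limited value only if it is a more aggressive horizon. */
		if (NormalTransactionIdFollows(xlimit, recentXmin))
			return xlimit;
	}

	return recentXmin;
}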

All `make installcheck` tests succeed with any setting.  With a
setting of 0 (the most extreme), `make installcheck-world` sees
four isolation tests fail.  Those all pass if you raise the
setting to 2.  The postgres_fdw test needs a setting of 4 to
succeed.  I would expect most shops to tune this to
something in the six-digit to eight-digit range.  In the tests
mentioned above it was set to 150000 (which corresponded to just
under 4 minutes of txid consumption) and there were no "snapshot
too old" errors, even though some cursors were left open for the
entire two-day run.

The patch still lacks (as mentioned above) support for index AMs
other than btree, and lacks documentation for the new GUC.  I'm
sure that there are some comments and README files that need
adjustment, too.  As I said, this is still a POC.

--
Kevin Grittner
EDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 366,371 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
--- 366,372 ----
  	LockBuffer(buffer, BUFFER_LOCK_SHARE);
  
  	dp = (Page) BufferGetPage(buffer);
+ 	TestForOldSnapshot(snapshot, scan->rs_rd, dp);
  	lines = PageGetMaxOffsetNumber(dp);
  	ntup = 0;
  
***************
*** 496,501 **** heapgettup(HeapScanDesc scan,
--- 497,503 ----
  		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
  
  		dp = (Page) BufferGetPage(scan->rs_cbuf);
+ 		TestForOldSnapshot(snapshot, scan->rs_rd, dp);
  		lines = PageGetMaxOffsetNumber(dp);
  		/* page and lineoff now reference the physically next tid */
  
***************
*** 538,543 **** heapgettup(HeapScanDesc scan,
--- 540,546 ----
  		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
  
  		dp = (Page) BufferGetPage(scan->rs_cbuf);
+ 		TestForOldSnapshot(snapshot, scan->rs_rd, dp);
  		lines = PageGetMaxOffsetNumber(dp);
  
  		if (!scan->rs_inited)
***************
*** 696,701 **** heapgettup(HeapScanDesc scan,
--- 699,705 ----
  		LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
  
  		dp = (Page) BufferGetPage(scan->rs_cbuf);
+ 		TestForOldSnapshot(snapshot, scan->rs_rd, dp);
  		lines = PageGetMaxOffsetNumber((Page) dp);
  		linesleft = lines;
  		if (backward)
***************
*** 1573,1578 **** heap_fetch(Relation relation,
--- 1577,1583 ----
  	 */
  	LockBuffer(buffer, BUFFER_LOCK_SHARE);
  	page = BufferGetPage(buffer);
+ 	TestForOldSnapshot(snapshot, relation, page);
  
  	/*
  	 * We'd better check for out-of-range offnum in case of VACUUM since the
***************
*** 1902,1907 **** heap_get_latest_tid(Relation relation,
--- 1907,1913 ----
  		buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&ctid));
  		LockBuffer(buffer, BUFFER_LOCK_SHARE);
  		page = BufferGetPage(buffer);
+ 		TestForOldSnapshot(snapshot, relation, page);
  
  		/*
  		 * Check for bogus item number.  This is not treated as an error
*** a/src/backend/access/heap/pruneheap.c
--- b/src/backend/access/heap/pruneheap.c
***************
*** 92,103 **** heap_page_prune_opt(Relation relation, Buffer buffer)
  	 * need to use the horizon that includes slots, otherwise the data-only
  	 * horizon can be used. Note that the toast relation of user defined
  	 * relations are *not* considered catalog relations.
  	 */
  	if (IsCatalogRelation(relation) ||
  		RelationIsAccessibleInLogicalDecoding(relation))
  		OldestXmin = RecentGlobalXmin;
  	else
! 		OldestXmin = RecentGlobalDataXmin;
  
  	Assert(TransactionIdIsValid(OldestXmin));
  
--- 92,112 ----
  	 * need to use the horizon that includes slots, otherwise the data-only
  	 * horizon can be used. Note that the toast relation of user defined
  	 * relations are *not* considered catalog relations.
+ 	 *
+ 	 * It is OK to apply the old snapshot limit before acquiring the cleanup
+ 	 * lock because the worst that can happen is that we are not quite as
+ 	 * aggressive about the cleanup (by however many transaction IDs are
+ 	 * consumed between this point and acquiring the lock).  This allows us to
+ 	 * save significant overhead in the case where the page is found not to be
+ 	 * prunable.
  	 */
  	if (IsCatalogRelation(relation) ||
  		RelationIsAccessibleInLogicalDecoding(relation))
  		OldestXmin = RecentGlobalXmin;
  	else
! 		OldestXmin =
! 				TransactionIdLimitedForOldSnapshots(RecentGlobalDataXmin,
! 													relation);
  
  	Assert(TransactionIdIsValid(OldestXmin));
  
*** a/src/backend/access/nbtree/nbtinsert.c
--- b/src/backend/access/nbtree/nbtinsert.c
***************
*** 118,124 **** _bt_doinsert(Relation rel, IndexTuple itup,
  
  top:
  	/* find the first page containing this key */
! 	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
  
  	offset = InvalidOffsetNumber;
  
--- 118,124 ----
  
  top:
  	/* find the first page containing this key */
! 	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
  
  	offset = InvalidOffsetNumber;
  
***************
*** 134,140 **** top:
  	 * precise description.
  	 */
  	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
! 						true, stack, BT_WRITE);
  
  	/*
  	 * If we're not allowing duplicates, make sure the key isn't already in
--- 134,140 ----
  	 * precise description.
  	 */
  	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
! 						true, stack, BT_WRITE, NULL);
  
  	/*
  	 * If we're not allowing duplicates, make sure the key isn't already in
***************
*** 1654,1660 **** _bt_insert_parent(Relation rel,
  			elog(DEBUG2, "concurrent ROOT page split");
  			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
  			/* Find the leftmost page at the next level up */
! 			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
  			/* Set up a phony stack entry pointing there */
  			stack = &fakestack;
  			stack->bts_blkno = BufferGetBlockNumber(pbuf);
--- 1654,1661 ----
  			elog(DEBUG2, "concurrent ROOT page split");
  			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
  			/* Find the leftmost page at the next level up */
! 			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false,
! 									NULL);
  			/* Set up a phony stack entry pointing there */
  			stack = &fakestack;
  			stack->bts_blkno = BufferGetBlockNumber(pbuf);
*** a/src/backend/access/nbtree/nbtpage.c
--- b/src/backend/access/nbtree/nbtpage.c
***************
*** 1254,1260 **** _bt_pagedel(Relation rel, Buffer buf)
  				itup_scankey = _bt_mkscankey(rel, targetkey);
  				/* find the leftmost leaf page containing this key */
  				stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
! 								   false, &lbuf, BT_READ);
  				/* don't need a pin on the page */
  				_bt_relbuf(rel, lbuf);
  
--- 1254,1260 ----
  				itup_scankey = _bt_mkscankey(rel, targetkey);
  				/* find the leftmost leaf page containing this key */
  				stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
! 								   false, &lbuf, BT_READ, NULL);
  				/* don't need a pin on the page */
  				_bt_relbuf(rel, lbuf);
  
*** a/src/backend/access/nbtree/nbtsearch.c
--- b/src/backend/access/nbtree/nbtsearch.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "storage/predicate.h"
  #include "utils/lsyscache.h"
  #include "utils/rel.h"
+ #include "utils/snapmgr.h"
  
  
  static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
***************
*** 29,35 **** static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
  static void _bt_saveitem(BTScanOpaque so, int itemIndex,
  			 OffsetNumber offnum, IndexTuple itup);
  static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
! static Buffer _bt_walk_left(Relation rel, Buffer buf);
  static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
  
  
--- 30,36 ----
  static void _bt_saveitem(BTScanOpaque so, int itemIndex,
  			 OffsetNumber offnum, IndexTuple itup);
  static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
! static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot);
  static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
  
  
***************
*** 48,53 **** static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
--- 49,58 ----
   * address of the leaf-page buffer, which is read-locked and pinned.
   * No locks are held on the parent pages, however!
   *
+  * If the snapshot parameter is not NULL, "old snapshot" checking will take
+  * place during the descent through the tree.  This is not needed when
+  * positioning for an insert or delete, so NULL is used for those cases.
+  *
   * NOTE that the returned buffer is read-locked regardless of the access
   * parameter.  However, access = BT_WRITE will allow an empty root page
   * to be created and returned.  When access = BT_READ, an empty index
***************
*** 56,62 **** static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
   */
  BTStack
  _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
! 		   Buffer *bufP, int access)
  {
  	BTStack		stack_in = NULL;
  
--- 61,67 ----
   */
  BTStack
  _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
! 		   Buffer *bufP, int access, Snapshot snapshot)
  {
  	BTStack		stack_in = NULL;
  
***************
*** 65,71 **** _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
--- 70,79 ----
  
  	/* If index is empty and access = BT_READ, no root page is created. */
  	if (!BufferIsValid(*bufP))
+ 	{
+ 		/* FIXME: old snapshot checking special case here? */
  		return (BTStack) NULL;
+ 	}
  
  	/* Loop iterates once per level descended in the tree */
  	for (;;)
***************
*** 93,99 **** _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
  		 */
  		*bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
  							  (access == BT_WRITE), stack_in,
! 							  BT_READ);
  
  		/* if this is a leaf page, we're done */
  		page = BufferGetPage(*bufP);
--- 101,107 ----
  		 */
  		*bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
  							  (access == BT_WRITE), stack_in,
! 							  BT_READ, snapshot);
  
  		/* if this is a leaf page, we're done */
  		page = BufferGetPage(*bufP);
***************
*** 166,171 **** _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
--- 174,183 ----
   * On entry, we have the buffer pinned and a lock of the type specified by
   * 'access'.  If we move right, we release the buffer and lock and acquire
   * the same on the right sibling.  Return value is the buffer we stop at.
+  *
+  * If the snapshot parameter is not NULL, "old snapshot" checking will take
+  * place during the descent through the tree.  This is not needed when
+  * positioning for an insert or delete, so NULL is used for those cases.
   */
  Buffer
  _bt_moveright(Relation rel,
***************
*** 175,181 **** _bt_moveright(Relation rel,
  			  bool nextkey,
  			  bool forupdate,
  			  BTStack stack,
! 			  int access)
  {
  	Page		page;
  	BTPageOpaque opaque;
--- 187,194 ----
  			  bool nextkey,
  			  bool forupdate,
  			  BTStack stack,
! 			  int access,
! 			  Snapshot snapshot)
  {
  	Page		page;
  	BTPageOpaque opaque;
***************
*** 201,206 **** _bt_moveright(Relation rel,
--- 214,220 ----
  	for (;;)
  	{
  		page = BufferGetPage(buf);
+ 		TestForOldSnapshot(snapshot, rel, page);
  		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  
  		if (P_RIGHTMOST(opaque))
***************
*** 937,943 **** _bt_first(IndexScanDesc scan, ScanDirection dir)
  	 * Use the manufactured insertion scan key to descend the tree and
  	 * position ourselves on the target leaf page.
  	 */
! 	stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ);
  
  	/* don't need to keep the stack around... */
  	_bt_freestack(stack);
--- 951,958 ----
  	 * Use the manufactured insertion scan key to descend the tree and
  	 * position ourselves on the target leaf page.
  	 */
! 	stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ,
! 					   scan->xs_snapshot);
  
  	/* don't need to keep the stack around... */
  	_bt_freestack(stack);
***************
*** 1308,1313 **** _bt_steppage(IndexScanDesc scan, ScanDirection dir)
--- 1323,1329 ----
  			so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
  			/* check for deleted page */
  			page = BufferGetPage(so->currPos.buf);
+ 			TestForOldSnapshot(scan->xs_snapshot, rel, page);
  			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  			if (!P_IGNORE(opaque))
  			{
***************
*** 1344,1350 **** _bt_steppage(IndexScanDesc scan, ScanDirection dir)
  			}
  
  			/* Step to next physical page */
! 			so->currPos.buf = _bt_walk_left(rel, so->currPos.buf);
  
  			/* if we're physically at end of index, return failure */
  			if (so->currPos.buf == InvalidBuffer)
--- 1360,1367 ----
  			}
  
  			/* Step to next physical page */
! 			so->currPos.buf = _bt_walk_left(rel, so->currPos.buf,
! 											scan->xs_snapshot);
  
  			/* if we're physically at end of index, return failure */
  			if (so->currPos.buf == InvalidBuffer)
***************
*** 1356,1361 **** _bt_steppage(IndexScanDesc scan, ScanDirection dir)
--- 1373,1379 ----
  			 * and do it all again.
  			 */
  			page = BufferGetPage(so->currPos.buf);
+ 			TestForOldSnapshot(scan->xs_snapshot, rel, page);
  			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  			if (!P_IGNORE(opaque))
  			{
***************
*** 1386,1392 **** _bt_steppage(IndexScanDesc scan, ScanDirection dir)
   * again if it's important.
   */
  static Buffer
! _bt_walk_left(Relation rel, Buffer buf)
  {
  	Page		page;
  	BTPageOpaque opaque;
--- 1404,1410 ----
   * again if it's important.
   */
  static Buffer
! _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
  {
  	Page		page;
  	BTPageOpaque opaque;
***************
*** 1416,1421 **** _bt_walk_left(Relation rel, Buffer buf)
--- 1434,1440 ----
  		CHECK_FOR_INTERRUPTS();
  		buf = _bt_getbuf(rel, blkno, BT_READ);
  		page = BufferGetPage(buf);
+ 		TestForOldSnapshot(snapshot, rel, page);
  		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  
  		/*
***************
*** 1442,1453 **** _bt_walk_left(Relation rel, Buffer buf)
--- 1461,1474 ----
  			blkno = opaque->btpo_next;
  			buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
  			page = BufferGetPage(buf);
+ 			TestForOldSnapshot(snapshot, rel, page);
  			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  		}
  
  		/* Return to the original page to see what's up */
  		buf = _bt_relandgetbuf(rel, buf, obknum, BT_READ);
  		page = BufferGetPage(buf);
+ 		TestForOldSnapshot(snapshot, rel, page);
  		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  		if (P_ISDELETED(opaque))
  		{
***************
*** 1465,1470 **** _bt_walk_left(Relation rel, Buffer buf)
--- 1486,1492 ----
  				blkno = opaque->btpo_next;
  				buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
  				page = BufferGetPage(buf);
+ 				TestForOldSnapshot(snapshot, rel, page);
  				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  				if (!P_ISDELETED(opaque))
  					break;
***************
*** 1501,1507 **** _bt_walk_left(Relation rel, Buffer buf)
   * The returned buffer is pinned and read-locked.
   */
  Buffer
! _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
  {
  	Buffer		buf;
  	Page		page;
--- 1523,1530 ----
   * The returned buffer is pinned and read-locked.
   */
  Buffer
! _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
! 				 Snapshot snapshot)
  {
  	Buffer		buf;
  	Page		page;
***************
*** 1524,1529 **** _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
--- 1547,1553 ----
  		return InvalidBuffer;
  
  	page = BufferGetPage(buf);
+ 	TestForOldSnapshot(snapshot, rel, page);
  	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  
  	for (;;)
***************
*** 1543,1548 **** _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
--- 1567,1573 ----
  					 RelationGetRelationName(rel));
  			buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ);
  			page = BufferGetPage(buf);
+ 			TestForOldSnapshot(snapshot, rel, page);
  			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  		}
  
***************
*** 1595,1601 **** _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
  	 * version of _bt_search().  We don't maintain a stack since we know we
  	 * won't need it.
  	 */
! 	buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir));
  
  	if (!BufferIsValid(buf))
  	{
--- 1620,1626 ----
  	 * version of _bt_search().  We don't maintain a stack since we know we
  	 * won't need it.
  	 */
! 	buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir), scan->xs_snapshot);
  
  	if (!BufferIsValid(buf))
  	{
*** a/src/backend/commands/vacuum.c
--- b/src/backend/commands/vacuum.c
***************
*** 441,447 **** vacuum_set_xid_limits(Relation rel,
  	 * working on a particular table at any time, and that each vacuum is
  	 * always an independent transaction.
  	 */
! 	*oldestXmin = GetOldestXmin(rel, true);
  
  	Assert(TransactionIdIsNormal(*oldestXmin));
  
--- 441,448 ----
  	 * working on a particular table at any time, and that each vacuum is
  	 * always an independent transaction.
  	 */
! 	*oldestXmin =
! 		TransactionIdLimitedForOldSnapshots(GetOldestXmin(rel, true), rel);
  
  	Assert(TransactionIdIsNormal(*oldestXmin));
  
*** a/src/backend/commands/vacuumlazy.c
--- b/src/backend/commands/vacuumlazy.c
***************
*** 59,64 ****
--- 59,65 ----
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
  #include "utils/pg_rusage.h"
+ #include "utils/snapmgr.h"
  #include "utils/timestamp.h"
  #include "utils/tqual.h"
  
***************
*** 267,273 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
  	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
  	if (possibly_freeable > 0 &&
  		(possibly_freeable >= REL_TRUNCATE_MINIMUM ||
! 		 possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION))
  		lazy_truncate_heap(onerel, vacrelstats);
  
  	/* Vacuum the Free Space Map */
--- 268,275 ----
  	possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
  	if (possibly_freeable > 0 &&
  		(possibly_freeable >= REL_TRUNCATE_MINIMUM ||
! 		 possibly_freeable >= vacrelstats->rel_pages / REL_TRUNCATE_FRACTION) &&
! 		old_snapshot_threshold < 0)
  		lazy_truncate_heap(onerel, vacrelstats);
  
  	/* Vacuum the Free Space Map */
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 1609,1614 **** GetSnapshotData(Snapshot snapshot)
--- 1609,1620 ----
  	snapshot->regd_count = 0;
  	snapshot->copied = false;
  
+ 	/*
+ 	 * Capture the current WAL stream location in case this snapshot becomes
+ 	 * old enough to need to fall back on the special "old snapshot" logic.
+ 	 */
+ 	snapshot->lsn = GetXLogInsertRecPtr();
+ 
  	return snapshot;
  }
  
*** a/src/backend/utils/errcodes.txt
--- b/src/backend/utils/errcodes.txt
***************
*** 410,415 **** Section: Class 58 - System Error (errors external to PostgreSQL itself)
--- 410,419 ----
  58P01    E    ERRCODE_UNDEFINED_FILE                                         undefined_file
  58P02    E    ERRCODE_DUPLICATE_FILE                                         duplicate_file
  
+ Section: Class 72 - Snapshot Failure
+ # (class borrowed from Oracle)
+ 72000    E    ERRCODE_SNAPSHOT_TOO_OLD                                       snapshot_too_old
+ 
  Section: Class F0 - Configuration File Error
  
  # (PostgreSQL-specific error class)
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 2448,2453 **** static struct config_int ConfigureNamesInt[] =
--- 2448,2463 ----
  	},
  
  	{
+ 		{"old_snapshot_threshold", PGC_POSTMASTER, RESOURCES_ASYNCHRONOUS,
+ 			gettext_noop("The number of transaction IDs which must be consumed before a snapshot can be considered too old."),
+ 			gettext_noop("A value of -1 disables this feature.")
+ 		},
+ 		&old_snapshot_threshold,
+ 		-1, -1, 2000000000,
+ 		NULL, NULL, NULL
+ 	},
+ 
+ 	{
  		{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
  			gettext_noop("Time between issuing TCP keepalives."),
  			gettext_noop("A value of 0 uses the system default."),
*** a/src/backend/utils/time/snapmgr.c
--- b/src/backend/utils/time/snapmgr.c
***************
*** 54,59 ****
--- 54,60 ----
  #include "storage/sinval.h"
  #include "utils/builtins.h"
  #include "utils/memutils.h"
+ #include "utils/rel.h"
  #include "utils/resowner_private.h"
  #include "utils/snapmgr.h"
  #include "utils/syscache.h"
***************
*** 61,66 ****
--- 62,73 ----
  
  
  /*
+  * GUC parameters
+  */
+ int			old_snapshot_threshold;
+ 
+ 
+ /*
   * CurrentSnapshot points to the only snapshot taken in transaction-snapshot
   * mode, and to the latest one taken in a read-committed transaction.
   * SecondarySnapshot is a snapshot that's always up-to-date as of the current
***************
*** 1065,1070 **** pg_export_snapshot(PG_FUNCTION_ARGS)
--- 1072,1104 ----
  
  
  /*
+  * TransactionIdLimitedForOldSnapshots -- apply old snapshot limit, if any
+  */
+ TransactionId
+ TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+ 									Relation relation)
+ {
+ 	if (TransactionIdIsNormal(recentXmin)
+ 		&& old_snapshot_threshold >= 0
+ 		&& RelationNeedsWAL(relation)
+ 		&& !IsCatalogRelation(relation)
+ 		&& !RelationIsAccessibleInLogicalDecoding(relation))
+ 	{
+ 		TransactionId xlimit;
+ 
+ 		xlimit = ShmemVariableCache->latestCompletedXid;
+ 		Assert(TransactionIdIsNormal(xlimit));
+ 		xlimit -= old_snapshot_threshold;
+ 		TransactionIdAdvance(xlimit);
+ 		if (NormalTransactionIdFollows(xlimit, recentXmin))
+ 			return xlimit;
+ 	}
+ 
+ 	return recentXmin;
+ }
+ 
+ 
+ /*
   * Parsing subroutines for ImportSnapshot: parse a line with the given
   * prefix followed by a value, and advance *s to the next line.  The
   * filename is provided for use in error messages.
*** a/src/include/access/nbtree.h
--- b/src/include/access/nbtree.h
***************
*** 669,685 **** extern int	_bt_pagedel(Relation rel, Buffer buf);
   */
  extern BTStack _bt_search(Relation rel,
  		   int keysz, ScanKey scankey, bool nextkey,
! 		   Buffer *bufP, int access);
  extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
  			  ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
! 			  int access);
  extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
  			ScanKey scankey, bool nextkey);
  extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
  			Page page, OffsetNumber offnum);
  extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
  extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
! extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost);
  
  /*
   * prototypes for functions in nbtutils.c
--- 669,686 ----
   */
  extern BTStack _bt_search(Relation rel,
  		   int keysz, ScanKey scankey, bool nextkey,
! 		   Buffer *bufP, int access, Snapshot snapshot);
  extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
  			  ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
! 			  int access, Snapshot snapshot);
  extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
  			ScanKey scankey, bool nextkey);
  extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
  			Page page, OffsetNumber offnum);
  extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
  extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
! extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
! 							   Snapshot snapshot);
  
  /*
   * prototypes for functions in nbtutils.c
*** a/src/include/utils/rel.h
--- b/src/include/utils/rel.h
***************
*** 15,20 ****
--- 15,21 ----
  #define REL_H
  
  #include "access/tupdesc.h"
+ #include "access/xlog.h"
  #include "catalog/pg_am.h"
  #include "catalog/pg_class.h"
  #include "catalog/pg_index.h"
*** a/src/include/utils/snapmgr.h
--- b/src/include/utils/snapmgr.h
***************
*** 14,21 ****
--- 14,41 ----
  #define SNAPMGR_H
  
  #include "fmgr.h"
+ #include "catalog/catalog.h"
  #include "utils/resowner.h"
  #include "utils/snapshot.h"
+ #include "utils/tqual.h"
+ 
+ 
+ #define TestForOldSnapshot(snapshot, relation, page) \
+ 	do { \
+ 		if (old_snapshot_threshold >= 0 \
+ 		 && ((snapshot) != NULL) \
+ 		 && (snapshot)->satisfies == HeapTupleSatisfiesMVCC \
+ 		 && !XLogRecPtrIsInvalid((snapshot)->lsn) \
+ 		 && PageGetLSN(page) > (snapshot)->lsn \
+ 		 && NormalTransactionIdFollows(TransactionIdLimitedForOldSnapshots((snapshot)->xmin, relation), (snapshot)->xmin)) \
+ 			ereport(ERROR, \
+ 					(errcode(ERRCODE_SNAPSHOT_TOO_OLD), \
+ 					 errmsg("snapshot too old"))); \
+ 	} while (0)
+ 
+ 
+ /* GUC variables */
+ extern int	old_snapshot_threshold;
  
  
  extern bool FirstSnapshotSet;
***************
*** 54,59 **** extern void ImportSnapshot(const char *idstr);
--- 74,81 ----
  extern bool XactHasExportedSnapshots(void);
  extern void DeleteAllExportedSnapshotFiles(void);
  extern bool ThereAreNoPriorRegisteredSnapshots(void);
+ extern TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
+ 														 Relation relation);
  
  extern char *ExportSnapshot(Snapshot snapshot);
  
*** a/src/include/utils/snapshot.h
--- b/src/include/utils/snapshot.h
***************
*** 14,19 ****
--- 14,20 ----
  #define SNAPSHOT_H
  
  #include "access/htup.h"
+ #include "access/xlogdefs.h"
  #include "lib/pairingheap.h"
  #include "storage/buf.h"
  
***************
*** 95,100 **** typedef struct SnapshotData
--- 96,103 ----
  	uint32		regd_count;		/* refcount on RegisteredSnapshots */
  
  	pairingheap_node ph_node;	/* link in the RegisteredSnapshots heap */
+ 
+ 	XLogRecPtr	lsn;			/* position in the WAL stream */
  } SnapshotData;
  
  /*
*** a/src/include/utils/tqual.h
--- b/src/include/utils/tqual.h
***************
*** 15,20 ****
--- 15,21 ----
  #ifndef TQUAL_H
  #define TQUAL_H
  
+ #include "utils/hsearch.h"
  #include "utils/snapshot.h"
  
  