On 01.06.2013 23:21, Robert Haas wrote:
> On Sat, Jun 1, 2013 at 2:48 PM, Heikki Linnakangas
> <hlinnakan...@vmware.com> wrote:
>>> We define a new page-level bit, something like PD_RECENTLY_FROZEN.
>>> When this bit is set, it means there are no unfrozen tuples on the
>>> page with XIDs that predate the current half-epoch.  Whenever we know
>>> this to be true, we set the bit.  If the page LSN crosses more than
>>> one half-epoch boundary at a time, we freeze the page and set the bit.
>>> If the page LSN crosses exactly one half-epoch boundary, then (1) if
>>> the bit is set, we clear it and (2) if the bit is not set, we freeze
>>> the page and set the bit.

>> Yep, I think that would work. Want to write the patch, or should I? ;-)

> Have at it.

Here's a first draft. A lot of stuff is still missing and broken, but "make check" passes :-).

In the patch, instead of working with "half-epochs", there are "XID-LSN ranges", which can be of arbitrary size. An XID-LSN range consists of three values:

minlsn: The point in WAL where this range begins.
minxid - maxxid: The range of XIDs allowed in this range.

Every point in WAL belongs to exactly one range. The minxid-maxxid of the ranges can overlap. For example:

1. XIDs 25000942 - 27000003 LSN 0/3BB9938
2. XIDs 23000742 - 26000003 LSN 0/2AB9288
3. XIDs 22000721 - 25000003 LSN 0/1AB8BE0
4. XIDs 22000002 - 24000003 LSN 0/10B1550

The invariant with the ranges is that a page with a certain LSN is only allowed to contain XIDs that belong to the range specified by that LSN. For example, if a page has LSN 0/3500000, it belongs to the 2nd range, and can only contain XIDs between 23000742 - 26000003. If a backend updates the page, so that its LSN is updated to, say, 0/3D12345, all XIDs on the page older than 25000942 must be frozen first, to avoid violating the rule.
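
To spell out the lookup rule (this helper is not in the patch, just a sketch): a page belongs to the first range, scanning from the newest, whose minlsn is less than or equal to the page's LSN:

    /*
     * Sketch only, not in the patch: map a page LSN to its XID-LSN range.
     * 'ranges' is ordered newest-first, like ShmemVariableCache->xidlsnranges.
     */
    static XidLSNRange *
    RangeForLSN(XidLSNRange *ranges, int numranges, XLogRecPtr pagelsn)
    {
        int     i;

        for (i = 0; i < numranges; i++)
        {
            /* a range covers WAL from its minlsn up to the next range's minlsn */
            if (pagelsn >= ranges[i].minlsn)
                return &ranges[i];
        }
        return NULL;        /* older than all tracked ranges: a "mature" page */
    }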

The system keeps track of a small number of these XID-LSN ranges. Where we currently truncate clog, we can also truncate the ranges with maxxid < the clog truncation point. Vacuum removes any dead tuples and updates relfrozenxid as usual, to make sure that there are no dead tuples or aborted XIDs older than the minxid of the oldest tracked XID-LSN range. However, it no longer needs to freeze old committed XIDs - that's the gain from this patch (except when it has to remove some dead tuples on a page and update its LSN, in which case it must still freeze to uphold the invariant).

A new range is created whenever we reach the maxxid on the current one. The new range's minxid is set to the current global oldest xmin value, and maxxid is just the old maxxid plus a fixed number (1 million in the patch, but we probably want a higher value in reality). This ensures that if you modify a page and update its LSN, all the existing XIDs on the page that cannot be frozen yet are greater than the minxid of the latest range. In other words, you can always freeze old XIDs on a page, so that any remaining non-frozen XIDs are within the minxid-maxxid of the latest range.
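
So the write path always follows the same pattern; condensed from the heapam.c changes below:

    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

    /* freeze everything older than the latest range's minxid first */
    if (PageUpdateNeedsFreezing(page))
        heap_freeze_page(relation, buffer);

    START_CRIT_SECTION();
    /* ... apply the change and WAL-log it ... */
    PageSetLSN(page, recptr);   /* the new LSN now falls within the latest range */
    END_CRIT_SECTION();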

The HeapTupleSatisfies functions are modified to look at the page's LSN first. If it's old enough, they don't look at the XIDs on the page at all, and just consider everything on the page to be visible to everyone (I'm calling this state a "mature page").

> I think the tricky part is going to be figuring out the
> synchronization around half-epoch boundaries.

Yep. I skipped all those difficult parts in this first version. There are two race conditions that need to be fixed:

1. When a page is updated, we check if it needs to be frozen first: it doesn't, if its LSN is greater than the latest range's minlsn, IOW if we've already modified the page, and thus frozen all older tuples, within the current range. However, it's possible that a new range is created immediately after we've checked that. When we then proceed to do the actual update on the page and WAL-log it, the new LSN falls within the next range, and we should've frozen the page. I'm planning to fix that by adding a "parity bit" to the page header. Each XID-LSN range is assigned a parity bit, 0 or 1. When we check if a page needs to be frozen on update, we make note of the latest range's parity bit, and write it in the page header. Later, when we look at the page's LSN to determine which XID-LSN range it belongs to, we compare the parity. If the parity doesn't match, we know that the race condition happened, so we treat the page as belonging to the previous range, not the one it would normally belong to per the LSN. (A rough sketch of this follows after item 2.)

2. When we look at a page and determine that it's not old enough to be "mature", we then check the clog as usual. A page is considered mature if the XID-LSN range (and the corresponding clog pages) has already been truncated away. It's possible that between those two steps, the XID-LSN range and clog are truncated away, so that the backend tries to access a clog page that doesn't exist anymore. To fix that, the XID-LSN range and clog truncation need to be done in two steps: first, mark the truncation point in shared memory; then somehow wait until all backends see the new value, and go ahead with actually truncating the clog only after that. (Also sketched below.)
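
To sketch the parity idea (nothing below is in the patch yet; PD_RANGE_PARITY and the per-range 'parity' field are made up for illustration, and RangeForLSN() is the sketch from above):

    /*
     * Sketch only: decide which range a page belongs to, allowing for the
     * race where a new range was created after the freeze check. Bounds
     * checking is omitted.
     */
    static XidLSNRange *
    RangeForPage(Page page, XidLSNRange *ranges, int numranges)
    {
        XidLSNRange *range = RangeForLSN(ranges, numranges, PageGetLSN(page));
        bool        pageparity = (((PageHeader) page)->pd_flags & PD_RANGE_PARITY) != 0;

        /*
         * If the LSN says "latest range" but the parity written at
         * freeze-check time doesn't match, the race happened: treat the
         * page as belonging to the previous (one older) range.
         */
        if (range != NULL && pageparity != range->parity)
            range++;            /* the array is newest-first */
        return range;
    }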
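
And the two-step truncation could look something like this (WaitForBackendsToSeeClogTruncation() is an imagined primitive, nothing like it exists yet):

    /* Step 1: advertise the new truncation point in shared memory. */
    LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
    ShmemVariableCache->clogtruncationpoint = newtruncationpoint;
    LWLockRelease(XidGenLock);

    /*
     * Step 2: wait until every backend is guaranteed to see the new value,
     * so that no backend can still conclude "not mature" based on the old
     * truncation point and then go look at the clog.
     */
    WaitForBackendsToSeeClogTruncation();

    /* Step 3: only now physically remove the old clog segments. */
    TruncateCLOG(truncateXID);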


Aside from those two race conditions, there are plenty of scalability issues remaining. Currently, the shared XID-LSN range array is checked every time a page is accessed, so it could quickly become a bottleneck; that information needs to be cached in each backend. Oh, and I didn't implement the PD_RECENTLY_FROZEN bit in the page header yet, so you will get a freezing frenzy right after a new XID-LSN range is created.
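
For the backend-local caching, I'm thinking of something like a generation counter; a sketch (xidlsnranges_generation doesn't exist in the patch, and memory ordering is glossed over):

    static uint64       cached_generation = 0;
    static XidLSNRange  cached_latest;

    static XidLSNRange *
    GetLatestRangeCached(void)
    {
        /* revalidate the local copy only when the shared counter advances */
        if (cached_generation != ShmemVariableCache->xidlsnranges_generation)
        {
            LWLockAcquire(XidGenLock, LW_SHARED);
            cached_latest = ShmemVariableCache->xidlsnranges[0];
            cached_generation = ShmemVariableCache->xidlsnranges_generation;
            LWLockRelease(XidGenLock);
        }
        return &cached_latest;
    }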

I'll keep hacking away on those things, but please let me know if you see some fatal flaw with this plan.

- Heikki
diff --git a/TODO-xidlsnranges.txt b/TODO-xidlsnranges.txt
new file mode 100644
index 0000000..fed0fe9
--- /dev/null
+++ b/TODO-xidlsnranges.txt
@@ -0,0 +1,6 @@
+* Handle MultiXactIds
+* Fix race condition between PageUpdateNeedsFreezing() and actual update. (guard/parity bit?)
+* Fix race condition between PageIsMature, which checks clogtruncationpoint, and truncating the clog
+   - a backend might think that a page is not mature, but the clog gets truncated before it checks the clog
+* Vacuum no longer needs to scan all-visible pages for freezing purposes.
+* implement Robert's PD_RECENTLY_FROZEN bit to avoid epidemic freezing
diff --git a/contrib/pg_xlogdump/rmgrdesc.c b/contrib/pg_xlogdump/rmgrdesc.c
index 13ab745..bc8337c 100644
--- a/contrib/pg_xlogdump/rmgrdesc.c
+++ b/contrib/pg_xlogdump/rmgrdesc.c
@@ -17,6 +17,7 @@
 #include "access/nbtree.h"
 #include "access/rmgr.h"
 #include "access/spgist.h"
+#include "access/varsup_internal.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/storage_xlog.h"
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e88dd30..655702c 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -373,7 +373,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
 	 * full page write. Until we can prove that beyond doubt, let's check each
 	 * tuple for visibility the hard way.
 	 */
-	all_visible = PageIsAllVisible(dp) && !snapshot->takenDuringRecovery;
+	all_visible = (PageIsAllVisible(dp) && !snapshot->takenDuringRecovery) ||
+				  PageIsMature(dp);
 
 	for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
 		 lineoff <= lines;
@@ -1746,7 +1747,7 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 		 * transactions.
 		 */
 		if (all_dead && *all_dead &&
-			!HeapTupleIsSurelyDead(heapTuple->t_data, RecentGlobalXmin))
+			!HeapTupleIsSurelyDead(heapTuple->t_data, RecentGlobalXmin, buffer))
 			*all_dead = false;
 
 		/*
@@ -2049,6 +2050,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 									   InvalidBuffer, options, bistate,
 									   &vmbuffer, NULL);
 
+	if (PageUpdateNeedsFreezing(BufferGetPage(buffer)))
+		heap_freeze_page(relation, buffer);
+
 	/* NO EREPORT(ERROR) from here till changes are logged */
 	START_CRIT_SECTION();
 
@@ -2299,6 +2303,9 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 										   &vmbuffer, NULL);
 		page = BufferGetPage(buffer);
 
+		if (PageUpdateNeedsFreezing(page))
+			heap_freeze_page(relation, buffer);
+
 		/* NO EREPORT(ERROR) from here till changes are logged */
 		START_CRIT_SECTION();
 
@@ -2558,6 +2565,9 @@ heap_delete(Relation relation, ItemPointer tid,
 
 	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
+	if (PageUpdateNeedsFreezing(page))
+		heap_freeze_page(relation, buffer);
+
 	/*
 	 * If we didn't pin the visibility map page and the page has become all
 	 * visible while we were busy locking the buffer, we'll have to unlock and
@@ -3488,6 +3498,12 @@ l2:
 		PageSetFull(page);
 	}
 
+	if (PageUpdateNeedsFreezing(page))
+		heap_freeze_page(relation, buffer);
+
+	if (newbuf != buffer && PageUpdateNeedsFreezing(BufferGetPage(newbuf)))
+		heap_freeze_page(relation, newbuf);
+
 	/* NO EREPORT(ERROR) from here till changes are logged */
 	START_CRIT_SECTION();
 
@@ -4405,6 +4421,9 @@ failed:
 							  GetCurrentTransactionId(), mode, false,
 							  &xid, &new_infomask, &new_infomask2);
 
+	if (PageUpdateNeedsFreezing(page))
+		heap_freeze_page(relation, *buffer);
+
 	START_CRIT_SECTION();
 
 	/*
@@ -4856,6 +4875,9 @@ l4:
 								  xid, mode, false,
 								  &new_xmax, &new_infomask, &new_infomask2);
 
+		if (PageUpdateNeedsFreezing(BufferGetPage(buf)))
+			heap_freeze_page(rel, buf);
+
 		START_CRIT_SECTION();
 
 		/* ... and set them */
@@ -5004,6 +5026,9 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 	if (oldlen != newlen || htup->t_hoff != tuple->t_data->t_hoff)
 		elog(ERROR, "heap_inplace_update: wrong tuple length");
 
+	if (PageUpdateNeedsFreezing(page))
+		heap_freeze_page(relation, buffer);
+
 	/* NO EREPORT(ERROR) from here till changes are logged */
 	START_CRIT_SECTION();
 
@@ -5053,6 +5078,102 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 		CacheInvalidateHeapTuple(relation, tuple, NULL);
 }
 
+/*
+ *  heap_freeze_page - freeze old tuples on page, on an update
+ *
+ * When we're about to update a page that was last modified a long time ago,
+ * we might need to first freeze existing tuples on the page. This ensures
+ * that even after the update, all XIDs on the page are within the range
+ * specified by the XidLSNRange entry that the page's LSN falls into.
+ *
+ * Caller must hold an exclusive lock on the page.
+ */
+void
+heap_freeze_page(Relation relation, Buffer buf)
+{
+	Page		page = BufferGetPage(buf);
+	OffsetNumber off;
+	OffsetNumber maxoff;
+	int			nfrozen;
+	OffsetNumber frozen[MaxHeapTuplesPerPage];
+	TransactionId FreezeLimit;
+	MultiXactId MultiXactFrzLimit;
+
+	/*
+	 * Calculate freeze limits based on the old LSN on the page. If the page
+	 * is old enough, we will just consider everything as committed and visible
+	 * to everyone, and freeze everything. Otherwise, freeze everything up to
+	 * the latest XID-LSN range's minxid, so that after the upcoming update,
+	 * all XIDs on the page will fall within the allowed range of the latest
+	 * range.
+	 */
+	if (PageIsMature(page))
+	{
+		FreezeLimit = InvalidTransactionId;
+		MultiXactFrzLimit = InvalidMultiXactId;
+	}
+	else
+	{
+		FreezeLimit = GetLatestRangeXmin();
+		MultiXactFrzLimit = InvalidMultiXactId; /* FIXME */
+	}
+
+	nfrozen = 0;
+	maxoff = PageGetMaxOffsetNumber(page);
+	for (off = FirstOffsetNumber; off <= maxoff; off++)
+	{
+		HeapTupleData tuple;
+		ItemId		itemid;
+
+		itemid = PageGetItemId(page, off);
+
+		if (ItemIdIsNormal(itemid))
+		{
+			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+			tuple.t_len = ItemIdGetLength(itemid);
+
+			switch (HeapTupleSatisfiesVacuum(tuple.t_data, FreezeLimit, buf))
+			{
+				case HEAPTUPLE_DEAD:
+				case HEAPTUPLE_RECENTLY_DEAD:
+				case HEAPTUPLE_INSERT_IN_PROGRESS:
+				case HEAPTUPLE_DELETE_IN_PROGRESS:
+					/*
+					 * The page contains a dead tuple, or something recent
+					 * that might become dead, anyway. That needs to be
+					 * vacuumed or pruned away before this page can be left
+					 * to linger. The next vacuum will do that; we need
+					 * look no further.
+					 */
+					return;
+
+				case HEAPTUPLE_LIVE:
+					if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
+									      MultiXactFrzLimit))
+						frozen[nfrozen++] = off;
+					break;
+			}
+		}
+	}
+
+	/*
+	 * If we froze any tuples, mark the buffer dirty, and write a WAL
+	 * record recording the changes.  We must log the changes to be
+	 * crash-safe against future truncation of CLOG.
+	 */
+	if (nfrozen > 0)
+	{
+		MarkBufferDirty(buf);
+		if (RelationNeedsWAL(relation))
+		{
+			XLogRecPtr	recptr;
+
+			recptr = log_heap_freeze(relation, buf, InvalidTransactionId,
+										InvalidMultiXactId, frozen, nfrozen);
+			PageSetLSN(page, recptr);
+		}
+	}
+}
 
 /*
  * heap_freeze_tuple
@@ -5079,6 +5200,10 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
  * Similarly, cutoff_multi must be less than or equal to the smallest
  * MultiXactId used by any transaction currently open.
  *
+ * An invalid cutoff_xid is taken to mean that the tuple should be frozen
+ * without checking the age of the xmin/xmax. In that case, the caller knows
+ * through some other means that the tuple is live and visible to everyone.
+ *
  * If the tuple is in a shared buffer, caller must hold an exclusive lock on
  * that buffer.
  *
@@ -5095,12 +5220,13 @@ bool
 heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid,
 				  MultiXactId cutoff_multi)
 {
+	bool		force = (cutoff_xid == InvalidTransactionId);
 	bool		changed = false;
 	TransactionId xid;
 
 	xid = HeapTupleHeaderGetXmin(tuple);
 	if (TransactionIdIsNormal(xid) &&
-		TransactionIdPrecedes(xid, cutoff_xid))
+		(force || TransactionIdPrecedes(xid, cutoff_xid)))
 	{
 		HeapTupleHeaderSetXmin(tuple, FrozenTransactionId);
 
@@ -5121,9 +5247,9 @@ heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid,
 	xid = HeapTupleHeaderGetRawXmax(tuple);
 	if ((tuple->t_infomask & HEAP_XMAX_IS_MULTI) ?
 		(MultiXactIdIsValid(xid) &&
-		 MultiXactIdPrecedes(xid, cutoff_multi)) :
+		 (force || MultiXactIdPrecedes(xid, cutoff_multi))) :
 		(TransactionIdIsNormal(xid) &&
-		 TransactionIdPrecedes(xid, cutoff_xid)))
+		 (force || TransactionIdPrecedes(xid, cutoff_xid))))
 	{
 		HeapTupleHeaderSetXmax(tuple, InvalidTransactionId);
 
@@ -5147,7 +5273,7 @@ heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid,
 	{
 		xid = HeapTupleHeaderGetXvac(tuple);
 		if (TransactionIdIsNormal(xid) &&
-			TransactionIdPrecedes(xid, cutoff_xid))
+			(force || TransactionIdPrecedes(xid, cutoff_xid)))
 		{
 			/*
 			 * If a MOVED_OFF tuple is not dead, the xvac transaction must
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 2ab723d..632d1a5 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -206,6 +206,10 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
 	/* Have we found any prunable items? */
 	if (prstate.nredirected > 0 || prstate.ndead > 0 || prstate.nunused > 0)
 	{
+		/* Before modifying the page, freeze it if necessary */
+		if (PageUpdateNeedsFreezing(BufferGetPage(buffer)))
+			heap_freeze_page(relation, buffer);
+
 		/*
 		 * Apply the planned item changes, then repair page fragmentation, and
 		 * update the page's hint bit about whether it has free line pointers.
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index 7d092d2..ba48400 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -10,6 +10,6 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = clogdesc.o dbasedesc.o gindesc.o gistdesc.o hashdesc.o heapdesc.o \
 	   mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \
-	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
+	   standbydesc.o tblspcdesc.o varsupdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/varsupdesc.c b/src/backend/access/rmgrdesc/varsupdesc.c
new file mode 100644
index 0000000..a13c03f
--- /dev/null
+++ b/src/backend/access/rmgrdesc/varsupdesc.c
@@ -0,0 +1,59 @@
+/*-------------------------------------------------------------------------
+ *
+ * varsupdesc.c
+ *	  rmgr descriptor routines for access/transam/varsup.c
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/varsupdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/transam.h"
+#include "access/varsup_internal.h"
+#include "access/xlog.h"
+#include "catalog/pg_control.h"
+
+void
+varsup_desc(StringInfo buf, uint8 xl_info, char *rec)
+{
+	uint8		info = xl_info & ~XLR_INFO_MASK;
+
+	if (info == VARSUP_NEXTOID)
+	{
+		Oid			nextOid;
+
+		memcpy(&nextOid, rec, sizeof(Oid));
+		appendStringInfo(buf, "nextOid: %u", nextOid);
+	}
+	else if (info == VARSUP_XID_LSN_RANGES)
+	{
+		uint32	numranges;
+		int		i;
+
+		/* use memcpy because xlog data can be unaligned */
+		memcpy(&numranges, rec, sizeof(uint32));
+		rec += sizeof(uint32);
+
+		appendStringInfo(buf, "xid-LSN ranges: %u ranges:", numranges);
+
+		for (i = 0; i < numranges; i++)
+		{
+			XidLSNRange range;
+
+			memcpy(&range, rec, sizeof(XidLSNRange));
+			rec += sizeof(XidLSNRange);
+			appendStringInfo(buf, " %u - %u LSN %X/%X",
+							 range.minxid, range.maxxid,
+							 (uint32) (range.minlsn >> 32),
+							 (uint32) (range.minlsn));
+		}
+	}
+	else
+		appendStringInfo(buf, "UNKNOWN");
+}
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 2bad527..d50cd8c 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -64,13 +64,6 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
 	{
 		appendStringInfo(buf, "xlog no-op");
 	}
-	else if (info == XLOG_NEXTOID)
-	{
-		Oid			nextOid;
-
-		memcpy(&nextOid, rec, sizeof(Oid));
-		appendStringInfo(buf, "nextOid: %u", nextOid);
-	}
 	else if (info == XLOG_SWITCH)
 	{
 		appendStringInfo(buf, "xlog switch");
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 41d4379..ccf5298 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -15,6 +15,7 @@
 #include "access/multixact.h"
 #include "access/nbtree.h"
 #include "access/spgist.h"
+#include "access/varsup_internal.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/storage_xlog.h"
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 0579c84..7ba05bf 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -13,14 +13,21 @@
 
 #include "postgres.h"
 
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
 #include "access/clog.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
+#include "access/varsup_internal.h"
 #include "access/xact.h"
 #include "commands/dbcommands.h"
 #include "miscadmin.h"
 #include "postmaster/autovacuum.h"
+#include "storage/fd.h"
 #include "storage/pmsignal.h"
+#include "storage/procarray.h"
 #include "storage/proc.h"
 #include "utils/syscache.h"
 
@@ -32,6 +39,10 @@
 VariableCache ShmemVariableCache = NULL;
 
 
+static void NewXidLSNRange(void);
+static XLogRecPtr XLogPutXidLSNRanges(void);
+static void XLogPutNextOid(Oid nextOid);
+
 /*
  * Allocate the next XID for a new transaction or subtransaction.
  *
@@ -163,6 +174,16 @@ GetNewTransactionId(bool isSubXact)
 	ExtendSUBTRANS(xid);
 
 	/*
+	 * Create a new XID-LSN range, if we've consumed all the XIDs from the
+	 * current range.
+	 */
+	if (TransactionIdFollowsOrEquals(ShmemVariableCache->nextXid,
+									 ShmemVariableCache->xidlsnranges[0].maxxid))
+	{
+		NewXidLSNRange();
+	}
+
+	/*
 	 * Now advance the nextXid counter.  This must not happen until after we
 	 * have successfully completed ExtendCLOG() --- if that routine fails, we
 	 * want the next incoming transaction to try it again.	We cannot assign
@@ -249,6 +270,152 @@ ReadNewTransactionId(void)
 }
 
 /*
+ * Truncate the XID-LSN ranges.
+ *
+ * On entry, *truncateXID is the oldest XID still needed by the system.
+ * Older XIDs can still be present on-disk, but only committed ones. Any aborted
+ * XIDs have been vacuumed away.
+ *
+ * This function trims the XID ranges table, and returns the min XID of the
+ * oldest range that still covers *truncateXID. After the call, any pages
+ * still containing XIDs older than the returned value will be considered
+ * mature by PageIsMature().
+ */
+void
+TruncateXidLSNRange(TransactionId *truncateXID)
+{
+	TransactionId oldestXact = *truncateXID;
+	XidLSNRange *lastrange = NULL;
+	int			numranges;
+
+	LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
+
+	for (numranges = ShmemVariableCache->numranges; numranges > 0; numranges--)
+	{
+		lastrange = &ShmemVariableCache->xidlsnranges[numranges - 1];
+		if (TransactionIdFollowsOrEquals(lastrange->maxxid, oldestXact))
+			break;
+	}
+	if (numranges == 0)
+		elog(ERROR, "cannot truncate all XID LSN ranges");
+
+	if (numranges == ShmemVariableCache->numranges)
+	{
+		/* can't truncate the oldest range yet */
+		elog(DEBUG1, "failed to truncate XID LSN ranges %d, oldest %u - %u, LSN %X/%X, OldestXact: %u",
+			numranges,
+			lastrange->minxid, lastrange->maxxid,
+			(uint32) (lastrange->minlsn >> 32), (uint32) (lastrange->minlsn),
+			oldestXact);
+	}
+	else
+	{
+		elog(LOG, "truncated XID LSN ranges from %d to %d ranges, oldest %u - %u, LSN %X/%X, OldestXact: %u",
+			ShmemVariableCache->numranges, numranges,
+			lastrange->minxid, lastrange->maxxid,
+			(uint32) (lastrange->minlsn >> 32), (uint32) (lastrange->minlsn),
+			oldestXact);
+
+		ShmemVariableCache->numranges = numranges;
+		ShmemVariableCache->clogtruncationpoint = lastrange->minlsn;
+
+		ShmemVariableCache->xidlsnranges_dirty = true;
+		ShmemVariableCache->xidlsnranges_recently_dirtied = true;
+
+		(void) XLogPutXidLSNRanges();
+	}
+
+	/*
+	 * Set return value. The CLOG can be truncated up to the minxid of the
+	 * oldest remaining range, but not further than that.
+	 */
+	*truncateXID = lastrange->minxid;
+
+	LWLockRelease(XidGenLock);
+}
+
+/*
+ * Returns the min-xid of the currently open XID range.
+ *
+ * What that means is that if you were to update a page and set its LSN now,
+ * the page shall not contain any XIDs older than GetLatestRangeXmin(), or they
+ * would be outside the range implied by the new LSN.
+ *
+ * NewXidLSNRange() ensures that the min XID value of each range is older than
+ * the global OldestXmin value, so any XIDs older than what this function
+ * returns can be frozen. That ensures that when you update a page, you can
+ * always freeze old tuples so that all remaining XIDs present on the page and
+ * any new XIDs all fall within the min and max of the currently open XID
+ * range.
+ */
+TransactionId
+GetLatestRangeXmin(void)
+{
+	TransactionId result;
+
+	LWLockAcquire(XidGenLock, LW_SHARED);
+
+	result = ShmemVariableCache->xidlsnranges[0].minxid;
+
+	LWLockRelease(XidGenLock);
+
+	return result;
+}
+
+/*
+ * Begin a new XID-LSN range.
+ *
+ * Caller must hold XidGenLock.
+ */
+static void
+NewXidLSNRange(void)
+{
+	TransactionId minxid;
+	TransactionId maxxid;
+	XLogRecPtr	lsn;
+	int			numranges;
+
+	elog(LOG, "exhausted XID range %u - %u (LSN %X/%X)",
+			ShmemVariableCache->xidlsnranges[0].minxid,
+			ShmemVariableCache->xidlsnranges[0].maxxid,
+			(uint32) (ShmemVariableCache->xidlsnranges[0].minlsn >> 32),
+			(uint32) (ShmemVariableCache->xidlsnranges[0].minlsn));
+
+	/* Shift old ranges one slot down */
+	numranges = ShmemVariableCache->numranges;
+	if (numranges >= NUM_XID_LSN_RANGES)
+		elog(ERROR, "out of XID-LSN range slots");
+	numranges++;
+	memmove(&ShmemVariableCache->xidlsnranges[1],
+			&ShmemVariableCache->xidlsnranges[0],
+			sizeof(XidLSNRange) * (numranges - 1));
+
+#define XID_LSN_RANGE_SIZE (1000000)
+
+	/* Create new range */
+	minxid = GetOldestXmin(false, true);
+	maxxid = ShmemVariableCache->xidlsnranges[0].maxxid + XID_LSN_RANGE_SIZE;
+	if (!TransactionIdIsNormal(ShmemVariableCache->xidlsnranges[1].maxxid))
+		maxxid = FirstNormalTransactionId;
+
+	ShmemVariableCache->xidlsnranges[0].minxid = minxid;
+	ShmemVariableCache->xidlsnranges[0].maxxid = maxxid;
+	ShmemVariableCache->xidlsnranges[0].minlsn = InvalidXLogRecPtr;
+	ShmemVariableCache->numranges = numranges;
+
+	lsn = XLogPutXidLSNRanges();
+
+	ShmemVariableCache->xidlsnranges[0].minlsn = lsn;
+
+	ShmemVariableCache->xidlsnranges_dirty = true;
+	ShmemVariableCache->xidlsnranges_recently_dirtied = true;
+
+	elog(LOG, "started new XID range %u - %u (LSN %X/%X)",
+		minxid, maxxid, (uint32) (lsn >> 32), (uint32) (lsn));
+}
+
+/*
  * Determine the last safe XID to allocate given the currently oldest
  * datfrozenxid (ie, the oldest XID that might exist in any database
  * of our cluster), and the OID of the (or a) database with that value.
@@ -420,6 +587,275 @@ ForceTransactionIdLimitUpdate(void)
 	return false;
 }
 
+/*
+ * Returns true if the page is so old that all tuples on the page must be
+ * visible. The XIDs on a mature page must not be looked at, because we might
+ * already have wrapped around the XID horizon, even multiple times over, since
+ * they were written.
+ */
+bool
+PageIsMature(Page dp)
+{
+	XLogRecPtr clogtruncationpoint;
+	XLogRecPtr pagelsn;
+
+	LWLockAcquire(XidGenLock, LW_SHARED);
+	clogtruncationpoint = ShmemVariableCache->clogtruncationpoint;
+	LWLockRelease(XidGenLock);
+
+	pagelsn = PageGetLSN(dp);
+	if (pagelsn < clogtruncationpoint)
+	{
+		elog(DEBUG2, "page with LSN %X/%X is mature (clog truncated at %X/%X)",
+			 (uint32) (pagelsn >> 32), (uint32) pagelsn,
+			 (uint32) (clogtruncationpoint >> 32), (uint32) (clogtruncationpoint));
+		return true;
+	}
+	else
+		return false;
+}
+
+/*
+ * Does the given page need to be frozen before it can be modified?
+ *
+ * This should be called while holding a lock on the page.
+ */
+bool
+PageUpdateNeedsFreezing(Page dp)
+{
+	/* Freeze page whenever LSN crosses a range boundary */
+	XLogRecPtr recentboundary;
+	XLogRecPtr pagelsn;
+
+	LWLockAcquire(XidGenLock, LW_SHARED);
+	recentboundary = ShmemVariableCache->xidlsnranges[0].minlsn;
+	LWLockRelease(XidGenLock);
+
+	/*
+	 * Return true, unless the old LSN on the page falls within the latest
+	 * XID-LSN range.
+	 */
+	pagelsn = PageGetLSN(dp);
+	if (pagelsn < recentboundary)
+	{
+		elog(DEBUG2, "page with LSN %X/%X needs freezing (boundary %X/%X)",
+			 (uint32) (pagelsn >> 32), (uint32) pagelsn,
+			 (uint32) (recentboundary >> 32), (uint32) (recentboundary));
+		return true;
+	}
+	else
+		return false;
+}
+
+/*
+ * XID-LSN range file management.
+ */
+
+#define XID_LSN_RANGES_FILENAME		"global/pg_xidlsnranges"
+
+#define XID_LSN_RANGES_FILEMAGIC		0x255cb384		/* version ID value */
+
+typedef struct
+{
+	int32		magic;
+	pg_crc32	crc;
+	int32		numranges;
+	XidLSNRange ranges[1]; /* VARIABLE LENGTH ARRAY */
+} XidLSNRangesFile;
+
+#define SizeOfXidLSNRangesFile(numranges) \
+	(offsetof(XidLSNRangesFile, ranges) + (numranges) * sizeof(XidLSNRange))
+
+static void
+WriteXidLSNRangesFile(void)
+{
+	int			fd;
+	XidLSNRangesFile *content;
+	int			numranges;
+	pg_crc32	crc;
+	off_t		sz;
+
+	/* Copy to local memory first, to avoid holding the lock for a long time */
+	LWLockAcquire(XidGenLock, LW_SHARED);
+	numranges = ShmemVariableCache->numranges;
+	sz = SizeOfXidLSNRangesFile(numranges);
+	content = palloc0(sz);
+	memcpy(content->ranges, ShmemVariableCache->xidlsnranges,
+		   sizeof(XidLSNRange) * numranges);
+	LWLockRelease(XidGenLock);
+
+	Assert(numranges > 0);
+
+	elog(LOG, "writing XID LSN ranges file with %d ranges", numranges);
+
+	content->magic = XID_LSN_RANGES_FILEMAGIC;
+	content->numranges = numranges;
+	content->crc = 0;
+
+	INIT_CRC32(crc);
+	COMP_CRC32(crc, (char *) content, sz);
+	FIN_CRC32(crc);
+
+	content->crc = crc;
+
+	fd = OpenTransientFile(XID_LSN_RANGES_FILENAME,
+						   O_WRONLY | O_CREAT | PG_BINARY,
+						   S_IRUSR | S_IWUSR);
+	if (fd < 0)
+		ereport(FATAL,
+				(errcode_for_file_access(),
+				 errmsg("could not open XID LSN range file \"%s\": %m",
+						XID_LSN_RANGES_FILENAME)));
+	if (write(fd, content, sz) != sz)
+	{
+		/* if write didn't set errno, assume problem is no disk space */
+		if (errno == 0)
+			errno = ENOSPC;
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write to XID LSN range file \"%s\": %m",
+						XID_LSN_RANGES_FILENAME)));
+	}
+	if (pg_fsync(fd) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not fsync XID LSN range file \"%s\": %m",
+						XID_LSN_RANGES_FILENAME)));
+
+	if (CloseTransientFile(fd))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not close XID LSN range file \"%s\": %m",
+						XID_LSN_RANGES_FILENAME)));
+
+	pfree(content);
+}
+
+static void
+LoadXidLSNRangesFile(void)
+{
+	int			fd;
+	XidLSNRangesFile *content;
+	pg_crc32	crc;
+	pg_crc32	filecrc;
+	off_t		sz;
+	off_t		ret;
+
+	fd = OpenTransientFile(XID_LSN_RANGES_FILENAME, O_RDONLY | PG_BINARY,
+						   S_IRUSR | S_IWUSR);
+	if (fd < 0)
+		ereport(FATAL,
+				(errcode_for_file_access(),
+				 errmsg("could not open XID LSN range file \"%s\": %m",
+						XID_LSN_RANGES_FILENAME)));
+	/* check file size */
+	sz = lseek(fd, 0, SEEK_END);
+	if (sz < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not seek in XID LSN range file \"%s\": %m",
+						XID_LSN_RANGES_FILENAME)));
+	if (sz < sizeof(XidLSNRangesFile))
+		ereport(ERROR,
+				(errmsg("XID LSN range file \"%s\" has invalid size: %d",
+						XID_LSN_RANGES_FILENAME, (int) sz)));
+	/* rewind back to beginning */
+	ret = lseek(fd, 0, SEEK_SET);
+	if (ret < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not seek in XID LSN range file \"%s\": %m",
+						XID_LSN_RANGES_FILENAME)));
+
+	/* slurp the file into memory */
+	content = palloc(sz);
+	if (read(fd, content, sz) != sz)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read XID LSN range file \"%s\": %m",
+						XID_LSN_RANGES_FILENAME)));
+
+	/* don't need the file anymore */
+	if (CloseTransientFile(fd))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not close XID LSN range file \"%s\": %m",
+						XID_LSN_RANGES_FILENAME)));
+
+	/* Sanity check the file */
+	if (content->magic != XID_LSN_RANGES_FILEMAGIC || content->numranges < 0)
+		ereport(ERROR,
+				(errmsg("XID LSN range file \"%s\" contains invalid data: %X",
+						XID_LSN_RANGES_FILENAME, content->magic)));
+	if (SizeOfXidLSNRangesFile(content->numranges) != sz)
+		ereport(ERROR,
+				(errmsg("XID LSN range file \"%s\" has invalid size",
+						XID_LSN_RANGES_FILENAME)));
+	filecrc = content->crc;
+	content->crc = 0;
+	INIT_CRC32(crc);
+	COMP_CRC32(crc, (char *) content, sz);
+	FIN_CRC32(crc);
+	if (filecrc != crc)
+		ereport(ERROR,
+				(errmsg("XID LSN range file \"%s\" contains an invalid checksum",
+						XID_LSN_RANGES_FILENAME)));
+
+	/* The contents seem to be valid. Load into shared memory. */
+	LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
+	memcpy(ShmemVariableCache->xidlsnranges, content->ranges,
+		   sizeof(XidLSNRange) * content->numranges);
+	ShmemVariableCache->numranges = content->numranges;
+	ShmemVariableCache->clogtruncationpoint = content->ranges[content->numranges - 1].minlsn;
+
+	ShmemVariableCache->xidlsnranges_dirty = false;
+	ShmemVariableCache->xidlsnranges_recently_dirtied = false;
+	LWLockRelease(XidGenLock);
+
+	elog(LOG, "loaded XID LSN ranges file with %d ranges", content->numranges);
+}
+
+void
+BootStrapVarsup(void)
+{
+	WriteXidLSNRangesFile();
+}
+
+void
+StartupVarsup(void)
+{
+	LoadXidLSNRangesFile();
+}
+
+/*
+ * Flushes LSN ranges to disk on a checkpoint.
+ */
+void
+CheckPointVarsup(void)
+{
+	bool dirty;
+
+	LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
+	dirty = ShmemVariableCache->xidlsnranges_dirty;
+	ShmemVariableCache->xidlsnranges_recently_dirtied = false;
+	LWLockRelease(XidGenLock);
+
+	if (dirty)
+	{
+		WriteXidLSNRangesFile();
+
+		/*
+		 * Clear the dirty flag, unless someone modified the ranges again,
+		 * while we were writing them. What we wrote is still valid and is
+		 * enough for this checkpoint, but keep the dirty flag so that we'll
+		 * write out the new changes on next checkpoint.
+		 */
+		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
+		if (!ShmemVariableCache->xidlsnranges_recently_dirtied)
+			ShmemVariableCache->xidlsnranges_dirty = false;
+		LWLockRelease(XidGenLock);
+	}
+}
 
 /*
  * GetNewObjectId -- allocate a new OID
@@ -494,3 +930,114 @@ GetNewObjectId(void)
 
 	return result;
 }
+
+/*
+ * Log the current XID-LSN ranges.
+ */
+static XLogRecPtr
+XLogPutXidLSNRanges(void)
+{
+	XLogRecData rdata[2];
+	uint32 numranges = ShmemVariableCache->numranges;
+
+	rdata[0].data = (char *) (&numranges);
+	rdata[0].len = sizeof(uint32);
+	rdata[0].buffer = InvalidBuffer;
+	rdata[0].next = &rdata[1];
+
+	rdata[1].data = (char *) (ShmemVariableCache->xidlsnranges);
+	rdata[1].len = sizeof(XidLSNRange) * numranges;
+	rdata[1].buffer = InvalidBuffer;
+	rdata[1].next = NULL;
+	return XLogInsert(RM_VARSUP_ID, VARSUP_XID_LSN_RANGES, rdata);
+}
+
+/*
+ * Write a NEXTOID log record
+ */
+static void
+XLogPutNextOid(Oid nextOid)
+{
+	XLogRecData rdata;
+
+	rdata.data = (char *) (&nextOid);
+	rdata.len = sizeof(Oid);
+	rdata.buffer = InvalidBuffer;
+	rdata.next = NULL;
+	(void) XLogInsert(RM_VARSUP_ID, VARSUP_NEXTOID, &rdata);
+
+	/*
+	 * We need not flush the NEXTOID record immediately, because any of the
+	 * just-allocated OIDs could only reach disk as part of a tuple insert or
+	 * update that would have its own XLOG record that must follow the NEXTOID
+	 * record.	Therefore, the standard buffer LSN interlock applied to those
+	 * records will ensure no such OID reaches disk before the NEXTOID record
+	 * does.
+	 *
+	 * Note, however, that the above statement only covers state "within" the
+	 * database.  When we use a generated OID as a file or directory name, we
+	 * are in a sense violating the basic WAL rule, because that filesystem
+	 * change may reach disk before the NEXTOID WAL record does.  The impact
+	 * of this is that if a database crash occurs immediately afterward, we
+	 * might after restart re-generate the same OID and find that it conflicts
+	 * with the leftover file or directory.  But since for safety's sake we
+	 * always loop until finding a nonconflicting filename, this poses no real
+	 * problem in practice. See pgsql-hackers discussion 27-Sep-2006.
+	 */
+}
+
+/*
+ * Varsup resource manager's routines
+ */
+void
+varsup_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+	uint8		info = record->xl_info & ~XLR_INFO_MASK;
+
+	/* Backup blocks are not used by XLOG rmgr */
+	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
+	if (info == VARSUP_NEXTOID)
+	{
+		Oid			nextOid;
+
+		/*
+		 * We used to try to take the maximum of ShmemVariableCache->nextOid
+		 * and the recorded nextOid, but that fails if the OID counter wraps
+		 * around.	Since no OID allocation should be happening during replay
+		 * anyway, better to just believe the record exactly.  We still take
+		 * OidGenLock while setting the variable, just in case.
+		 */
+		memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
+		LWLockAcquire(OidGenLock, LW_EXCLUSIVE);
+		ShmemVariableCache->nextOid = nextOid;
+		ShmemVariableCache->oidCount = 0;
+		LWLockRelease(OidGenLock);
+	}
+	else if (info == VARSUP_XID_LSN_RANGES)
+	{
+		char   *p = XLogRecGetData(record);
+		uint32	numranges;
+
+		memcpy(&numranges, p, sizeof(uint32));
+		p += sizeof(uint32);
+
+		LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
+		memcpy(ShmemVariableCache->xidlsnranges, p, numranges * sizeof(XidLSNRange));
+
+		/*
+		 * Invalid LSN means that a new range was logged, and its LSN is the
+		 * LSN of the WAL record itself. It can't be included in the payload
+		 * because you don't know the LSN of a record until you insert it.
+		 */
+		if (ShmemVariableCache->xidlsnranges[0].minlsn == InvalidXLogRecPtr)
+			ShmemVariableCache->xidlsnranges[0].minlsn = lsn;
+
+		ShmemVariableCache->numranges = numranges;
+		ShmemVariableCache->clogtruncationpoint = ShmemVariableCache->xidlsnranges[numranges - 1].minlsn;
+		LWLockRelease(XidGenLock);
+
+		/* update the file on disk immediately. */
+		WriteXidLSNRangesFile();
+	}
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0a573f7..596008d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4052,9 +4052,13 @@ BootStrapXLOG(void)
 	checkPoint.time = (pg_time_t) time(NULL);
 	checkPoint.oldestActiveXid = InvalidTransactionId;
 
-	ShmemVariableCache->nextXid = checkPoint.nextXid;
-	ShmemVariableCache->nextOid = checkPoint.nextOid;
-	ShmemVariableCache->oidCount = 0;
+	/* XXX: perhaps this should be in BootStrapVarsup() */
+	ShmemVariableCache->xidlsnranges[0].minlsn = 0;
+	ShmemVariableCache->xidlsnranges[0].minxid = FirstNormalTransactionId;
+	ShmemVariableCache->xidlsnranges[0].maxxid = FirstNormalTransactionId + 1000000;
+	ShmemVariableCache->numranges = 1;
+	ShmemVariableCache->clogtruncationpoint = 0;
+
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
@@ -4137,6 +4141,7 @@ BootStrapXLOG(void)
 
 	/* Bootstrap the commit log, too */
 	BootStrapCLOG();
+	BootStrapVarsup();
 	BootStrapSUBTRANS();
 	BootStrapMultiXact();
 
@@ -5393,6 +5398,7 @@ StartupXLOG(void)
 			 * maintained during recovery and need not be started yet.
 			 */
 			StartupCLOG();
+			StartupVarsup();
 			StartupSUBTRANS(oldestActiveXID);
 
 			/*
@@ -6049,6 +6055,7 @@ StartupXLOG(void)
 	if (standbyState == STANDBY_DISABLED)
 	{
 		StartupCLOG();
+		StartupVarsup();
 		StartupSUBTRANS(oldestActiveXID);
 	}
 
@@ -7245,6 +7252,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 	CheckPointCLOG();
 	CheckPointSUBTRANS();
 	CheckPointMultiXact();
+	CheckPointVarsup();
 	CheckPointPredicate();
 	CheckPointRelationMap();
 	CheckPointBuffers(flags);	/* performs all required fsyncs */
@@ -7572,40 +7580,6 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 }
 
 /*
- * Write a NEXTOID log record
- */
-void
-XLogPutNextOid(Oid nextOid)
-{
-	XLogRecData rdata;
-
-	rdata.data = (char *) (&nextOid);
-	rdata.len = sizeof(Oid);
-	rdata.buffer = InvalidBuffer;
-	rdata.next = NULL;
-	(void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
-
-	/*
-	 * We need not flush the NEXTOID record immediately, because any of the
-	 * just-allocated OIDs could only reach disk as part of a tuple insert or
-	 * update that would have its own XLOG record that must follow the NEXTOID
-	 * record.	Therefore, the standard buffer LSN interlock applied to those
-	 * records will ensure no such OID reaches disk before the NEXTOID record
-	 * does.
-	 *
-	 * Note, however, that the above statement only covers state "within" the
-	 * database.  When we use a generated OID as a file or directory name, we
-	 * are in a sense violating the basic WAL rule, because that filesystem
-	 * change may reach disk before the NEXTOID WAL record does.  The impact
-	 * of this is that if a database crash occurs immediately afterward, we
-	 * might after restart re-generate the same OID and find that it conflicts
-	 * with the leftover file or directory.  But since for safety's sake we
-	 * always loop until finding a nonconflicting filename, this poses no real
-	 * problem in practice. See pgsql-hackers discussion 27-Sep-2006.
-	 */
-}
-
-/*
  * Write an XLOG SWITCH record.
  *
  * Here we just blindly issue an XLogInsert request for the record.
@@ -7916,24 +7890,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 	/* Backup blocks are not used by XLOG rmgr */
 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
 
-	if (info == XLOG_NEXTOID)
-	{
-		Oid			nextOid;
-
-		/*
-		 * We used to try to take the maximum of ShmemVariableCache->nextOid
-		 * and the recorded nextOid, but that fails if the OID counter wraps
-		 * around.	Since no OID allocation should be happening during replay
-		 * anyway, better to just believe the record exactly.  We still take
-		 * OidGenLock while setting the variable, just in case.
-		 */
-		memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
-		LWLockAcquire(OidGenLock, LW_EXCLUSIVE);
-		ShmemVariableCache->nextOid = nextOid;
-		ShmemVariableCache->oidCount = 0;
-		LWLockRelease(OidGenLock);
-	}
-	else if (info == XLOG_CHECKPOINT_SHUTDOWN)
+	if (info == XLOG_CHECKPOINT_SHUTDOWN)
 	{
 		CheckPoint	checkPoint;
 
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 641c740..0d96fcc 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -833,6 +833,7 @@ vac_truncate_clog(TransactionId frozenXID, MultiXactId frozenMulti)
 	Oid			oldestxid_datoid;
 	Oid			oldestmulti_datoid;
 	bool		frozenAlreadyWrapped = false;
+	TransactionId truncateXID;
 
 	/* init oldest datoids to sync with my frozen values */
 	oldestxid_datoid = MyDatabaseId;
@@ -894,8 +895,11 @@ vac_truncate_clog(TransactionId frozenXID, MultiXactId frozenMulti)
 		return;
 	}
 
-	/* Truncate CLOG and Multi to the oldest computed value */
-	TruncateCLOG(frozenXID);
+	/* Truncate the XID-LSN ranges to the last one that includes frozenXID */
+	truncateXID = frozenXID;
+	TruncateXidLSNRange(&truncateXID);
+	/* Truncate CLOG and Multi to the beginning of the oldest retained range */
+	TruncateCLOG(truncateXID);
 	TruncateMultiXact(frozenMulti);
 
 	/*
@@ -904,7 +908,7 @@ vac_truncate_clog(TransactionId frozenXID, MultiXactId frozenMulti)
 	 * for an(other) autovac cycle if needed.	XXX should we avoid possibly
 	 * signalling twice?
 	 */
-	SetTransactionIdLimit(frozenXID, oldestxid_datoid);
+	SetTransactionIdLimit(truncateXID, oldestxid_datoid);
 	MultiXactAdvanceOldest(frozenMulti, oldestmulti_datoid);
 }
 
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
index 078b822..7f82d79 100644
--- a/src/backend/commands/vacuumlazy.c
+++ b/src/backend/commands/vacuumlazy.c
@@ -479,8 +479,6 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		bool		tupgone,
 					hastup;
 		int			prev_dead_count;
-		OffsetNumber frozen[MaxOffsetNumber];
-		int			nfrozen;
 		Size		freespace;
 		bool		all_visible_according_to_vm;
 		bool		all_visible;
@@ -692,6 +690,14 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 			continue;
 		}
 
+		if (PageIsMature(page))
+		{
+			/* we can skip the page altogether. */
+			/* TODO: set visibility map bit */
+			UnlockReleaseBuffer(buf);
+			continue;
+		}
+
 		/*
 		 * Prune all HOT-update chains in this page.
 		 *
@@ -706,7 +712,6 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 		 */
 		all_visible = true;
 		has_dead_tuples = false;
-		nfrozen = 0;
 		hastup = false;
 		prev_dead_count = vacrelstats->num_dead_tuples;
 		maxoff = PageGetMaxOffsetNumber(page);
@@ -863,34 +868,16 @@ lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
 				hastup = true;
 
 				/*
-				 * Each non-removable tuple must be checked to see if it needs
-				 * freezing.  Note we already have exclusive buffer lock.
+				 * Don't touch live tuples. We used to freeze them here, but
+				 * we don't do that anymore to avoid dirtying the page. Scans
+				 * will treat the tuple as live, as soon as the chunk of clog
+				 * containing the XIDs on this page is truncated (= the page
+				 * has "matured").
 				 */
-				if (heap_freeze_tuple(tuple.t_data, FreezeLimit,
-									  MultiXactFrzLimit))
-					frozen[nfrozen++] = offnum;
 			}
 		}						/* scan along page */
 
 		/*
-		 * If we froze any tuples, mark the buffer dirty, and write a WAL
-		 * record recording the changes.  We must log the changes to be
-		 * crash-safe against future truncation of CLOG.
-		 */
-		if (nfrozen > 0)
-		{
-			MarkBufferDirty(buf);
-			if (RelationNeedsWAL(onerel))
-			{
-				XLogRecPtr	recptr;
-
-				recptr = log_heap_freeze(onerel, buf, FreezeLimit,
-										 MultiXactFrzLimit, frozen, nfrozen);
-				PageSetLSN(page, recptr);
-			}
-		}
-
-		/*
 		 * If there are no indexes then we can vacuum the page right now
 		 * instead of doing a second scan.
 		 */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index ea16c64..0fb4d85 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2294,7 +2294,7 @@ static struct config_int ConfigureNamesInt[] =
 		},
 		&autovacuum_freeze_max_age,
 		/* see pg_resetxlog if you change the upper-limit value */
-		200000000, 100000000, 2000000000,
+		200000000, 100000, 2000000000, /* XXX: changed the min value for easier testing. */
 		NULL, NULL, NULL
 	},
 	{
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index ab4020a..87dff98 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -165,6 +165,9 @@ HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
 bool
 HeapTupleSatisfiesSelf(HeapTupleHeader tuple, Snapshot snapshot, Buffer buffer)
 {
+	if (PageIsMature(BufferGetPage(buffer)))
+		return true;
+
 	if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))
 	{
 		if (tuple->t_infomask & HEAP_XMIN_INVALID)
@@ -353,6 +356,9 @@ HeapTupleSatisfiesSelf(HeapTupleHeader tuple, Snapshot snapshot, Buffer buffer)
 bool
 HeapTupleSatisfiesNow(HeapTupleHeader tuple, Snapshot snapshot, Buffer buffer)
 {
+	if (PageIsMature(BufferGetPage(buffer)))
+		return true;
+
 	if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))
 	{
 		if (tuple->t_infomask & HEAP_XMIN_INVALID)
@@ -549,6 +555,9 @@ bool
 HeapTupleSatisfiesToast(HeapTupleHeader tuple, Snapshot snapshot,
 						Buffer buffer)
 {
+	if (PageIsMature(BufferGetPage(buffer)))
+		return true;
+
 	if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))
 	{
 		if (tuple->t_infomask & HEAP_XMIN_INVALID)
@@ -630,6 +639,9 @@ HTSU_Result
 HeapTupleSatisfiesUpdate(HeapTupleHeader tuple, CommandId curcid,
 						 Buffer buffer)
 {
+	if (PageIsMature(BufferGetPage(buffer)))
+		return HeapTupleMayBeUpdated;
+
 	if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))
 	{
 		if (tuple->t_infomask & HEAP_XMIN_INVALID)
@@ -854,6 +866,9 @@ HeapTupleSatisfiesDirty(HeapTupleHeader tuple, Snapshot snapshot,
 {
 	snapshot->xmin = snapshot->xmax = InvalidTransactionId;
 
+	if (PageIsMature(BufferGetPage(buffer)))
+		return true;
+
 	if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))
 	{
 		if (tuple->t_infomask & HEAP_XMIN_INVALID)
@@ -1043,6 +1058,9 @@ bool
 HeapTupleSatisfiesMVCC(HeapTupleHeader tuple, Snapshot snapshot,
 					   Buffer buffer)
 {
+	if (PageIsMature(BufferGetPage(buffer)))
+		return true;
+
 	if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))
 	{
 		if (tuple->t_infomask & HEAP_XMIN_INVALID)
@@ -1236,6 +1254,9 @@ HTSV_Result
 HeapTupleSatisfiesVacuum(HeapTupleHeader tuple, TransactionId OldestXmin,
 						 Buffer buffer)
 {
+	if (PageIsMature(BufferGetPage(buffer)))
+		return HEAPTUPLE_LIVE;
+
 	/*
 	 * Has inserting transaction committed?
 	 *
@@ -1464,8 +1485,12 @@ HeapTupleSatisfiesVacuum(HeapTupleHeader tuple, TransactionId OldestXmin,
  *	just whether or not the tuple is surely dead).
  */
 bool
-HeapTupleIsSurelyDead(HeapTupleHeader tuple, TransactionId OldestXmin)
+HeapTupleIsSurelyDead(HeapTupleHeader tuple, TransactionId OldestXmin,
+					  Buffer buffer)
 {
+	if (PageIsMature(BufferGetPage(buffer)))
+		return false;
+
 	/*
 	 * If the inserting transaction is marked invalid, then it aborted, and
 	 * the tuple is definitely dead.  If it's marked neither committed nor
@@ -1662,6 +1687,8 @@ HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple)
 	if (TransactionIdDidCommit(xmax))
 		return false;
 
+	/* FIXME: do we need to check PageIsMature here? */
+
 	/*
 	 * not current, not in progress, not committed -- must have aborted or
 	 * crashed
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index baa8c50..fbfc1e7 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -146,6 +146,7 @@ extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
 				bool follow_update,
 				Buffer *buffer, HeapUpdateFailureData *hufd);
 extern void heap_inplace_update(Relation relation, HeapTuple tuple);
+extern void heap_freeze_page(Relation relation, Buffer buf);
 extern bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid,
 				  TransactionId cutoff_multi);
 extern bool heap_tuple_needs_freeze(HeapTupleHeader tuple, TransactionId cutoff_xid,
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 7ad71b3..8d84a5f 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -42,3 +42,4 @@ PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup
 PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL)
 PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, NULL, NULL, NULL)
 PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_xlog_startup, spg_xlog_cleanup, NULL)
+PG_RMGR(RM_VARSUP_ID, "Varsup", varsup_redo, varsup_desc, NULL, NULL, NULL)
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 23a41fd..1fe61f2 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -15,6 +15,8 @@
 #define TRANSAM_H
 
 #include "access/xlogdefs.h"
+#include "catalog/pg_control.h"
+#include "storage/bufpage.h"
 
 
 /* ----------------
@@ -88,6 +90,15 @@
 #define FirstBootstrapObjectId	10000
 #define FirstNormalObjectId		16384
 
+typedef struct XidLSNRange
+{
+	XLogRecPtr	minlsn;
+	TransactionId minxid;	/* inclusive */
+	TransactionId maxxid;	/* exclusive */
+} XidLSNRange;
+
+#define NUM_XID_LSN_RANGES		100
+
 /*
  * VariableCache is a data structure in shared memory that is used to track
  * OID and XID assignment state.  For largely historical reasons, there is
@@ -118,6 +129,18 @@ typedef struct VariableCacheData
 	TransactionId xidWrapLimit; /* where the world ends */
 	Oid			oldestXidDB;	/* database with minimum datfrozenxid */
 
+	int			numranges;
+	XidLSNRange	xidlsnranges[NUM_XID_LSN_RANGES];
+	/*
+	 * These flags are used to determine if the ranges need to be flushed out
+	 * at a checkpoint. They work like corresponding flags in the buffer
+	 * manager.
+	 */
+	bool		xidlsnranges_dirty;
+	bool		xidlsnranges_recently_dirtied;
+
+	XLogRecPtr	clogtruncationpoint;	/* The point where we have truncated clog. */
+
 	/*
 	 * These fields are protected by ProcArrayLock.
 	 */
@@ -158,11 +181,18 @@ extern TransactionId TransactionIdLatest(TransactionId mainxid,
 extern XLogRecPtr TransactionIdGetCommitLSN(TransactionId xid);
 
 /* in transam/varsup.c */
+extern bool PageUpdateNeedsFreezing(Page page);
+extern bool PageIsMature(Page page);
+extern TransactionId GetLatestRangeXmin(void);
 extern TransactionId GetNewTransactionId(bool isSubXact);
 extern TransactionId ReadNewTransactionId(void);
 extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
 					  Oid oldest_datoid);
 extern bool ForceTransactionIdLimitUpdate(void);
 extern Oid	GetNewObjectId(void);
+extern void TruncateXidLSNRange(TransactionId *oldestXact);
+extern void BootStrapVarsup(void);
+extern void StartupVarsup(void);
+extern void CheckPointVarsup(void);
 
 #endif   /* TRAMSAM_H */
diff --git a/src/include/access/varsup_internal.h b/src/include/access/varsup_internal.h
new file mode 100644
index 0000000..6fa406a
--- /dev/null
+++ b/src/include/access/varsup_internal.h
@@ -0,0 +1,27 @@
+/*
+ * varsup_internal.h
+ *
+ * varsup WAL routines. Internal to varsup.c and varsupdesc.c
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/varsup_internal.h
+ */
+#ifndef VARSUP_INTERNAL_H
+#define VARSUP_INTERNAL_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+
+/* XLOG info values for varsup rmgr */
+#define VARSUP_NEXTOID					0x10
+#define VARSUP_XID_LSN_RANGES			0x20
+
+/* in varsup.c */
+extern void varsup_redo(XLogRecPtr lsn, XLogRecord *record);
+
+/* in varsupdesc.c */
+extern void varsup_desc(StringInfo buf, uint8 xl_info, char *rec);
+
+#endif   /* VARSUP_INTERNAL_H */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index b4a75ce..14b388e 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -306,7 +306,6 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
-extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
 extern XLogRecPtr GetRedoRecPtr(void);
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 4f154a9..5e1afc6 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -60,7 +60,7 @@ typedef struct CheckPoint
 #define XLOG_CHECKPOINT_SHUTDOWN		0x00
 #define XLOG_CHECKPOINT_ONLINE			0x10
 #define XLOG_NOOP						0x20
-#define XLOG_NEXTOID					0x30
+/* #define XLOG_NEXTOID					0x30 moved to varsup.c in 9.4 */
 #define XLOG_SWITCH						0x40
 #define XLOG_BACKUP_END					0x50
 #define XLOG_PARAMETER_CHANGE			0x60
diff --git a/src/include/utils/tqual.h b/src/include/utils/tqual.h
index 465231c..6c02092 100644
--- a/src/include/utils/tqual.h
+++ b/src/include/utils/tqual.h
@@ -84,7 +84,7 @@ extern HTSU_Result HeapTupleSatisfiesUpdate(HeapTupleHeader tuple,
 extern HTSV_Result HeapTupleSatisfiesVacuum(HeapTupleHeader tuple,
 						 TransactionId OldestXmin, Buffer buffer);
 extern bool HeapTupleIsSurelyDead(HeapTupleHeader tuple,
-					  TransactionId OldestXmin);
+					  TransactionId OldestXmin, Buffer buffer);
 
 extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer,
 					 uint16 infomask, TransactionId xid);