Hi,

There has been some interest in keeping track of timestamp of
transaction commits.  This patch implements that.

There are some seemingly curious choices here.  First, this module can
be disabled, and in fact it's turned off by default.  At startup, we
verify whether it's enabled, and create the necessary SLRU segments if
so.  And if the server is started with this disabled, we set the oldest
value we know about to avoid trying to read the commit TS of
transactions of which we didn't keep record.  The ability to turn this
off is there to avoid imposing the overhead on systems that don't need
this feature.

Another thing of note is that we allow for some extra data alongside the
timestamp proper.  This might be useful for a replication system that
wants to keep track of the origin node ID of a committed transaction,
for example.  Exactly what will we do with the bit space we have is
unclear, so I have kept it generic and called it "commit extra data".

This offers the chance for outside modules to set the commit TS of a
transaction; there is support for WAL-logging such values.  But the core
user of the feature (RecordTransactionCommit) doesn't use it, because
xact.c's WAL logging itself is enough.  For systems that are replicating
transactions from remote nodes, it is useful.

We also keep track of the latest committed transaction.  This is
supposed to be useful to calculate replication lag.

-- 
Álvaro Herrera                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
*** a/contrib/pg_xlogdump/rmgrdesc.c
--- b/contrib/pg_xlogdump/rmgrdesc.c
***************
*** 9,14 ****
--- 9,15 ----
  #include "postgres.h"
  
  #include "access/clog.h"
+ #include "access/committs.h"
  #include "access/gin.h"
  #include "access/gist_private.h"
  #include "access/hash.h"
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2257,2262 **** include 'filename'
--- 2257,2277 ----
        </listitem>
       </varlistentry>
  
+      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
+       <term><varname>track_commit_timestamp</varname> (<type>bool</type>)</term>
+       <indexterm>
+        <primary><varname>track_commit_timestamp</> configuration parameter</primary>
+       </indexterm>
+       <listitem>
+        <para>
+         Record commit time of transactions.  This parameter
+         can only be set in
+         the <filename>postgresql.conf</> file or on the server command line.
+         The default value is off.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
       </variablelist>
      </sect2>
  
*** a/src/backend/access/rmgrdesc/Makefile
--- b/src/backend/access/rmgrdesc/Makefile
***************
*** 8,14 **** subdir = src/backend/access/rmgrdesc
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = clogdesc.o dbasedesc.o gindesc.o gistdesc.o hashdesc.o heapdesc.o \
  	   mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \
  	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
  
--- 8,15 ----
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o hashdesc.o \
!        heapdesc.o \
  	   mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \
  	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
  
*** /dev/null
--- b/src/backend/access/rmgrdesc/committsdesc.c
***************
*** 0 ****
--- 1,53 ----
+ /*-------------------------------------------------------------------------
+  *
+  * committsdesc.c
+  *    rmgr descriptor routines for access/transam/committs.c
+  *
+  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *    src/backend/access/rmgrdesc/committsdesc.c
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "access/committs.h"
+ #include "utils/timestamp.h"
+ 
+ 
+ void
+ committs_desc(StringInfo buf, uint8 xl_info, char *rec)
+ {
+ 	uint8		info = xl_info & ~XLR_INFO_MASK;
+ 
+ 	if (info == COMMITTS_ZEROPAGE)
+ 	{
+ 		int			pageno;
+ 
+ 		memcpy(&pageno, rec, sizeof(int));
+ 		appendStringInfo(buf, "zeropage: %d", pageno);
+ 	}
+ 	else if (info == COMMITTS_TRUNCATE)
+ 	{
+ 		int			pageno;
+ 
+ 		memcpy(&pageno, rec, sizeof(int));
+ 		appendStringInfo(buf, "truncate before: %d", pageno);
+ 	}
+ 	else if (info == COMMITTS_SETTS)
+ 	{
+ 		xl_committs_set *xlrec = (xl_committs_set *) rec;
+ 		int		i;
+ 
+ 		appendStringInfo(buf, "set committs %s for: %u",
+ 						 timestamptz_to_str(xlrec->timestamp),
+ 						 xlrec->mainxid);
+ 		for (i = 0; i < xlrec->nsubxids; i++)
+ 			appendStringInfo(buf, ", %u", xlrec->subxids[i]);
+ 	}
+ 	else
+ 		appendStringInfo(buf, "UNKNOWN");
+ }
*** a/src/backend/access/rmgrdesc/xlogdesc.c
--- b/src/backend/access/rmgrdesc/xlogdesc.c
***************
*** 44,50 **** xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
  		appendStringInfo(buf, "checkpoint: redo %X/%X; "
  						 "tli %u; prev tli %u; fpw %s; xid %u/%u; oid %u; multi %u; offset %u; "
  						 "oldest xid %u in DB %u; oldest multi %u in DB %u; "
! 						 "oldest running xid %u; %s",
  				(uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
  						 checkpoint->ThisTimeLineID,
  						 checkpoint->PrevTimeLineID,
--- 44,50 ----
  		appendStringInfo(buf, "checkpoint: redo %X/%X; "
  						 "tli %u; prev tli %u; fpw %s; xid %u/%u; oid %u; multi %u; offset %u; "
  						 "oldest xid %u in DB %u; oldest multi %u in DB %u; "
! 						 "oldest CommitTs xid: %u; oldest running xid %u; %s",
  				(uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
  						 checkpoint->ThisTimeLineID,
  						 checkpoint->PrevTimeLineID,
***************
*** 57,62 **** xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
--- 57,63 ----
  						 checkpoint->oldestXidDB,
  						 checkpoint->oldestMulti,
  						 checkpoint->oldestMultiDB,
+ 						 checkpoint->oldestCommitTs,
  						 checkpoint->oldestActiveXid,
  				 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
  	}
*** a/src/backend/access/transam/Makefile
--- b/src/backend/access/transam/Makefile
***************
*** 14,20 **** include $(top_builddir)/src/Makefile.global
  
  OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
  	timeline.o twophase.o twophase_rmgr.o xlog.o xlogarchive.o xlogfuncs.o \
! 	xlogreader.o xlogutils.o
  
  include $(top_srcdir)/src/backend/common.mk
  
--- 14,20 ----
  
  OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
  	timeline.o twophase.o twophase_rmgr.o xlog.o xlogarchive.o xlogfuncs.o \
! 	xlogreader.o xlogutils.o committs.o
  
  include $(top_srcdir)/src/backend/common.mk
  
*** a/src/backend/access/transam/clog.c
--- b/src/backend/access/transam/clog.c
***************
*** 152,159 **** TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
  		   status == TRANSACTION_STATUS_ABORTED);
  
  	/*
! 	 * See how many subxids, if any, are on the same page as the parent, if
! 	 * any.
  	 */
  	for (i = 0; i < nsubxids; i++)
  	{
--- 152,158 ----
  		   status == TRANSACTION_STATUS_ABORTED);
  
  	/*
! 	 * See how many subxids, if any, are on the same page as the parent.
  	 */
  	for (i = 0; i < nsubxids; i++)
  	{
*** /dev/null
--- b/src/backend/access/transam/committs.c
***************
*** 0 ****
--- 1,819 ----
+ /*-------------------------------------------------------------------------
+  *
+  * committs.c
+  *		PostgreSQL commit timestamp manager
+  *
+  * This module is a pg_clog-like system that stores the commit timestamp
+  * for each transaction.
+  *
+  * XLOG interactions: this module generates an XLOG record whenever a new
+  * CommitTs page is initialized to zeroes.  Also, one XLOG record is
+  * generated for setting of values when the caller requests it; this allows
+  * us to support values coming from places other than transaction commit.
+  * Other writes of CommitTS come from recording of transaction commit in
+  * xact.c, which generates its own XLOG records for these events and will
+  * re-perform the status update on redo; so we need make no additional XLOG
+  * entry here.
+  *
+  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * src/backend/access/transam/committs.c
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "access/committs.h"
+ #include "access/htup_details.h"
+ #include "access/slru.h"
+ #include "access/transam.h"
+ #include "catalog/pg_type.h"
+ #include "funcapi.h"
+ #include "miscadmin.h"
+ #include "pg_trace.h"
+ #include "utils/builtins.h"
+ #include "utils/snapmgr.h"
+ #include "utils/timestamp.h"
+ 
+ /*
+  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
+  * everywhere else in Postgres.
+  *
+  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+  * CommitTs page numbering also wraps around at
+  * 0xFFFFFFFF/COMMITTS_XACTS_PER_PAGE, and CommitTs segment numbering at
+  * 0xFFFFFFFF/COMMITTS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
+  * explicit notice of that fact in this module, except when comparing segment
+  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
+  */
+ 
+ /* We need 8+4 bytes per xact */
+ typedef struct CommitTimestampEntry
+ {
+ 	TimestampTz		time;
+ 	CommitExtraData	extra;
+ } CommitTimestampEntry;
+ 
+ #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, extra) + \
+ 									sizeof(CommitExtraData))
+ 
+ #define COMMITTS_XACTS_PER_PAGE \
+ 	(BLCKSZ / SizeOfCommitTimestampEntry)
+ 
+ #define TransactionIdToCTsPage(xid)	\
+ 	((xid) / (TransactionId) COMMITTS_XACTS_PER_PAGE)
+ #define TransactionIdToCTsEntry(xid)	\
+ 	((xid) % (TransactionId) COMMITTS_XACTS_PER_PAGE)
+ 
+ /*
+  * Link to shared-memory data structures for CLOG control
+  */
+ static SlruCtlData CommitTsCtlData;
+ 
+ #define CommitTsCtl (&CommitTsCtlData)
+ 
+ /*
+  * We keep a cache of the last value set in shared memory.  This is protected
+  * by CommitTsLock.
+  */
+ typedef struct CommitTimestampShared
+ {
+ 	TransactionId	xidLastCommit;
+ 	CommitTimestampEntry dataLastCommit;
+ } CommitTimestampShared;
+ 
+ CommitTimestampShared	*commitTsShared;
+ 
+ 
+ /* GUC variables */
+ bool	commit_ts_enabled;
+ 
+ static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
+ 					 TransactionId *subxids, TimestampTz committs,
+ 					 CommitExtraData extra, int pageno);
+ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz committs,
+ 						  CommitExtraData extra, int slotno);
+ static int	ZeroCommitTsPage(int pageno, bool writeXlog);
+ static bool CommitTsPagePrecedes(int page1, int page2);
+ static void WriteZeroPageXlogRec(int pageno);
+ static void WriteTruncateXlogRec(int pageno);
+ static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
+ 						 TransactionId *subxids, TimestampTz timestamp,
+ 						 CommitExtraData data);
+ 
+ 
+ /*
+  * TransactionTreeSetCommitTimestamp
+  *
+  * Record the final commit timestamp of transaction entries in the commit log
+  * for a transaction and its subtransaction tree, as efficiently as possible.
+  *
+  * xid is the top level transaction id.
+  *
+  * subxids is an array of xids of length nsubxids, representing subtransactions
+  * in the tree of xid. In various cases nsubxids may be zero.
+  *
+  * The do_xlog parameter tells us whether to include a XLog record of this
+  * or not.  Normal path through RecordTransactionCommit() will be related
+  * to a transaction commit XLog record, and so should pass "false" here.
+  * Other callers probably want to pass true, so that the given values persist
+  * in case of crashes.
+  */
+ void
+ TransactionTreeSetCommitTimestamp(TransactionId xid, int nsubxids,
+ 								  TransactionId *subxids, TimestampTz timestamp,
+ 								  CommitExtraData extra, bool do_xlog)
+ {
+ 	int			i;
+ 	TransactionId headxid;
+ 
+ 	if (!commit_ts_enabled)
+ 		return;
+ 
+ 	/*
+ 	 * Comply with the WAL-before-data rule: if caller specified it wants
+ 	 * this value to be recorded in WAL, do so before touching the data.
+ 	 */
+ 	if (do_xlog)
+ 		WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, extra);
+ 
+ 	/*
+ 	 * We split the xids to set the timestamp to in groups belonging to the
+ 	 * same SLRU page; the first element in each such set is its head.  The
+ 	 * first group has the main XID as the head; subsequent sets use the
+ 	 * first subxid not on the previous page as head.  This way, we only have
+ 	 * to lock/modify each SLRU page once.
+ 	 */
+ 	for (i = 0, headxid = xid;;)
+ 	{
+ 		int			pageno = TransactionIdToCTsPage(headxid);
+ 		int			j;
+ 
+ 		for (j = i; j < nsubxids; j++)
+ 		{
+ 			if (TransactionIdToCTsPage(subxids[j]) != pageno)
+ 				break;
+ 		}
+ 		/* subxids[i..j] are on the same page as the head */
+ 
+ 		SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, extra,
+ 							 pageno);
+ 
+ 		/* if we wrote out all subxids, we're done. */
+ 		if (j + 1 >= nsubxids)
+ 			break;
+ 
+ 		/*
+ 		 * Set the new head and skip over it, as well as over the subxids
+ 		 * we just wrote.
+ 		 */
+ 		headxid = subxids[j];
+ 		i += j - i + 1;
+ 	}
+ 
+ 	/*
+ 	 * Update the cached value in shared memory
+ 	 */
+ 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+ 	commitTsShared->xidLastCommit = xid;
+ 	commitTsShared->dataLastCommit.time = timestamp;
+ 	commitTsShared->dataLastCommit.extra = extra;
+ 	LWLockRelease(CommitTsLock);
+ }
+ 
+ /*
+  * Record the commit timestamp of transaction entries in the commit log for all
+  * entries on a single page.  Atomic only on this page.
+  */
+ static void
+ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
+ 					 TransactionId *subxids, TimestampTz committs,
+ 					 CommitExtraData extra, int pageno)
+ {
+ 	int			slotno;
+ 	int			i;
+ 
+ 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+ 
+ 	slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
+ 
+ 	TransactionIdSetCommitTs(xid, committs, extra, slotno);
+ 	for (i = 0; i < nsubxids; i++)
+ 		TransactionIdSetCommitTs(subxids[i], committs, extra, slotno);
+ 
+ 	CommitTsCtl->shared->page_dirty[slotno] = true;
+ 
+ 	LWLockRelease(CommitTsControlLock);
+ }
+ 
+ /*
+  * Sets the commit timestamp of a single transaction.
+  *
+  * Must be called with CommitTsControlLock held
+  */
+ static void
+ TransactionIdSetCommitTs(TransactionId xid, TimestampTz committs,
+ 						 CommitExtraData extra, int slotno)
+ {
+ 	int			entryno = TransactionIdToCTsEntry(xid);
+ 	CommitTimestampEntry *entry;
+ 
+ 	entry = (CommitTimestampEntry *)
+ 		(CommitTsCtl->shared->page_buffer[slotno] +
+ 		 SizeOfCommitTimestampEntry * entryno);
+ 
+ 	entry->time = committs;
+ 	entry->extra = extra;
+ }
+ 
+ /*
+  * Interrogate the commit timestamp of a transaction.
+  */
+ void
+ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
+ 							 CommitExtraData *data)
+ {
+ 	int			pageno = TransactionIdToCTsPage(xid);
+ 	int			entryno = TransactionIdToCTsEntry(xid);
+ 	int			slotno;
+ 	CommitTimestampEntry *entry;
+ 	TransactionId oldestCommitTs;
+ 
+ 	/* Return empty if module not enabled */
+ 	if (!commit_ts_enabled)
+ 	{
+ 		if (ts)
+ 			*ts = InvalidTransactionId;
+ 		if (data)
+ 			*data = (CommitExtraData) 0;
+ 		return;
+ 	}
+ 
+ 	/* Also return empty if the requested value is older than what we have */
+ 	LWLockAcquire(CommitTsControlLock, LW_SHARED);
+ 	oldestCommitTs = ShmemVariableCache->oldestCommitTs;
+ 	LWLockRelease(CommitTsControlLock);
+ 
+ 	if (!TransactionIdIsValid(oldestCommitTs) ||
+ 		TransactionIdPrecedes(xid, oldestCommitTs))
+ 	{
+ 		if (ts)
+ 			*ts = InvalidTransactionId;
+ 		if (data)
+ 			*data = (CommitExtraData) 0;
+ 		return;
+ 	}
+ 
+ 	/*
+ 	 * Use an unlocked atomic read on our cached value in shared memory;
+ 	 * if it's a hit, acquire a lock and read the data, after verifying
+ 	 * that it's still what we initially read.  Otherwise, fall through
+ 	 * to read from SLRU.
+ 	 */
+ 	if (commitTsShared->xidLastCommit == xid)
+ 	{
+ 		LWLockAcquire(CommitTsLock, LW_SHARED);
+ 		if (commitTsShared->xidLastCommit == xid)
+ 		{
+ 			if (ts)
+ 				*ts = commitTsShared->dataLastCommit.time;
+ 			if (data)
+ 				*data = commitTsShared->dataLastCommit.extra;
+ 			LWLockRelease(CommitTsLock);
+ 			return;
+ 		}
+ 		LWLockRelease(CommitTsLock);
+ 	}
+ 
+ 	/* lock is acquired by SimpleLruReadPage_ReadOnly */
+ 	slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
+ 	entry = (CommitTimestampEntry *)
+ 		(CommitTsCtl->shared->page_buffer[slotno] +
+ 		 SizeOfCommitTimestampEntry * entryno);
+ 
+ 	if (ts)
+ 		*ts = entry->time;
+ 
+ 	if (data)
+ 		*data = entry->extra;
+ 
+ 	LWLockRelease(CommitTsControlLock);
+ }
+ 
+ /*
+  * Return the Xid of the latest committed transaction.  (As far as this module
+  * is concerned, anyway; it's up to the caller to ensure the value is useful
+  * for its purposes.)
+  *
+  * ts and extra are filled with the corresponding data; they can be passed
+  * as NULL if not wanted.
+  */
+ TransactionId
+ GetLatestCommitTimestampData(TimestampTz *ts, CommitExtraData *extra)
+ {
+ 	TransactionId	xid;
+ 
+ 	/* Return empty if module not enabled */
+ 	if (!commit_ts_enabled)
+ 	{
+ 		if (ts)
+ 			*ts = InvalidTransactionId;
+ 		if (extra)
+ 			*extra = (CommitExtraData) 0;
+ 		return InvalidTransactionId;
+ 	}
+ 
+ 	LWLockAcquire(CommitTsLock, LW_SHARED);
+ 	xid = commitTsShared->xidLastCommit;
+ 	if (ts)
+ 		*ts = commitTsShared->dataLastCommit.time;
+ 	if (extra)
+ 		*extra = commitTsShared->dataLastCommit.extra;
+ 	LWLockRelease(CommitTsLock);
+ 
+ 	return xid;
+ }
+ 
+ /*
+  * SQL-callable wrapper to obtain commit time of a transaction
+  */
+ PG_FUNCTION_INFO_V1(pg_get_transaction_committime);
+ Datum
+ pg_get_transaction_committime(PG_FUNCTION_ARGS)
+ {
+ 	TransactionId	xid = PG_GETARG_UINT32(0);
+ 	TimestampTz		committs;
+ 
+ 	TransactionIdGetCommitTsData(xid, &committs, NULL);
+ 
+ 	PG_RETURN_TIMESTAMPTZ(committs);
+ }
+ 
+ PG_FUNCTION_INFO_V1(pg_get_transaction_extradata);
+ Datum
+ pg_get_transaction_extradata(PG_FUNCTION_ARGS)
+ {
+ 	TransactionId	xid = PG_GETARG_UINT32(0);
+ 	CommitExtraData	data;
+ 
+ 	TransactionIdGetCommitTsData(xid, NULL, &data);
+ 
+ 	PG_RETURN_INT32(data);
+ }
+ 
+ PG_FUNCTION_INFO_V1(pg_get_transaction_committime_data);
+ Datum
+ pg_get_transaction_committime_data(PG_FUNCTION_ARGS)
+ {
+ 	TransactionId	xid = PG_GETARG_UINT32(0);
+ 	TimestampTz		committs;
+ 	CommitExtraData	data;
+ 	Datum       values[2];
+ 	bool        nulls[2];
+ 	TupleDesc   tupdesc;
+ 	HeapTuple	htup;
+ 
+ 	/*
+ 	 * Construct a tuple descriptor for the result row.  This must match this
+ 	 * function's pg_proc entry!
+ 	 */
+ 	tupdesc = CreateTemplateTupleDesc(2, false);
+ 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp",
+ 					   TIMESTAMPTZOID, -1, 0);
+ 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "extra",
+ 					   INT4OID, -1, 0);
+ 	tupdesc = BlessTupleDesc(tupdesc);
+ 
+ 	/* and construct a tuple with our data */
+ 	TransactionIdGetCommitTsData(xid, &committs, &data);
+ 
+ 	values[0] = TimestampTzGetDatum(committs);
+ 	nulls[0] = false;
+ 
+ 	values[1] = Int32GetDatum(data);
+ 	nulls[1] = false;
+ 
+ 	htup = heap_form_tuple(tupdesc, values, nulls);
+ 
+ 	PG_RETURN_DATUM(HeapTupleGetDatum(htup));
+ }
+ 
+ PG_FUNCTION_INFO_V1(pg_get_latest_transaction_committime_data);
+ Datum
+ pg_get_latest_transaction_committime_data(PG_FUNCTION_ARGS)
+ {
+ 	TransactionId	xid;
+ 	TimestampTz		committs;
+ 	CommitExtraData	data;
+ 	Datum       values[3];
+ 	bool        nulls[3];
+ 	TupleDesc   tupdesc;
+ 	HeapTuple	htup;
+ 
+ 	/*
+ 	 * Construct a tuple descriptor for the result row.  This must match this
+ 	 * function's pg_proc entry!
+ 	 */
+ 	tupdesc = CreateTemplateTupleDesc(3, false);
+ 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
+ 					   XIDOID, -1, 0);
+ 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
+ 					   TIMESTAMPTZOID, -1, 0);
+ 	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
+ 					   INT4OID, -1, 0);
+ 	tupdesc = BlessTupleDesc(tupdesc);
+ 
+ 	/* and construct a tuple with our data */
+ 	xid = GetLatestCommitTimestampData(&committs, &data);
+ 
+ 	values[0] = TransactionIdGetDatum(xid);
+ 	nulls[0] = false;
+ 
+ 	values[1] = TimestampTzGetDatum(committs);
+ 	nulls[1] = false;
+ 
+ 	values[2] = Int32GetDatum(data);
+ 	nulls[2] = false;
+ 
+ 	htup = heap_form_tuple(tupdesc, values, nulls);
+ 
+ 	PG_RETURN_DATUM(HeapTupleGetDatum(htup));
+ }
+ 
+ /*
+  * Number of shared CommitTS buffers.
+  *
+  * We use a very similar logic as for the number of CLOG buffers; see comments
+  * in CLOGShmemBuffers.
+  */
+ Size
+ CommitTsShmemBuffers(void)
+ {
+ 	return Min(16, Max(4, NBuffers / 1024));
+ }
+ 
+ /*
+  * Initialization of shared memory for CommitTs
+  */
+ Size
+ CommitTsShmemSize(void)
+ {
+ 	return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
+ 		sizeof(CommitTimestampShared);
+ }
+ 
+ void
+ CommitTsShmemInit(void)
+ {
+ 	bool	found;
+ 
+ 	CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
+ 	SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
+ 				  CommitTsControlLock, "pg_committs");
+ 
+ 	commitTsShared = ShmemInitStruct("CommitTs shared",
+ 									 sizeof(CommitTimestampShared),
+ 									 &found);
+ 
+ 	if (!IsUnderPostmaster)
+ 	{
+ 		Assert(!found);
+ 
+ 		commitTsShared->xidLastCommit = InvalidTransactionId;
+ 		commitTsShared->dataLastCommit.time = 0;
+ 		commitTsShared->dataLastCommit.extra = 0;
+ 	}
+ 	else
+ 		Assert(found);
+ }
+ 
+ /*
+  * This function must be called ONCE on system install.
+  *
+  * (The CommitTs directory is assumed to have been created by initdb, and
+  * CommitTsShmemInit must have been called already.)
+  */
+ void
+ BootStrapCommitTs(void)
+ {
+ 	/*
+ 	 * Nothing to do here at present, unlike most other SLRU modules; segments
+ 	 * are created when the server is started with this module enabled.
+ 	 * See StartupCommitTs.
+ 	 */
+ }
+ 
+ /*
+  * Initialize (or reinitialize) a page of CommitTs to zeroes.
+  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
+  *
+  * The page is not actually written, just set up in shared memory.
+  * The slot number of the new page is returned.
+  *
+  * Control lock must be held at entry, and will be held at exit.
+  */
+ static int
+ ZeroCommitTsPage(int pageno, bool writeXlog)
+ {
+ 	int			slotno;
+ 
+ 	slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
+ 
+ 	if (writeXlog)
+ 		WriteZeroPageXlogRec(pageno);
+ 
+ 	return slotno;
+ }
+ 
+ /*
+  * This must be called ONCE during postmaster or standalone-backend startup,
+  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
+  *
+  * This is in charge of creating the currently active segment, if it's not
+  * already there.  The reason for this is that the server might have been
+  * running with this module disabled for a while and thus might have skipped
+  * the normal creation point.
+  */
+ void
+ StartupCommitTs(void)
+ {
+ 	TransactionId xid = ShmemVariableCache->nextXid;
+ 	int			pageno = TransactionIdToCTsPage(xid);
+ 	SlruCtl		ctl = CommitTsCtl;
+ 
+ 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+ 
+ 	/*
+ 	 * Initialize our idea of the latest page number.
+ 	 */
+ 	CommitTsCtl->shared->latest_page_number = pageno;
+ 
+ 	/*
+ 	 * If this module is not currently enabled, make sure we don't hand back
+ 	 * possibly-invalid data; also remove segments of old data.
+ 	 */
+ 	if (!commit_ts_enabled)
+ 	{
+ 		ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
+ 		LWLockRelease(CommitTsControlLock);
+ 
+ 		TruncateCommitTs(ReadNewTransactionId());
+ 
+ 		return;
+ 	}
+ 
+ 	/*
+ 	 * If CommitTs is enabled, but it wasn't in the previous server run, we
+ 	 * need to set the oldest value to the next Xid; that way, we will not try
+ 	 * to read data that might not have been set.
+ 	 *
+ 	 * XXX does this have a problem if a server is started with commitTs
+ 	 * enabled, then started with commitTs disabled, then restarted with it
+ 	 * enabled again?  It doesn't look like it does, because there should be a
+ 	 * checkpoint that sets the value to InvalidTransactionId at end of
+ 	 * recovery; and so any chance of injecting new transactions without
+ 	 * CommitTs values would occur after the oldestCommitTs has been set to
+ 	 * Invalid temporarily.
+ 	 */
+ 	if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
+ 		ShmemVariableCache->oldestCommitTs = ReadNewTransactionId();
+ 
+ 	/* Finally, create the current segment file, if necessary */
+ 	if (!SimpleLruDoesPhysicalPageExist(ctl, pageno))
+ 	{
+ 		int		slotno;
+ 
+ 		slotno = ZeroCommitTsPage(pageno, false);
+ 		SimpleLruWritePage(CommitTsCtl, slotno);
+ 		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
+ 	}
+ 
+ 	LWLockRelease(CommitTsControlLock);
+ }
+ 
+ /*
+  * This must be called ONCE during postmaster or standalone-backend shutdown
+  */
+ void
+ ShutdownCommitTs(void)
+ {
+ 	/* Flush dirty CommitTs pages to disk */
+ 	SimpleLruFlush(CommitTsCtl, false);
+ }
+ 
+ /*
+  * Perform a checkpoint --- either during shutdown, or on-the-fly
+  */
+ void
+ CheckPointCommitTs(void)
+ {
+ 	/* Flush dirty CommitTs pages to disk */
+ 	SimpleLruFlush(CommitTsCtl, true);
+ }
+ 
+ /*
+  * Make sure that CommitTs has room for a newly-allocated XID.
+  *
+  * NB: this is called while holding XidGenLock.  We want it to be very fast
+  * most of the time; even when it's not so fast, no actual I/O need happen
+  * unless we're forced to write out a dirty CommitTs or xlog page to make room
+  * in shared memory.
+  */
+ void
+ ExtendCommitTs(TransactionId newestXact)
+ {
+ 	int			pageno;
+ 
+ 	/* nothing to do if module not enabled */
+ 	if (!commit_ts_enabled)
+ 		return;
+ 
+ 	/*
+ 	 * No work except at first XID of a page.  But beware: just after
+ 	 * wraparound, the first XID of page zero is FirstNormalTransactionId.
+ 	 */
+ 	if (TransactionIdToCTsEntry(newestXact) != 0 &&
+ 		!TransactionIdEquals(newestXact, FirstNormalTransactionId))
+ 		return;
+ 
+ 	pageno = TransactionIdToCTsPage(newestXact);
+ 
+ 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+ 
+ 	/* Zero the page and make an XLOG entry about it */
+ 	ZeroCommitTsPage(pageno, !InRecovery);
+ 
+ 	LWLockRelease(CommitTsControlLock);
+ }
+ 
+ /*
+  * Remove all CommitTs segments before the one holding the passed
+  * transaction ID
+  *
+  * Note that we don't need to flush XLOG here.
+  */
+ void
+ TruncateCommitTs(TransactionId oldestXact)
+ {
+ 	int			cutoffPage;
+ 
+ 	/*
+ 	 * The cutoff point is the start of the segment containing oldestXact. We
+ 	 * pass the *page* containing oldestXact to SimpleLruTruncate.
+ 	 */
+ 	cutoffPage = TransactionIdToCTsPage(oldestXact);
+ 
+ 	/* Check to see if there's any files that could be removed */
+ 	if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence, &cutoffPage))
+ 		return;					/* nothing to remove */
+ 
+ 	/* Write XLOG record */
+ 	WriteTruncateXlogRec(cutoffPage);
+ 
+ 	/* Now we can remove the old CommitTs segment(s) */
+ 	SimpleLruTruncate(CommitTsCtl, cutoffPage);
+ }
+ 
+ /*
+  * Set the earliest value for which commit TS can be consulted.
+  */
+ void
+ SetCommitTsLimit(TransactionId oldestXact)
+ {
+ 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+ 	ShmemVariableCache->oldestCommitTs = oldestXact;
+ 	LWLockRelease(CommitTsControlLock);
+ }
+ 
+ /*
+  * Decide which of two CLOG page numbers is "older" for truncation purposes.
+  *
+  * We need to use comparison of TransactionIds here in order to do the right
+  * thing with wraparound XID arithmetic.  However, if we are asked about
+  * page number zero, we don't want to hand InvalidTransactionId to
+  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
+  * offset both xids by FirstNormalTransactionId to avoid that.
+  */
+ static bool
+ CommitTsPagePrecedes(int page1, int page2)
+ {
+ 	TransactionId xid1;
+ 	TransactionId xid2;
+ 
+ 	xid1 = ((TransactionId) page1) * COMMITTS_XACTS_PER_PAGE;
+ 	xid1 += FirstNormalTransactionId;
+ 	xid2 = ((TransactionId) page2) * COMMITTS_XACTS_PER_PAGE;
+ 	xid2 += FirstNormalTransactionId;
+ 
+ 	return TransactionIdPrecedes(xid1, xid2);
+ }
+ 
+ 
+ /*
+  * Write a ZEROPAGE xlog record
+  */
+ static void
+ WriteZeroPageXlogRec(int pageno)
+ {
+ 	XLogRecData rdata;
+ 
+ 	rdata.data = (char *) (&pageno);
+ 	rdata.len = sizeof(int);
+ 	rdata.buffer = InvalidBuffer;
+ 	rdata.next = NULL;
+ 	(void) XLogInsert(RM_COMMITTS_ID, COMMITTS_ZEROPAGE, &rdata);
+ }
+ 
+ /*
+  * Write a TRUNCATE xlog record
+  */
+ static void
+ WriteTruncateXlogRec(int pageno)
+ {
+ 	XLogRecData rdata;
+ 
+ 	rdata.data = (char *) (&pageno);
+ 	rdata.len = sizeof(int);
+ 	rdata.buffer = InvalidBuffer;
+ 	rdata.next = NULL;
+ 	XLogInsert(RM_COMMITTS_ID, COMMITTS_TRUNCATE, &rdata);
+ }
+ 
+ /*
+  * Write a SETTS xlog record
+  */
+ static void
+ WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
+ 						 TransactionId *subxids, TimestampTz timestamp,
+ 						 CommitExtraData data)
+ {
+ 	XLogRecData	rdata;
+ 	xl_committs_set	record;
+ 
+ 	record.timestamp = timestamp;
+ 	record.data = data;
+ 	record.mainxid = mainxid;
+ 	record.nsubxids = nsubxids;
+ 	memcpy(record.subxids, subxids, sizeof(TransactionId) * nsubxids);
+ 
+ 	rdata.data = (char *) &record;
+ 	rdata.len = offsetof(xl_committs_set, subxids) +
+ 		nsubxids * sizeof(TransactionId);
+ 	rdata.buffer = InvalidBuffer;
+ 	rdata.next = NULL;
+ 	XLogInsert(RM_COMMITTS_ID, COMMITTS_SETTS, &rdata);
+ }
+ 
+ 
+ /*
+  * CommitTS resource manager's routines
+  */
+ void
+ committs_redo(XLogRecPtr lsn, XLogRecord *record)
+ {
+ 	uint8		info = record->xl_info & ~XLR_INFO_MASK;
+ 
+ 	/* Backup blocks are not used in committs records */
+ 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ 
+ 	if (info == COMMITTS_ZEROPAGE)
+ 	{
+ 		int			pageno;
+ 		int			slotno;
+ 
+ 		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+ 
+ 		LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+ 
+ 		slotno = ZeroCommitTsPage(pageno, false);
+ 		SimpleLruWritePage(CommitTsCtl, slotno);
+ 		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
+ 
+ 		LWLockRelease(CommitTsControlLock);
+ 	}
+ 	else if (info == COMMITTS_TRUNCATE)
+ 	{
+ 		int			pageno;
+ 
+ 		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+ 
+ 		/*
+ 		 * During XLOG replay, latest_page_number isn't set up yet; insert a
+ 		 * suitable value to bypass the sanity test in SimpleLruTruncate.
+ 		 */
+ 		CommitTsCtl->shared->latest_page_number = pageno;
+ 
+ 		SimpleLruTruncate(CommitTsCtl, pageno);
+ 	}
+ 	else if (info == COMMITTS_SETTS)
+ 	{
+ 		xl_committs_set *setts = (xl_committs_set *) XLogRecGetData(record);
+ 
+ 		TransactionTreeSetCommitTimestamp(setts->mainxid, setts->nsubxids,
+ 										  setts->subxids, setts->timestamp,
+ 										  setts->data, false);
+ 	}
+ 	else
+ 		elog(PANIC, "committs_redo: unknown op code %u", info);
+ }
*** a/src/backend/access/transam/rmgr.c
--- b/src/backend/access/transam/rmgr.c
***************
*** 8,13 ****
--- 8,14 ----
  #include "postgres.h"
  
  #include "access/clog.h"
+ #include "access/committs.h"
  #include "access/gin.h"
  #include "access/gist_private.h"
  #include "access/hash.h"
*** a/src/backend/access/transam/varsup.c
--- b/src/backend/access/transam/varsup.c
***************
*** 14,19 ****
--- 14,20 ----
  #include "postgres.h"
  
  #include "access/clog.h"
+ #include "access/committs.h"
  #include "access/subtrans.h"
  #include "access/transam.h"
  #include "access/xact.h"
***************
*** 157,165 **** GetNewTransactionId(bool isSubXact)
  	 * XID before we zero the page.  Fortunately, a page of the commit log
  	 * holds 32K or more transactions, so we don't have to do this very often.
  	 *
! 	 * Extend pg_subtrans too.
  	 */
  	ExtendCLOG(xid);
  	ExtendSUBTRANS(xid);
  
  	/*
--- 158,167 ----
  	 * XID before we zero the page.  Fortunately, a page of the commit log
  	 * holds 32K or more transactions, so we don't have to do this very often.
  	 *
! 	 * Extend pg_subtrans and pg_committs too.
  	 */
  	ExtendCLOG(xid);
+ 	ExtendCommitTs(xid);
  	ExtendSUBTRANS(xid);
  
  	/*
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 20,25 ****
--- 20,26 ----
  #include <time.h>
  #include <unistd.h>
  
+ #include "access/committs.h"
  #include "access/multixact.h"
  #include "access/subtrans.h"
  #include "access/transam.h"
***************
*** 1118,1123 **** RecordTransactionCommit(void)
--- 1119,1132 ----
  	}
  
  	/*
+ 	 * We don't need to log the commit timestamp separately since the commit
+ 	 * record logged above has all the necessary action to set the timestamp
+ 	 * again.
+ 	 */
+ 	TransactionTreeSetCommitTimestamp(xid, nchildren, children,
+ 									  xactStopTimestamp, 0, false);
+ 
+ 	/*
  	 * Check if we want to commit asynchronously.  We can allow the XLOG flush
  	 * to happen asynchronously if synchronous_commit=off, or if the current
  	 * transaction has not performed any WAL-logged operation.	The latter
***************
*** 4563,4568 **** xactGetCommittedChildren(TransactionId **ptr)
--- 4572,4578 ----
   */
  static void
  xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
+ 						  TimestampTz commit_time,
  						  TransactionId *sub_xids, int nsubxacts,
  						  SharedInvalidationMessage *inval_msgs, int nmsgs,
  						  RelFileNode *xnodes, int nrels,
***************
*** 4590,4595 **** xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
--- 4600,4609 ----
  		LWLockRelease(XidGenLock);
  	}
  
+ 	/* Set the transaction commit time */
+ 	TransactionTreeSetCommitTimestamp(xid, nsubxacts, sub_xids,
+ 									  commit_time, 0, false);
+ 
  	if (standbyState == STANDBY_DISABLED)
  	{
  		/*
***************
*** 4709,4715 **** xact_redo_commit(xl_xact_commit *xlrec,
  	/* invalidation messages array follows subxids */
  	inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
  
! 	xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
  							  inval_msgs, xlrec->nmsgs,
  							  xlrec->xnodes, xlrec->nrels,
  							  xlrec->dbId,
--- 4723,4730 ----
  	/* invalidation messages array follows subxids */
  	inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
  
! 	xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
! 							  subxacts, xlrec->nsubxacts,
  							  inval_msgs, xlrec->nmsgs,
  							  xlrec->xnodes, xlrec->nrels,
  							  xlrec->dbId,
***************
*** 4724,4730 **** static void
  xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
  						 TransactionId xid, XLogRecPtr lsn)
  {
! 	xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts,
  							  NULL, 0,	/* inval msgs */
  							  NULL, 0,	/* relfilenodes */
  							  InvalidOid,		/* dbId */
--- 4739,4746 ----
  xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
  						 TransactionId xid, XLogRecPtr lsn)
  {
! 	xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
! 							  xlrec->subxacts, xlrec->nsubxacts,
  							  NULL, 0,	/* inval msgs */
  							  NULL, 0,	/* relfilenodes */
  							  InvalidOid,		/* dbId */
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 22,27 ****
--- 22,28 ----
  #include <unistd.h>
  
  #include "access/clog.h"
+ #include "access/committs.h"
  #include "access/multixact.h"
  #include "access/subtrans.h"
  #include "access/timeline.h"
***************
*** 5183,5188 **** BootStrapXLOG(void)
--- 5184,5190 ----
  	checkPoint.oldestXidDB = TemplateDbOid;
  	checkPoint.oldestMulti = FirstMultiXactId;
  	checkPoint.oldestMultiDB = TemplateDbOid;
+ 	checkPoint.oldestCommitTs = InvalidTransactionId;
  	checkPoint.time = (pg_time_t) time(NULL);
  	checkPoint.oldestActiveXid = InvalidTransactionId;
  
***************
*** 5192,5197 **** BootStrapXLOG(void)
--- 5194,5200 ----
  	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
  	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
  	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+ 	SetCommitTsLimit(InvalidTransactionId);
  
  	/* Set up the XLOG page header */
  	page->xlp_magic = XLOG_PAGE_MAGIC;
***************
*** 5272,5277 **** BootStrapXLOG(void)
--- 5275,5281 ----
  
  	/* Bootstrap the commit log, too */
  	BootStrapCLOG();
+ 	BootStrapCommitTs();
  	BootStrapSUBTRANS();
  	BootStrapMultiXact();
  
***************
*** 6318,6323 **** StartupXLOG(void)
--- 6322,6330 ----
  	ereport(DEBUG1,
  			(errmsg("oldest MultiXactId: %u, in database %u",
  					checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
+ 	ereport(DEBUG1,
+ 			(errmsg("oldest CommitTs Xid: %u",
+ 					checkPoint.oldestCommitTs)));
  	if (!TransactionIdIsNormal(checkPoint.nextXid))
  		ereport(PANIC,
  				(errmsg("invalid next transaction ID")));
***************
*** 6329,6334 **** StartupXLOG(void)
--- 6336,6342 ----
  	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
  	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
  	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+ 	SetCommitTsLimit(checkPoint.oldestCommitTs);
  	XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
  	XLogCtl->ckptXid = checkPoint.nextXid;
  
***************
*** 6532,6541 **** StartupXLOG(void)
  			ProcArrayInitRecovery(ShmemVariableCache->nextXid);
  
  			/*
! 			 * Startup commit log and subtrans only. Other SLRUs are not
! 			 * maintained during recovery and need not be started yet.
  			 */
  			StartupCLOG();
  			StartupSUBTRANS(oldestActiveXID);
  
  			/*
--- 6540,6551 ----
  			ProcArrayInitRecovery(ShmemVariableCache->nextXid);
  
  			/*
! 			 * Startup commit log, commit timestamp, and subtrans only. Other
! 			 * SLRUs are not maintained during recovery and need not be started
! 			 * yet.
  			 */
  			StartupCLOG();
+ 			StartupCommitTs();
  			StartupSUBTRANS(oldestActiveXID);
  
  			/*
***************
*** 7191,7196 **** StartupXLOG(void)
--- 7201,7207 ----
  	if (standbyState == STANDBY_DISABLED)
  	{
  		StartupCLOG();
+ 		StartupCommitTs();
  		StartupSUBTRANS(oldestActiveXID);
  	}
  
***************
*** 7759,7764 **** ShutdownXLOG(int code, Datum arg)
--- 7770,7776 ----
  		CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
  	}
  	ShutdownCLOG();
+ 	ShutdownCommitTs();
  	ShutdownSUBTRANS();
  	ShutdownMultiXact();
  
***************
*** 8152,8157 **** CreateCheckPoint(int flags)
--- 8164,8173 ----
  	checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
  	LWLockRelease(XidGenLock);
  
+ 	LWLockAcquire(CommitTsControlLock, LW_SHARED);
+ 	checkPoint.oldestCommitTs = ShmemVariableCache->oldestCommitTs;
+ 	LWLockRelease(CommitTsControlLock);
+ 
  	/* Increase XID epoch if we've wrapped around since last checkpoint */
  	checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
  	if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
***************
*** 8392,8397 **** static void
--- 8408,8414 ----
  CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
  {
  	CheckPointCLOG();
+ 	CheckPointCommitTs();
  	CheckPointSUBTRANS();
  	CheckPointMultiXact();
  	CheckPointPredicate();
*** a/src/backend/commands/vacuum.c
--- b/src/backend/commands/vacuum.c
***************
*** 23,28 ****
--- 23,29 ----
  #include <math.h>
  
  #include "access/clog.h"
+ #include "access/committs.h"
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/htup_details.h"
***************
*** 894,901 **** vac_truncate_clog(TransactionId frozenXID, MultiXactId minMulti)
  		return;
  	}
  
! 	/* Truncate CLOG and Multi to the oldest computed value */
  	TruncateCLOG(frozenXID);
  	TruncateMultiXact(minMulti);
  
  	/*
--- 895,903 ----
  		return;
  	}
  
! 	/* Truncate CLOG, CommitTS and Multi to the oldest computed values */
  	TruncateCLOG(frozenXID);
+ 	TruncateCommitTs(frozenXID);
  	TruncateMultiXact(minMulti);
  
  	/*
***************
*** 906,911 **** vac_truncate_clog(TransactionId frozenXID, MultiXactId minMulti)
--- 908,914 ----
  	 */
  	SetTransactionIdLimit(frozenXID, oldestxid_datoid);
  	MultiXactAdvanceOldest(minMulti, minmulti_datoid);
+ 	SetCommitTsLimit(frozenXID);
  }
  
  
*** a/src/backend/storage/ipc/ipci.c
--- b/src/backend/storage/ipc/ipci.c
***************
*** 15,20 ****
--- 15,21 ----
  #include "postgres.h"
  
  #include "access/clog.h"
+ #include "access/committs.h"
  #include "access/heapam.h"
  #include "access/multixact.h"
  #include "access/nbtree.h"
***************
*** 113,118 **** CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
--- 114,120 ----
  		size = add_size(size, ProcGlobalShmemSize());
  		size = add_size(size, XLOGShmemSize());
  		size = add_size(size, CLOGShmemSize());
+ 		size = add_size(size, CommitTsShmemSize());
  		size = add_size(size, SUBTRANSShmemSize());
  		size = add_size(size, TwoPhaseShmemSize());
  		size = add_size(size, BackgroundWorkerShmemSize());
***************
*** 195,200 **** CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
--- 197,203 ----
  	 */
  	XLOGShmemInit();
  	CLOGShmemInit();
+ 	CommitTsShmemInit();
  	SUBTRANSShmemInit();
  	MultiXactShmemInit();
  	InitBufferPool();
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 46,51 ****
--- 46,52 ----
  #include <signal.h>
  
  #include "access/clog.h"
+ #include "access/committs.h"
  #include "access/subtrans.h"
  #include "access/transam.h"
  #include "access/xact.h"
***************
*** 2692,2697 **** RecordKnownAssignedTransactionIds(TransactionId xid)
--- 2693,2699 ----
  		while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
  		{
  			ExtendCLOG(next_expected_xid);
+ 			ExtendCommitTs(next_expected_xid);
  			ExtendSUBTRANS(next_expected_xid);
  
  			TransactionIdAdvance(next_expected_xid);
*** a/src/backend/storage/lmgr/lwlock.c
--- b/src/backend/storage/lmgr/lwlock.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "postgres.h"
  
  #include "access/clog.h"
+ #include "access/committs.h"
  #include "access/multixact.h"
  #include "access/subtrans.h"
  #include "commands/async.h"
***************
*** 191,196 **** NumLWLocks(void)
--- 192,200 ----
  	/* clog.c needs one per CLOG buffer */
  	numLocks += CLOGShmemBuffers();
  
+ 	/* committs.c needs one per CommitTs buffer */
+ 	numLocks += CommitTsShmemBuffers();
+ 
  	/* subtrans.c needs one per SubTrans buffer */
  	numLocks += NUM_SUBTRANS_BUFFERS;
  
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 26,31 ****
--- 26,32 ----
  #include <syslog.h>
  #endif
  
+ #include "access/committs.h"
  #include "access/gin.h"
  #include "access/transam.h"
  #include "access/twophase.h"
***************
*** 792,797 **** static struct config_bool ConfigureNamesBool[] =
--- 793,807 ----
  		check_bonjour, NULL, NULL
  	},
  	{
+ 		{"track_commit_timestamp", PGC_POSTMASTER, REPLICATION,
+ 			gettext_noop("Collects transaction commit time."),
+ 			NULL
+ 		},
+ 		&commit_ts_enabled,
+ 		false,
+ 		NULL, NULL, NULL
+ 	},
+ 	{
  		{"ssl", PGC_POSTMASTER, CONN_AUTH_SECURITY,
  			gettext_noop("Enables SSL connections."),
  			NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 220,225 ****
--- 220,228 ----
  #wal_keep_segments = 0		# in logfile segments, 16MB each; 0 disables
  #wal_sender_timeout = 60s	# in milliseconds; 0 disables
  
+ #track_commit_timestamp = off	# collect timestamp of transaction commit
+ 				# (change requires restart)
+ 
  # - Master Server -
  
  # These settings are ignored on a standby server.
*** a/src/bin/initdb/initdb.c
--- b/src/bin/initdb/initdb.c
***************
*** 187,192 **** const char *subdirs[] = {
--- 187,193 ----
  	"pg_xlog",
  	"pg_xlog/archive_status",
  	"pg_clog",
+ 	"pg_committs",
  	"pg_dynshmem",
  	"pg_notify",
  	"pg_serial",
*** a/src/bin/pg_controldata/pg_controldata.c
--- b/src/bin/pg_controldata/pg_controldata.c
***************
*** 238,243 **** main(int argc, char *argv[])
--- 238,245 ----
  		   ControlFile.checkPointCopy.oldestMulti);
  	printf(_("Latest checkpoint's oldestMulti's DB: %u\n"),
  		   ControlFile.checkPointCopy.oldestMultiDB);
+ 	printf(_("Latest checkpoint's oldestCommitTs:   %u\n"),
+ 		   ControlFile.checkPointCopy.oldestCommitTs);
  	printf(_("Time of latest checkpoint:            %s\n"),
  		   ckpttime_str);
  	printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
*** /dev/null
--- b/src/include/access/committs.h
***************
*** 0 ****
--- 1,61 ----
+ /*
+  * committs.h
+  *
+  * PostgreSQL commit timestamp manager
+  *
+  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * src/include/access/committs.h
+  */
+ #ifndef COMMITTS_H
+ #define COMMITTS_H
+ 
+ #include "access/xlog.h"
+ #include "datatype/timestamp.h"
+ 
+ 
+ extern PGDLLIMPORT bool	commit_ts_enabled;
+ 
+ typedef uint32 CommitExtraData;
+ 
+ extern void TransactionTreeSetCommitTimestamp(TransactionId xid, int nsubxids,
+ 								  TransactionId *subxids,
+ 								  TimestampTz timestamp,
+ 								  CommitExtraData data,
+ 								  bool do_xlog);
+ extern void TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
+ 							 CommitExtraData *data);
+ extern TransactionId GetLatestCommitTimestampData(TimestampTz *ts,
+ 							 CommitExtraData *extra);
+ 
+ extern Size CommitTsShmemBuffers(void);
+ extern Size CommitTsShmemSize(void);
+ extern void CommitTsShmemInit(void);
+ extern void BootStrapCommitTs(void);
+ extern void StartupCommitTs(void);
+ extern void ShutdownCommitTs(void);
+ extern void CheckPointCommitTs(void);
+ extern void ExtendCommitTs(TransactionId newestXact);
+ extern void TruncateCommitTs(TransactionId oldestXact);
+ extern void SetCommitTsLimit(TransactionId oldestXact);
+ 
+ /* XLOG stuff */
+ #define COMMITTS_ZEROPAGE		0x00
+ #define COMMITTS_TRUNCATE		0x10
+ #define COMMITTS_SETTS			0x20
+ 
+ typedef struct xl_committs_set
+ {
+ 	TimestampTz		timestamp;
+ 	CommitExtraData	data;
+ 	TransactionId	mainxid;
+ 	int				nsubxids;
+ 	TransactionId	subxids[FLEXIBLE_ARRAY_MEMBER];
+ } xl_committs_set;
+ 
+ 
+ extern void committs_redo(XLogRecPtr lsn, XLogRecord *record);
+ extern void committs_desc(StringInfo buf, uint8 xl_info, char *rec);
+ 
+ #endif   /* COMMITTS_H */
*** a/src/include/access/rmgrlist.h
--- b/src/include/access/rmgrlist.h
***************
*** 42,44 **** PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup
--- 42,45 ----
  PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL)
  PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, NULL, NULL, NULL)
  PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_xlog_startup, spg_xlog_cleanup, NULL)
+ PG_RMGR(RM_COMMITTS_ID, "CommitTs", committs_redo, committs_desc, NULL, NULL, NULL)
*** a/src/include/access/transam.h
--- b/src/include/access/transam.h
***************
*** 119,124 **** typedef struct VariableCacheData
--- 119,129 ----
  	Oid			oldestXidDB;	/* database with minimum datfrozenxid */
  
  	/*
+ 	 * These fields are protected by CommitTsControlLock
+ 	 */
+ 	TransactionId oldestCommitTs;
+ 
+ 	/*
  	 * These fields are protected by ProcArrayLock.
  	 */
  	TransactionId latestCompletedXid;	/* newest XID that has committed or
*** a/src/include/catalog/pg_control.h
--- b/src/include/catalog/pg_control.h
***************
*** 46,51 **** typedef struct CheckPoint
--- 46,52 ----
  	MultiXactId oldestMulti;	/* cluster-wide minimum datminmxid */
  	Oid			oldestMultiDB;	/* database with minimum datminmxid */
  	pg_time_t	time;			/* time stamp of checkpoint */
+ 	TransactionId oldestCommitTs; /* oldest Xid with valid commit timestamp */
  
  	/*
  	 * Oldest XID still running. This is only needed to initialize hot standby
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
***************
*** 2914,2919 **** DESCR("view two-phase transactions");
--- 2914,2931 ----
  DATA(insert OID = 3819 (  pg_get_multixact_members PGNSP PGUID 12 1 1000 0 0 f f f f t t v 1 0 2249 "28" "{28,28,25}" "{i,o,o}" "{multixid,xid,mode}" _null_ pg_get_multixact_members _null_ _null_ _null_ ));
  DESCR("view members of a multixactid");
  
+ DATA(insert OID = 3461 ( pg_get_transaction_committime PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 1184 "28" _null_ _null_ _null_ _null_ pg_get_transaction_committime _null_ _null_ _null_ ));
+ DESCR("get commit time of transaction");
+ 
+ DATA(insert OID = 3462 ( pg_get_transaction_extradata PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 23 "28" _null_ _null_ _null_ _null_ pg_get_transaction_extradata _null_ _null_ _null_ ));
+ DESCR("get additional data from transaction commit timestamp record");
+ 
+ DATA(insert OID = 3463 ( pg_get_transaction_committime_data PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 2249 "28" "{28,1184,23}" "{i,o,o}" "{xid,committime,extradata}" _null_ pg_get_transaction_committime_data _null_ _null_ _null_ ));
+ DESCR("get commit time and additional data from transaction commit timestamp record");
+ 
+ DATA(insert OID = 3464 ( pg_get_latest_transaction_committime_data PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 2249 "" "{28,1184,23}" "{o,o,o}" "{xid,committime,extradata}" _null_ pg_get_latest_transaction_committime_data _null_ _null_ _null_ ));
+ DESCR("get transaction Id, commit timestamp and additional data of latest transaction commit");
+ 
  DATA(insert OID = 3537 (  pg_describe_object		PGNSP PGUID 12 1 0 0 0 f f f f t f s 3 0 25 "26 26 23" _null_ _null_ _null_ _null_ pg_describe_object _null_ _null_ _null_ ));
  DESCR("get identification of SQL object");
  
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 59,64 **** typedef enum LWLockId
--- 59,66 ----
  	CheckpointLock,
  	CLogControlLock,
  	SubtransControlLock,
+ 	CommitTsControlLock,
+ 	CommitTsLock,
  	MultiXactGenLock,
  	MultiXactOffsetControlLock,
  	MultiXactMemberControlLock,
*** a/src/include/utils/builtins.h
--- b/src/include/utils/builtins.h
***************
*** 1151,1156 **** extern Datum pg_prepared_xact(PG_FUNCTION_ARGS);
--- 1151,1162 ----
  /* access/transam/multixact.c */
  extern Datum pg_get_multixact_members(PG_FUNCTION_ARGS);
  
+ /* access/transam/committs.c */
+ extern Datum pg_get_transaction_committime(PG_FUNCTION_ARGS);
+ extern Datum pg_get_transaction_extradata(PG_FUNCTION_ARGS);
+ extern Datum pg_get_transaction_committime_data(PG_FUNCTION_ARGS);
+ extern Datum pg_get_latest_transaction_committime_data(PG_FUNCTION_ARGS);
+ 
  /* catalogs/dependency.c */
  extern Datum pg_describe_object(PG_FUNCTION_ARGS);
  extern Datum pg_identify_object(PG_FUNCTION_ARGS);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to