> And here is v10 which fixes conflicts with Heikki's WAL API changes (no
> changes otherwise).

After some slight additional changes, here's v11, which I intend to
commit early tomorrow.  The main change is moving the test module from
contrib to src/test/modules.

-- 
Álvaro Herrera                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
*** a/contrib/pg_upgrade/pg_upgrade.c
--- b/contrib/pg_upgrade/pg_upgrade.c
***************
*** 423,430 **** copy_clog_xlog_xid(void)
  	/* set the next transaction id and epoch of the new cluster */
  	prep_status("Setting next transaction ID and epoch for new cluster");
  	exec_prog(UTILITY_LOG_FILE, NULL, true,
! 			  "\"%s/pg_resetxlog\" -f -x %u \"%s\"",
! 			  new_cluster.bindir, old_cluster.controldata.chkpnt_nxtxid,
  			  new_cluster.pgdata);
  	exec_prog(UTILITY_LOG_FILE, NULL, true,
  			  "\"%s/pg_resetxlog\" -f -e %u \"%s\"",
--- 423,432 ----
  	/* set the next transaction id and epoch of the new cluster */
  	prep_status("Setting next transaction ID and epoch for new cluster");
  	exec_prog(UTILITY_LOG_FILE, NULL, true,
! 			  "\"%s/pg_resetxlog\" -f -x %u -c %u \"%s\"",
! 			  new_cluster.bindir,
! 			  old_cluster.controldata.chkpnt_nxtxid,
! 			  old_cluster.controldata.chkpnt_nxtxid,
  			  new_cluster.pgdata);
  	exec_prog(UTILITY_LOG_FILE, NULL, true,
  			  "\"%s/pg_resetxlog\" -f -e %u \"%s\"",
*** a/contrib/pg_xlogdump/rmgrdesc.c
--- b/contrib/pg_xlogdump/rmgrdesc.c
***************
*** 10,15 ****
--- 10,16 ----
  
  #include "access/brin_xlog.h"
  #include "access/clog.h"
+ #include "access/commit_ts.h"
  #include "access/gin.h"
  #include "access/gist_private.h"
  #include "access/hash.h"
*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 2673,2678 **** include_dir 'conf.d'
--- 2673,2692 ----
        </listitem>
       </varlistentry>
  
+      <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
+       <term><varname>track_commit_timestamp</varname> (<type>bool</type>)</term>
+       <indexterm>
+        <primary><varname>track_commit_timestamp</> configuration parameter</primary>
+       </indexterm>
+       <listitem>
+        <para>
+         Record commit time of transactions. This parameter
+         can only be set in the <filename>postgresql.conf</> file or on the server
+         command line. The default value is <literal>off</literal>.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
       </variablelist>
      </sect2>
  
*** a/doc/src/sgml/func.sgml
--- b/doc/src/sgml/func.sgml
***************
*** 15923,15928 **** SELECT collation for ('foo' COLLATE "de_DE");
--- 15923,15960 ----
      For example <literal>10:20:10,14,15</literal> means
      <literal>xmin=10, xmax=20, xip_list=10, 14, 15</literal>.
     </para>
+ 
+    <para>
+     The functions shown in <xref linkend="functions-committs">
+     provide information about transactions that have been already committed.
+     These functions mainly provide information about when the transactions
+     were committed. They only provide useful data when
+     the <xref linkend="guc-track-commit-timestamp"> configuration option is enabled
+     and only for transactions that were committed after it was enabled.
+    </para>
+ 
+    <table id="functions-committs">
+     <title>Committed transaction information</title>
+     <tgroup cols="3">
+      <thead>
+       <row><entry>Name</entry> <entry>Return Type</entry> <entry>Description</entry></row>
+      </thead>
+ 
+      <tbody>
+       <row>
+        <entry><literal><function>pg_xact_commit_timestamp(<parameter>xid</parameter>)</function></literal></entry>
+        <entry><type>timestamp with time zone</type></entry>
+        <entry>get commit timestamp of a transaction</entry>
+       </row>
+       <row>
+        <entry><literal><function>pg_last_committed_xact()</function></literal></entry>
+        <entry><parameter>xid</> <type>xid</>, <parameter>timestamp</> <type>timestamp with time zone</></entry>
+        <entry>get transaction ID and commit timestamp of the latest committed transaction</entry>
+       </row>
+      </tbody>
+     </tgroup>
+    </table>
+ 
    </sect1>
  
    <sect1 id="functions-admin">
*** a/doc/src/sgml/ref/pg_resetxlog.sgml
--- b/doc/src/sgml/ref/pg_resetxlog.sgml
***************
*** 22,27 **** PostgreSQL documentation
--- 22,28 ----
   <refsynopsisdiv>
    <cmdsynopsis>
     <command>pg_resetxlog</command>
+    <arg choice="opt"><option>-c</option> <replaceable class="parameter">xid</replaceable></arg>
     <arg choice="opt"><option>-f</option></arg>
     <arg choice="opt"><option>-n</option></arg>
     <arg choice="opt"><option>-o</option> <replaceable class="parameter">oid</replaceable></arg>
***************
*** 77,88 **** PostgreSQL documentation
    </para>
  
    <para>
!    The <option>-o</>, <option>-x</>, <option>-e</>,
!    <option>-m</>, <option>-O</>,
!    and <option>-l</>
     options allow the next OID, next transaction ID, next transaction ID's
!    epoch, next and oldest multitransaction ID, next multitransaction offset, and WAL
!    starting address values to be set manually.  These are only needed when
     <command>pg_resetxlog</command> is unable to determine appropriate values
     by reading <filename>pg_control</>.  Safe values can be determined as
     follows:
--- 78,89 ----
    </para>
  
    <para>
!    The <option>-o</>, <option>-x</>, <option>-e</>, <option>-m</>,
!    <option>-O</>, <option>-l</> and <option>-c</>
     options allow the next OID, next transaction ID, next transaction ID's
!    epoch, next and oldest multitransaction ID, next multitransaction offset, WAL
!    starting address and the oldest transaction ID for which the commit time can
!    be retrieved values to be set manually.  These are only needed when
     <command>pg_resetxlog</command> is unable to determine appropriate values
     by reading <filename>pg_control</>.  Safe values can be determined as
     follows:
***************
*** 130,135 **** PostgreSQL documentation
--- 131,145 ----
  
      <listitem>
       <para>
+       A safe value for the oldest transaction ID for which the commit time can
+       be retrieved (<option>-c</>) can be determined by looking for the
+       numerically smallest file name in the directory <filename>pg_commit_ts</>
+       under the data directory.  As above, the file names are in hexadecimal.
+      </para>
+     </listitem>
+ 
+     <listitem>
+      <para>
        The WAL starting address (<option>-l</>) should be
        larger than any WAL segment file name currently existing in
        the directory <filename>pg_xlog</> under the data directory.
*** a/doc/src/sgml/storage.sgml
--- b/doc/src/sgml/storage.sgml
***************
*** 67,72 **** Item
--- 67,77 ----
  </row>
  
  <row>
+  <entry><filename>pg_commit_ts</></entry>
+  <entry>Subdirectory containing transaction commit timestamp data</entry>
+ </row>
+ 
+ <row>
   <entry><filename>pg_clog</></entry>
   <entry>Subdirectory containing transaction commit status data</entry>
  </row>
*** a/src/backend/access/rmgrdesc/Makefile
--- b/src/backend/access/rmgrdesc/Makefile
***************
*** 8,14 **** subdir = src/backend/access/rmgrdesc
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = brindesc.o clogdesc.o dbasedesc.o gindesc.o gistdesc.o \
  	   hashdesc.o heapdesc.o \
  	   mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \
  	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
--- 8,14 ----
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
  	   hashdesc.o heapdesc.o \
  	   mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \
  	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
*** /dev/null
--- b/src/backend/access/rmgrdesc/committsdesc.c
***************
*** 0 ****
--- 1,82 ----
+ /*-------------------------------------------------------------------------
+  *
+  * committsdesc.c
+  *    rmgr descriptor routines for access/transam/commit_ts.c
+  *
+  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *    src/backend/access/rmgrdesc/committsdesc.c
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "access/commit_ts.h"
+ #include "utils/timestamp.h"
+ 
+ 
+ void
+ commit_ts_desc(StringInfo buf, XLogReaderState *record)
+ {
+ 	char	   *rec = XLogRecGetData(record);
+ 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ 
+ 	if (info == COMMIT_TS_ZEROPAGE)
+ 	{
+ 		int			pageno;
+ 
+ 		memcpy(&pageno, rec, sizeof(int));
+ 		appendStringInfo(buf, "%d", pageno);
+ 	}
+ 	else if (info == COMMIT_TS_TRUNCATE)
+ 	{
+ 		int			pageno;
+ 
+ 		memcpy(&pageno, rec, sizeof(int));
+ 		appendStringInfo(buf, "%d", pageno);
+ 	}
+ 	else if (info == COMMIT_TS_SETTS)
+ 	{
+ 		xl_commit_ts_set *xlrec = (xl_commit_ts_set *) rec;
+ 		int		nsubxids;
+ 
+ 		appendStringInfo(buf, "set %s/%d for: %u",
+ 						 timestamptz_to_str(xlrec->timestamp),
+ 						 xlrec->nodeid,
+ 						 xlrec->mainxid);
+ 		nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
+ 					sizeof(TransactionId));
+ 		if (nsubxids > 0)
+ 		{
+ 			int		i;
+ 			TransactionId *subxids;
+ 
+ 			subxids = palloc(sizeof(TransactionId) * nsubxids);
+ 			memcpy(subxids,
+ 				   XLogRecGetData(record) + SizeOfCommitTsSet,
+ 				   sizeof(TransactionId) * nsubxids);
+ 			for (i = 0; i < nsubxids; i++)
+ 				appendStringInfo(buf, ", %u", subxids[i]);
+ 			pfree(subxids);
+ 		}
+ 	}
+ }
+ 
+ const char *
+ commit_ts_identify(uint8 info)
+ {
+ 	switch (info)
+ 	{
+ 		case COMMIT_TS_ZEROPAGE:
+ 			return "ZEROPAGE";
+ 		case COMMIT_TS_TRUNCATE:
+ 			return "TRUNCATE";
+ 		case COMMIT_TS_SETTS:
+ 			return "SETTS";
+ 		default:
+ 			return NULL;
+ 	}
+ }
*** a/src/backend/access/rmgrdesc/xlogdesc.c
--- b/src/backend/access/rmgrdesc/xlogdesc.c
***************
*** 45,51 **** xlog_desc(StringInfo buf, XLogReaderState *record)
  		appendStringInfo(buf, "redo %X/%X; "
  						 "tli %u; prev tli %u; fpw %s; xid %u/%u; oid %u; multi %u; offset %u; "
  						 "oldest xid %u in DB %u; oldest multi %u in DB %u; "
! 						 "oldest running xid %u; %s",
  				(uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
  						 checkpoint->ThisTimeLineID,
  						 checkpoint->PrevTimeLineID,
--- 45,51 ----
  		appendStringInfo(buf, "redo %X/%X; "
  						 "tli %u; prev tli %u; fpw %s; xid %u/%u; oid %u; multi %u; offset %u; "
  						 "oldest xid %u in DB %u; oldest multi %u in DB %u; "
! 						 "oldest commit timestamp xid: %u; oldest running xid %u; %s",
  				(uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
  						 checkpoint->ThisTimeLineID,
  						 checkpoint->PrevTimeLineID,
***************
*** 58,63 **** xlog_desc(StringInfo buf, XLogReaderState *record)
--- 58,64 ----
  						 checkpoint->oldestXidDB,
  						 checkpoint->oldestMulti,
  						 checkpoint->oldestMultiDB,
+ 						 checkpoint->oldestCommitTs,
  						 checkpoint->oldestActiveXid,
  				 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
  	}
*** a/src/backend/access/transam/Makefile
--- b/src/backend/access/transam/Makefile
***************
*** 12,19 **** subdir = src/backend/access/transam
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
! 	timeline.o twophase.o twophase_rmgr.o xlog.o xlogarchive.o xlogfuncs.o \
  	xloginsert.o xlogreader.o xlogutils.o
  
  include $(top_srcdir)/src/backend/common.mk
--- 12,20 ----
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = clog.o commit_ts.o multixact.o rmgr.o slru.o subtrans.o \
! 	timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
! 	xact.o xlog.o xlogarchive.o xlogfuncs.o \
  	xloginsert.o xlogreader.o xlogutils.o
  
  include $(top_srcdir)/src/backend/common.mk
*** a/src/backend/access/transam/clog.c
--- b/src/backend/access/transam/clog.c
***************
*** 419,425 **** TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
   *
   * Testing during the PostgreSQL 9.2 development cycle revealed that on a
   * large multi-processor system, it was possible to have more CLOG page
!  * requests in flight at one time than the numebr of CLOG buffers which existed
   * at that time, which was hardcoded to 8.  Further testing revealed that
   * performance dropped off with more than 32 CLOG buffers, possibly because
   * the linear buffer search algorithm doesn't scale well.
--- 419,425 ----
   *
   * Testing during the PostgreSQL 9.2 development cycle revealed that on a
   * large multi-processor system, it was possible to have more CLOG page
!  * requests in flight at one time than the number of CLOG buffers which existed
   * at that time, which was hardcoded to 8.  Further testing revealed that
   * performance dropped off with more than 32 CLOG buffers, possibly because
   * the linear buffer search algorithm doesn't scale well.
*** /dev/null
--- b/src/backend/access/transam/commit_ts.c
***************
*** 0 ****
--- 1,848 ----
+ /*-------------------------------------------------------------------------
+  *
+  * commit_ts.c
+  *		PostgreSQL commit timestamp manager
+  *
+  * This module is a pg_clog-like system that stores the commit timestamp
+  * for each transaction.
+  *
+  * XLOG interactions: this module generates an XLOG record whenever a new
+  * CommitTs page is initialized to zeroes.  Also, one XLOG record is
+  * generated for setting of values when the caller requests it; this allows
+  * us to support values coming from places other than transaction commit.
+  * Other writes of CommitTS come from recording of transaction commit in
+  * xact.c, which generates its own XLOG records for these events and will
+  * re-perform the status update on redo; so we need make no additional XLOG
+  * entry here.
+  *
+  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * src/backend/access/transam/commit_ts.c
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "access/commit_ts.h"
+ #include "access/htup_details.h"
+ #include "access/slru.h"
+ #include "access/transam.h"
+ #include "catalog/pg_type.h"
+ #include "funcapi.h"
+ #include "miscadmin.h"
+ #include "pg_trace.h"
+ #include "utils/builtins.h"
+ #include "utils/snapmgr.h"
+ #include "utils/timestamp.h"
+ 
+ /*
+  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
+  * everywhere else in Postgres.
+  *
+  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+  * CommitTs page numbering also wraps around at
+  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
+  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
+  * explicit notice of that fact in this module, except when comparing segment
+  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
+  */
+ 
+ /*
+  * We need 8+4 bytes per xact.  Note that enlarging this struct might mean
+  * the largest possible file name is more than 5 chars long; see
+  * SlruScanDirectory.
+  */
+ typedef struct CommitTimestampEntry
+ {
+ 	TimestampTz		time;
+ 	CommitTsNodeId	nodeid;
+ } CommitTimestampEntry;
+ 
+ #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
+ 									sizeof(CommitTsNodeId))
+ 
+ #define COMMIT_TS_XACTS_PER_PAGE \
+ 	(BLCKSZ / SizeOfCommitTimestampEntry)
+ 
+ #define TransactionIdToCTsPage(xid)	\
+ 	((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
+ #define TransactionIdToCTsEntry(xid)	\
+ 	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
+ 
+ /*
+  * Link to shared-memory data structures for CommitTs control
+  */
+ static SlruCtlData CommitTsCtlData;
+ 
+ #define CommitTsCtl (&CommitTsCtlData)
+ 
+ /*
+  * We keep a cache of the last value set in shared memory.  This is protected
+  * by CommitTsLock.
+  */
+ typedef struct CommitTimestampShared
+ {
+ 	TransactionId	xidLastCommit;
+ 	CommitTimestampEntry dataLastCommit;
+ } CommitTimestampShared;
+ 
+ CommitTimestampShared	*commitTsShared;
+ 
+ 
+ /* GUC variable */
+ bool	track_commit_timestamp;
+ 
+ static CommitTsNodeId default_node_id = InvalidCommitTsNodeId;
+ 
+ static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
+ 					 TransactionId *subxids, TimestampTz ts,
+ 					 CommitTsNodeId nodeid, int pageno);
+ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
+ 						  CommitTsNodeId nodeid, int slotno);
+ static int	ZeroCommitTsPage(int pageno, bool writeXlog);
+ static bool CommitTsPagePrecedes(int page1, int page2);
+ static void WriteZeroPageXlogRec(int pageno);
+ static void WriteTruncateXlogRec(int pageno);
+ static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
+ 						 TransactionId *subxids, TimestampTz timestamp,
+ 						 CommitTsNodeId nodeid);
+ 
+ 
+ /*
+  * CommitTsSetDefaultNodeId
+  *
+  * Set default nodeid for current backend.
+  */
+ void
+ CommitTsSetDefaultNodeId(CommitTsNodeId nodeid)
+ {
+ 	default_node_id = nodeid;
+ }
+ 
+ /*
+  * CommitTsGetDefaultNodeId
+  *
+  * Get default nodeid for current backend.
+  */
+ CommitTsNodeId
+ CommitTsGetDefaultNodeId(void)
+ {
+ 	return default_node_id;
+ }
+ 
+ /*
+  * TransactionTreeSetCommitTsData
+  *
+  * Record the final commit timestamp of transaction entries in the commit log
+  * for a transaction and its subtransaction tree, as efficiently as possible.
+  *
+  * xid is the top level transaction id.
+  *
+  * subxids is an array of xids of length nsubxids, representing subtransactions
+  * in the tree of xid. In various cases nsubxids may be zero.
+  * The reason why tracking just the parent xid commit timestamp is not enough
+  * is that the subtrans SLRU does not stay valid across crashes (it's not
+  * permanent) so we need to keep the information about them here. If the
+  * subtrans implementation changes in the future, we might want to revisit the
+  * decision of storing timestamp info for each subxid.
+  *
+  * The do_xlog parameter tells us whether to include a XLog record of this
+  * or not.  Normal path through RecordTransactionCommit() will be related
+  * to a transaction commit XLog record, and so should pass "false" here.
+  * Other callers probably want to pass true, so that the given values persist
+  * in case of crashes.
+  */
+ void
+ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
+ 							   TransactionId *subxids, TimestampTz timestamp,
+ 							   CommitTsNodeId nodeid, bool do_xlog)
+ {
+ 	int			i;
+ 	TransactionId headxid;
+ 
+ 	Assert(xid != InvalidTransactionId);
+ 
+ 	if (!track_commit_timestamp)
+ 		return;
+ 
+ 	/*
+ 	 * Comply with the WAL-before-data rule: if caller specified it wants
+ 	 * this value to be recorded in WAL, do so before touching the data.
+ 	 */
+ 	if (do_xlog)
+ 		WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
+ 
+ 	/*
+ 	 * We split the xids to set the timestamp to in groups belonging to the
+ 	 * same SLRU page; the first element in each such set is its head.  The
+ 	 * first group has the main XID as the head; subsequent sets use the
+ 	 * first subxid not on the previous page as head.  This way, we only have
+ 	 * to lock/modify each SLRU page once.
+ 	 */
+ 	for (i = 0, headxid = xid;;)
+ 	{
+ 		int			pageno = TransactionIdToCTsPage(headxid);
+ 		int			j;
+ 
+ 		for (j = i; j < nsubxids; j++)
+ 		{
+ 			if (TransactionIdToCTsPage(subxids[j]) != pageno)
+ 				break;
+ 		}
+ 		/* subxids[i..j-1] are on the same page as the head */
+ 
+ 		SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
+ 							 pageno);
+ 
+ 		/* j == nsubxids: no subxid is left on a later page, so we're done. */
+ 		if (j >= nsubxids)
+ 			break;
+ 
+ 		/*
+ 		 * subxids[j] is the first one on a different page: make it the new
+ 		 * head and resume scanning right after it.
+ 		 */
+ 		headxid = subxids[j];
+ 		i = j + 1;
+ 	}
+ 
+ 	/*
+ 	 * Update the cached value in shared memory
+ 	 */
+ 	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+ 	commitTsShared->xidLastCommit = xid;
+ 	commitTsShared->dataLastCommit.time = timestamp;
+ 	commitTsShared->dataLastCommit.nodeid = nodeid;
+ 	LWLockRelease(CommitTsLock);
+ }
+ 
+ /*
+  * Record the commit timestamp of transaction entries in the commit log for all
+  * entries on a single page.  Atomic only on this page.
+  */
+ static void
+ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
+ 					 TransactionId *subxids, TimestampTz ts,
+ 					 CommitTsNodeId nodeid, int pageno)
+ {
+ 	int			slotno;
+ 	int			i;
+ 
+ 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+ 
+ 	slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
+ 
+ 	TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
+ 	for (i = 0; i < nsubxids; i++)
+ 		TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
+ 
+ 	CommitTsCtl->shared->page_dirty[slotno] = true;
+ 
+ 	LWLockRelease(CommitTsControlLock);
+ }
+ 
+ /*
+  * Sets the commit timestamp of a single transaction.
+  *
+  * Must be called with CommitTsControlLock held
+  */
+ static void
+ TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
+ 						 CommitTsNodeId nodeid, int slotno)
+ {
+ 	int			entryno = TransactionIdToCTsEntry(xid);
+ 	CommitTimestampEntry entry;
+ 
+ 	entry.time = ts;
+ 	entry.nodeid = nodeid;
+ 
+ 	memcpy(CommitTsCtl->shared->page_buffer[slotno] +
+ 		   SizeOfCommitTimestampEntry * entryno,
+ 		   &entry, SizeOfCommitTimestampEntry);
+ }
+ 
+ /*
+  * Interrogate the commit timestamp of a transaction.
+  */
+ void
+ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
+ 							 CommitTsNodeId *nodeid)
+ {
+ 	int			pageno = TransactionIdToCTsPage(xid);
+ 	int			entryno = TransactionIdToCTsEntry(xid);
+ 	int			slotno;
+ 	CommitTimestampEntry entry;
+ 	TransactionId oldestCommitTs;
+ 
+ 	/* Error if module not enabled */
+ 	if (!track_commit_timestamp)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ 				 errmsg("could not get commit timestamp data"),
+ 				 errhint("Make sure the configuration parameter \"%s\" is set.",
+ 						 "track_commit_timestamp")));
+ 
+ 	/*
+ 	 * Return empty if the requested value is older than what we have or newer
+ 	 * than newest we have.  The reason it's acceptable to use an unlocked read
+ 	 * for xidLastCommit is that that value can only move forwards, and it's
+ 	 * okay to read a value slightly older than the one we read below.
+ 	 */
+ 	LWLockAcquire(CommitTsControlLock, LW_SHARED);
+ 	oldestCommitTs = ShmemVariableCache->oldestCommitTs;
+ 	LWLockRelease(CommitTsControlLock);
+ 
+ 	if (!TransactionIdIsValid(oldestCommitTs) ||
+ 		TransactionIdPrecedes(xid, oldestCommitTs) ||
+ 		TransactionIdPrecedes(commitTsShared->xidLastCommit, xid))
+ 	{
+ 		if (ts)
+ 			TIMESTAMP_NOBEGIN(*ts);
+ 		if (nodeid)
+ 			*nodeid = InvalidCommitTsNodeId;
+ 		return;
+ 	}
+ 
+ 	/*
+ 	 * Use an unlocked atomic read on our cached value in shared memory; if
+ 	 * it's a hit, acquire a lock and read the data, after verifying that it's
+ 	 * still what we initially read.  Otherwise, fall through to read from
+ 	 * SLRU.
+ 	 */
+ 	if (commitTsShared->xidLastCommit == xid)
+ 	{
+ 		LWLockAcquire(CommitTsLock, LW_SHARED);
+ 		if (commitTsShared->xidLastCommit == xid)
+ 		{
+ 			if (ts)
+ 				*ts = commitTsShared->dataLastCommit.time;
+ 			if (nodeid)
+ 				*nodeid = commitTsShared->dataLastCommit.nodeid;
+ 			LWLockRelease(CommitTsLock);
+ 			return;
+ 		}
+ 		LWLockRelease(CommitTsLock);
+ 	}
+ 
+ 	/* lock is acquired by SimpleLruReadPage_ReadOnly */
+ 	slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
+ 	memcpy(&entry,
+ 		   CommitTsCtl->shared->page_buffer[slotno] +
+ 		   SizeOfCommitTimestampEntry * entryno,
+ 		   SizeOfCommitTimestampEntry);
+ 
+ 	if (ts)
+ 		*ts = entry.time;
+ 	if (nodeid)
+ 		*nodeid = entry.nodeid;
+ 
+ 	LWLockRelease(CommitTsControlLock);
+ }
+ 
+ /*
+  * Return the Xid of the latest committed transaction.  (As far as this module
+  * is concerned, anyway; it's up to the caller to ensure the value is useful
+  * for its purposes.)
+  *
+  * ts and nodeid are filled with the corresponding data; they can be passed
+  * as NULL if not wanted.
+  */
+ TransactionId
+ GetLatestCommitTsData(TimestampTz *ts, CommitTsNodeId *nodeid)
+ {
+ 	TransactionId	xid;
+ 
+ 	/* Error if module not enabled */
+ 	if (!track_commit_timestamp)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ 				 errmsg("could not get commit timestamp data"),
+ 				 errhint("Make sure the configuration parameter \"%s\" is set.",
+ 						 "track_commit_timestamp")));
+ 
+ 	LWLockAcquire(CommitTsLock, LW_SHARED);
+ 	xid = commitTsShared->xidLastCommit;
+ 	if (ts)
+ 		*ts = commitTsShared->dataLastCommit.time;
+ 	if (nodeid)
+ 		*nodeid = commitTsShared->dataLastCommit.nodeid;
+ 	LWLockRelease(CommitTsLock);
+ 
+ 	return xid;
+ }
+ 
+ /*
+  * SQL-callable wrapper to obtain commit time of a transaction
+  */
+ Datum
+ pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
+ {
+ 	TransactionId	xid = PG_GETARG_UINT32(0);
+ 	TimestampTz		ts;
+ 
+ 	TransactionIdGetCommitTsData(xid, &ts, NULL);
+ 
+ 	if (TIMESTAMP_IS_NOBEGIN(ts))
+ 		PG_RETURN_NULL();
+ 
+ 	PG_RETURN_TIMESTAMPTZ(ts);
+ }
+ 
+ 
+ Datum
+ pg_last_committed_xact(PG_FUNCTION_ARGS)
+ {
+ 	TransactionId	xid;
+ 	TimestampTz		ts;
+ 	Datum       values[2];
+ 	bool        nulls[2];
+ 	TupleDesc   tupdesc;
+ 	HeapTuple	htup;
+ 
+ 	/* fetch the latest committed xid and its commit timestamp */
+ 	xid = GetLatestCommitTsData(&ts, NULL);
+ 
+ 	/*
+ 	 * Construct a tuple descriptor for the result row.  This must match this
+ 	 * function's pg_proc entry!
+ 	 */
+ 	tupdesc = CreateTemplateTupleDesc(2, false);
+ 	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
+ 					   XIDOID, -1, 0);
+ 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
+ 					   TIMESTAMPTZOID, -1, 0);
+ 	tupdesc = BlessTupleDesc(tupdesc);
+ 
+ 	if (xid == InvalidTransactionId)
+ 	{
+ 		memset(nulls, true, sizeof(nulls));
+ 	}
+ 	else
+ 	{
+ 		values[0] = TransactionIdGetDatum(xid);
+ 		nulls[0] = false;
+ 
+ 		values[1] = TimestampTzGetDatum(ts);
+ 		nulls[1] = false;
+ 	}
+ 
+ 	htup = heap_form_tuple(tupdesc, values, nulls);
+ 
+ 	PG_RETURN_DATUM(HeapTupleGetDatum(htup));
+ }
+ 
+ 
+ /*
+  * Number of shared CommitTS buffers.
+  *
+  * We use a very similar logic as for the number of CLOG buffers; see comments
+  * in CLOGShmemBuffers.
+  */
+ Size
+ CommitTsShmemBuffers(void)
+ {
+ 	return Min(16, Max(4, NBuffers / 1024));
+ }
+ 
+ /*
+  * Shared memory sizing for CommitTs
+  */
+ Size
+ CommitTsShmemSize(void)
+ {
+ 	return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
+ 		sizeof(CommitTimestampShared);
+ }
+ 
+ /*
+  * Initialize CommitTs at system startup (postmaster start or standalone
+  * backend)
+  */
+ void
+ CommitTsShmemInit(void)
+ {
+ 	bool	found;
+ 
+ 	CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
+ 	SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
+ 				  CommitTsControlLock, "pg_commit_ts");
+ 
+ 	commitTsShared = ShmemInitStruct("CommitTs shared",
+ 									 sizeof(CommitTimestampShared),
+ 									 &found);
+ 
+ 	if (!IsUnderPostmaster)
+ 	{
+ 		Assert(!found);
+ 
+ 		commitTsShared->xidLastCommit = InvalidTransactionId;
+ 		TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
+ 		commitTsShared->dataLastCommit.nodeid = InvalidCommitTsNodeId;
+ 	}
+ 	else
+ 		Assert(found);
+ }
+ 
+ /*
+  * This function must be called ONCE on system install.
+  *
+  * (The CommitTs directory is assumed to have been created by initdb, and
+  * CommitTsShmemInit must have been called already.)
+  */
+ void
+ BootStrapCommitTs(void)
+ {
+ 	/*
+ 	 * Nothing to do here at present, unlike most other SLRU modules; segments
+ 	 * are created when the server is started with this module enabled.
+ 	 * See StartupCommitTs.
+ 	 */
+ }
+ 
+ /*
+  * Initialize (or reinitialize) a page of CommitTs to zeroes.
+  * If writeXlog is TRUE, also emit an XLOG record saying we did this.
+  *
+  * The page is not actually written, just set up in shared memory.
+  * The slot number of the new page is returned.
+  *
+  * Control lock must be held at entry, and will be held at exit.
+  */
+ static int
+ ZeroCommitTsPage(int pageno, bool writeXlog)
+ {
+ 	int			slotno;
+ 
+ 	slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
+ 
+ 	if (writeXlog)
+ 		WriteZeroPageXlogRec(pageno);
+ 
+ 	return slotno;
+ }
+ 
+ /*
+  * This must be called ONCE during postmaster or standalone-backend startup,
+  * after StartupXLOG has initialized ShmemVariableCache->nextXid.
+  */
+ void
+ StartupCommitTs(void)
+ {
+ 	TransactionId xid = ShmemVariableCache->nextXid;
+ 	int			pageno = TransactionIdToCTsPage(xid);
+ 
+ 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+ 
+ 	/*
+ 	 * Initialize our idea of the latest page number.
+ 	 */
+ 	CommitTsCtl->shared->latest_page_number = pageno;
+ 
+ 	LWLockRelease(CommitTsControlLock);
+ }
+ 
+ /*
+  * This must be called ONCE during postmaster or standalone-backend startup,
+  * when commit timestamp is enabled.  Must be called after recovery has
+  * finished.
+  *
+  * This is in charge of creating the currently active segment, if it's not
+  * already there.  The reason for this is that the server might have been
+  * running with this module disabled for a while and thus might have skipped
+  * the normal creation point.
+  */
+ void
+ CompleteCommitTsInitialization(void)
+ {
+ 	TransactionId xid = ShmemVariableCache->nextXid;
+ 	int			pageno = TransactionIdToCTsPage(xid);
+ 
+ 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+ 
+ 	/*
+ 	 * Re-Initialize our idea of the latest page number.
+ 	 */
+ 	CommitTsCtl->shared->latest_page_number = pageno;
+ 
+ 	/*
+ 	 * If this module is not currently enabled, make sure we don't hand back
+ 	 * possibly-invalid data; also remove segments of old data.
+ 	 */
+ 	if (!track_commit_timestamp)
+ 	{
+ 		ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
+ 		LWLockRelease(CommitTsControlLock);
+ 
+ 		TruncateCommitTs(ReadNewTransactionId());
+ 
+ 		return;
+ 	}
+ 
+ 	/*
+ 	 * If CommitTs is enabled, but it wasn't in the previous server run, we
+ 	 * need to set the oldest value to the next Xid; that way, we will not try
+ 	 * to read data that might not have been set.
+ 	 *
+ 	 * XXX does this have a problem if a server is started with commitTs
+ 	 * enabled, then started with commitTs disabled, then restarted with it
+ 	 * enabled again?  It doesn't look like it does, because there should be a
+ 	 * checkpoint that sets the value to InvalidTransactionId at end of
+ 	 * recovery; and so any chance of injecting new transactions without
+ 	 * CommitTs values would occur after the oldestCommitTs has been set to
+ 	 * Invalid temporarily.
+ 	 */
+ 	if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
+ 		ShmemVariableCache->oldestCommitTs = ReadNewTransactionId();
+ 
+ 	/* Finally, create the current segment file, if necessary */
+ 	if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
+ 	{
+ 		int		slotno;
+ 
+ 		slotno = ZeroCommitTsPage(pageno, false);
+ 		SimpleLruWritePage(CommitTsCtl, slotno);
+ 		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
+ 	}
+ 
+ 	LWLockRelease(CommitTsControlLock);
+ }
+ 
+ /*
+  * This must be called ONCE during postmaster or standalone-backend shutdown
+  */
+ void
+ ShutdownCommitTs(void)
+ {
+ 	/* Flush dirty CommitTs pages to disk */
+ 	SimpleLruFlush(CommitTsCtl, false);
+ }
+ 
+ /*
+  * Perform a checkpoint --- either during shutdown, or on-the-fly.  Flushes
+  * dirty CommitTs pages to disk, as ShutdownCommitTs does but passing "true".
+  */
+ void
+ CheckPointCommitTs(void)
+ {
+ 	SimpleLruFlush(CommitTsCtl, true);
+ }
+ 
+ /*
+  * Make sure that CommitTs has room for a newly-allocated XID.
+  *
+  * NB: this is called while holding XidGenLock.  We want it to be very fast
+  * most of the time; even when it's not so fast, no actual I/O need happen
+  * unless we're forced to write out a dirty CommitTs or xlog page to make room
+  * in shared memory.
+  *
+  * NB: the current implementation relies on track_commit_timestamp being
+  * PGC_POSTMASTER.
+  */
+ void
+ ExtendCommitTs(TransactionId newestXact)
+ {
+ 	int			pageno;
+ 
+ 	/* nothing to do if module not enabled */
+ 	if (!track_commit_timestamp)
+ 		return;
+ 
+ 	/*
+ 	 * No work except at first XID of a page.  But beware: just after
+ 	 * wraparound, the first XID of page zero is FirstNormalTransactionId.
+ 	 */
+ 	if (TransactionIdToCTsEntry(newestXact) != 0 &&
+ 		!TransactionIdEquals(newestXact, FirstNormalTransactionId))
+ 		return;
+ 
+ 	pageno = TransactionIdToCTsPage(newestXact);
+ 
+ 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+ 
+ 	/* Zero the page; the XLOG entry is (presumably) skipped during recovery */
+ 	ZeroCommitTsPage(pageno, !InRecovery);
+ 
+ 	LWLockRelease(CommitTsControlLock);
+ }
+ 
+ /*
+  * Remove all CommitTs segments before the one holding the passed
+  * transaction ID.
+  *
+  * Note that we don't need to flush XLOG here.
+  */
+ void
+ TruncateCommitTs(TransactionId oldestXact)
+ {
+ 	int			cutoffPage;
+ 
+ 	/*
+ 	 * The cutoff point is the start of the segment containing oldestXact. We
+ 	 * pass the *page* containing oldestXact to SimpleLruTruncate.
+ 	 */
+ 	cutoffPage = TransactionIdToCTsPage(oldestXact);
+ 
+ 	/* Check to see if there are any files that could be removed */
+ 	if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
+ 						   &cutoffPage))
+ 		return;					/* nothing to remove */
+ 
+ 	/* Write the TRUNCATE XLOG record before touching any file */
+ 	WriteTruncateXlogRec(cutoffPage);
+ 
+ 	/* Now we can remove the old CommitTs segment(s) */
+ 	SimpleLruTruncate(CommitTsCtl, cutoffPage);
+ }
+ 
+ /*
+  * Advance the earliest Xid for which commit TS can be consulted.
+  */
+ void
+ SetCommitTsLimit(TransactionId oldestXact)
+ {
+ 	/*
+ 	 * Only move the limit forward: leave it alone when it is already further
+ 	 * into the "future", or when it is Invalid (signals a disabled committs).
+ 	 */
+ 	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+ 	if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId &&
+ 		TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact))
+ 		ShmemVariableCache->oldestCommitTs = oldestXact;
+ 	LWLockRelease(CommitTsControlLock);
+ }
+ 
+ /*
+  * Decide which of two CommitTs page numbers is "older" for truncation purposes.
+  *
+  * We need to use comparison of TransactionIds here in order to do the right
+  * thing with wraparound XID arithmetic.  However, if we are asked about
+  * page number zero, we don't want to hand InvalidTransactionId to
+  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
+  * offset both xids by FirstNormalTransactionId to avoid that.
+  */
+ static bool
+ CommitTsPagePrecedes(int page1, int page2)
+ {
+ 	TransactionId xid1;
+ 	TransactionId xid2;
+ 
+ 	xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
+ 	xid1 += FirstNormalTransactionId;
+ 	xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
+ 	xid2 += FirstNormalTransactionId;
+ 
+ 	return TransactionIdPrecedes(xid1, xid2);
+ }
+ 
+ 
+ /*
+  * Write a ZEROPAGE xlog record; its only payload is the page number
+  */
+ static void
+ WriteZeroPageXlogRec(int pageno)
+ {
+ 	XLogBeginInsert();
+ 	XLogRegisterData((char *) (&pageno), sizeof(int));
+ 	(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
+ }
+ 
+ /*
+  * Write a TRUNCATE xlog record; its only payload is the cutoff page number
+  */
+ static void
+ WriteTruncateXlogRec(int pageno)
+ {
+ 	XLogBeginInsert();
+ 	XLogRegisterData((char *) (&pageno), sizeof(int));
+ 	(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
+ }
+ 
+ /*
+  * Write a SETTS xlog record: the fixed part of xl_commit_ts_set (up to and
+  * including mainxid), followed by the array of subtransaction Xids.
+  */
+ static void
+ WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
+ 						 TransactionId *subxids, TimestampTz timestamp,
+ 						 CommitTsNodeId nodeid)
+ {
+ 	xl_commit_ts_set	record;
+ 
+ 	record.timestamp = timestamp;
+ 	record.nodeid = nodeid;
+ 	record.mainxid = mainxid;
+ 
+ 	XLogBeginInsert();
+ 	/* same size commit_ts_redo uses to locate the subxid array */
+ 	XLogRegisterData((char *) &record, SizeOfCommitTsSet);
+ 	XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));
+ 	(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
+ }
+ 
+ /*
+  * CommitTs rmgr redo: replay ZEROPAGE, TRUNCATE and SETTS xlog records
+  */
+ void
+ commit_ts_redo(XLogReaderState *record)
+ {
+ 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ 
+ 	/* Backup blocks are not used in commit_ts records */
+ 	Assert(!XLogRecHasAnyBlockRefs(record));
+ 
+ 	if (info == COMMIT_TS_ZEROPAGE)
+ 	{
+ 		int			pageno;
+ 		int			slotno;
+ 
+ 		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+ 
+ 		LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+ 
+ 		slotno = ZeroCommitTsPage(pageno, false);
+ 		SimpleLruWritePage(CommitTsCtl, slotno);
+ 		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
+ 
+ 		LWLockRelease(CommitTsControlLock);
+ 	}
+ 	else if (info == COMMIT_TS_TRUNCATE)
+ 	{
+ 		int			pageno;
+ 
+ 		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+ 
+ 		/*
+ 		 * During XLOG replay, latest_page_number isn't set up yet; insert a
+ 		 * suitable value to bypass the sanity test in SimpleLruTruncate.
+ 		 */
+ 		CommitTsCtl->shared->latest_page_number = pageno;
+ 
+ 		SimpleLruTruncate(CommitTsCtl, pageno);
+ 	}
+ 	else if (info == COMMIT_TS_SETTS)
+ 	{
+ 		xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
+ 		int			nsubxids;
+ 		TransactionId *subxids;
+ 
+ 		nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
+ 					sizeof(TransactionId));
+ 		if (nsubxids > 0)
+ 		{
+ 			subxids = palloc(sizeof(TransactionId) * nsubxids);
+ 			memcpy(subxids,
+ 				   XLogRecGetData(record) + SizeOfCommitTsSet,
+ 				   sizeof(TransactionId) * nsubxids);
+ 		}
+ 		else
+ 			subxids = NULL;
+ 
+ 		TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
+ 									   setts->timestamp, setts->nodeid, false);
+ 		if (subxids)
+ 			pfree(subxids);
+ 	}
+ 	else
+ 		elog(PANIC, "commit_ts_redo: unknown op code %u", info);
+ }
*** a/src/backend/access/transam/rmgr.c
--- b/src/backend/access/transam/rmgr.c
***************
*** 8,13 ****
--- 8,14 ----
  #include "postgres.h"
  
  #include "access/clog.h"
+ #include "access/commit_ts.h"
  #include "access/gin.h"
  #include "access/gist_private.h"
  #include "access/hash.h"
*** a/src/backend/access/transam/slru.c
--- b/src/backend/access/transam/slru.c
***************
*** 1297,1303 **** SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
  
  		len = strlen(clde->d_name);
  
! 		if ((len == 4 || len == 5) &&
  			strspn(clde->d_name, "0123456789ABCDEF") == len)
  		{
  			segno = (int) strtol(clde->d_name, NULL, 16);
--- 1297,1303 ----
  
  		len = strlen(clde->d_name);
  
! 		if ((len == 4 || len == 5 || len == 6) &&
  			strspn(clde->d_name, "0123456789ABCDEF") == len)
  		{
  			segno = (int) strtol(clde->d_name, NULL, 16);
*** a/src/backend/access/transam/varsup.c
--- b/src/backend/access/transam/varsup.c
***************
*** 14,19 ****
--- 14,20 ----
  #include "postgres.h"
  
  #include "access/clog.h"
+ #include "access/commit_ts.h"
  #include "access/subtrans.h"
  #include "access/transam.h"
  #include "access/xact.h"
***************
*** 158,166 **** GetNewTransactionId(bool isSubXact)
  	 * XID before we zero the page.  Fortunately, a page of the commit log
  	 * holds 32K or more transactions, so we don't have to do this very often.
  	 *
! 	 * Extend pg_subtrans too.
  	 */
  	ExtendCLOG(xid);
  	ExtendSUBTRANS(xid);
  
  	/*
--- 159,168 ----
  	 * XID before we zero the page.  Fortunately, a page of the commit log
  	 * holds 32K or more transactions, so we don't have to do this very often.
  	 *
! 	 * Extend pg_subtrans and pg_commit_ts too.
  	 */
  	ExtendCLOG(xid);
+ 	ExtendCommitTs(xid);
  	ExtendSUBTRANS(xid);
  
  	/*
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 20,25 ****
--- 20,26 ----
  #include <time.h>
  #include <unistd.h>
  
+ #include "access/commit_ts.h"
  #include "access/multixact.h"
  #include "access/subtrans.h"
  #include "access/transam.h"
***************
*** 1135,1140 **** RecordTransactionCommit(void)
--- 1136,1156 ----
  	}
  
  	/*
+ 	 * We only need to log the commit timestamp separately if the node
+ 	 * identifier is a valid value; the commit record above already contains
+ 	 * the timestamp info otherwise, and will be used to load it.
+ 	 */
+ 	if (markXidCommitted)
+ 	{
+ 		CommitTsNodeId		node_id;
+ 
+ 		node_id = CommitTsGetDefaultNodeId();
+ 		TransactionTreeSetCommitTsData(xid, nchildren, children,
+ 									   xactStopTimestamp,
+ 									   node_id, node_id != InvalidCommitTsNodeId);
+ 	}
+ 
+ 	/*
  	 * Check if we want to commit asynchronously.  We can allow the XLOG flush
  	 * to happen asynchronously if synchronous_commit=off, or if the current
  	 * transaction has not performed any WAL-logged operation.  The latter
***************
*** 4644,4649 **** xactGetCommittedChildren(TransactionId **ptr)
--- 4660,4666 ----
   */
  static void
  xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
+ 						  TimestampTz commit_time,
  						  TransactionId *sub_xids, int nsubxacts,
  						  SharedInvalidationMessage *inval_msgs, int nmsgs,
  						  RelFileNode *xnodes, int nrels,
***************
*** 4671,4676 **** xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
--- 4688,4697 ----
  		LWLockRelease(XidGenLock);
  	}
  
+ 	/* Set the transaction commit timestamp and metadata */
+ 	TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids,
+ 								   commit_time, InvalidCommitTsNodeId, false);
+ 
  	if (standbyState == STANDBY_DISABLED)
  	{
  		/*
***************
*** 4790,4796 **** xact_redo_commit(xl_xact_commit *xlrec,
  	/* invalidation messages array follows subxids */
  	inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
  
! 	xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
  							  inval_msgs, xlrec->nmsgs,
  							  xlrec->xnodes, xlrec->nrels,
  							  xlrec->dbId,
--- 4811,4818 ----
  	/* invalidation messages array follows subxids */
  	inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
  
! 	xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
! 							  subxacts, xlrec->nsubxacts,
  							  inval_msgs, xlrec->nmsgs,
  							  xlrec->xnodes, xlrec->nrels,
  							  xlrec->dbId,
***************
*** 4805,4811 **** static void
  xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
  						 TransactionId xid, XLogRecPtr lsn)
  {
! 	xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts,
  							  NULL, 0,	/* inval msgs */
  							  NULL, 0,	/* relfilenodes */
  							  InvalidOid,		/* dbId */
--- 4827,4834 ----
  xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
  						 TransactionId xid, XLogRecPtr lsn)
  {
! 	xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
! 							  xlrec->subxacts, xlrec->nsubxacts,
  							  NULL, 0,	/* inval msgs */
  							  NULL, 0,	/* relfilenodes */
  							  InvalidOid,		/* dbId */
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 22,27 ****
--- 22,28 ----
  #include <unistd.h>
  
  #include "access/clog.h"
+ #include "access/commit_ts.h"
  #include "access/multixact.h"
  #include "access/rewriteheap.h"
  #include "access/subtrans.h"
***************
*** 4518,4523 **** BootStrapXLOG(void)
--- 4519,4525 ----
  	checkPoint.oldestXidDB = TemplateDbOid;
  	checkPoint.oldestMulti = FirstMultiXactId;
  	checkPoint.oldestMultiDB = TemplateDbOid;
+ 	checkPoint.oldestCommitTs = InvalidTransactionId;
  	checkPoint.time = (pg_time_t) time(NULL);
  	checkPoint.oldestActiveXid = InvalidTransactionId;
  
***************
*** 4527,4532 **** BootStrapXLOG(void)
--- 4529,4535 ----
  	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
  	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
  	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+ 	SetCommitTsLimit(InvalidTransactionId);
  
  	/* Set up the XLOG page header */
  	page->xlp_magic = XLOG_PAGE_MAGIC;
***************
*** 4606,4611 **** BootStrapXLOG(void)
--- 4609,4615 ----
  	ControlFile->max_locks_per_xact = max_locks_per_xact;
  	ControlFile->wal_level = wal_level;
  	ControlFile->wal_log_hints = wal_log_hints;
+ 	ControlFile->track_commit_timestamp = track_commit_timestamp;
  	ControlFile->data_checksum_version = bootstrap_data_checksum_version;
  
  	/* some additional ControlFile fields are set in WriteControlFile() */
***************
*** 4614,4619 **** BootStrapXLOG(void)
--- 4618,4624 ----
  
  	/* Bootstrap the commit log, too */
  	BootStrapCLOG();
+ 	BootStrapCommitTs();
  	BootStrapSUBTRANS();
  	BootStrapMultiXact();
  
***************
*** 5865,5870 **** StartupXLOG(void)
--- 5870,5878 ----
  	ereport(DEBUG1,
  			(errmsg("oldest MultiXactId: %u, in database %u",
  					checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
+ 	ereport(DEBUG1,
+ 			(errmsg("oldest commit timestamp Xid: %u",
+ 					checkPoint.oldestCommitTs)));
  	if (!TransactionIdIsNormal(checkPoint.nextXid))
  		ereport(PANIC,
  				(errmsg("invalid next transaction ID")));
***************
*** 5876,5881 **** StartupXLOG(void)
--- 5884,5890 ----
  	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
  	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
  	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+ 	SetCommitTsLimit(checkPoint.oldestCommitTs);
  	MultiXactSetSafeTruncate(checkPoint.oldestMulti);
  	XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
  	XLogCtl->ckptXid = checkPoint.nextXid;
***************
*** 6098,6108 **** StartupXLOG(void)
  			ProcArrayInitRecovery(ShmemVariableCache->nextXid);
  
  			/*
! 			 * Startup commit log and subtrans only. MultiXact has already
! 			 * been started up and other SLRUs are not maintained during
! 			 * recovery and need not be started yet.
  			 */
  			StartupCLOG();
  			StartupSUBTRANS(oldestActiveXID);
  
  			/*
--- 6107,6118 ----
  			ProcArrayInitRecovery(ShmemVariableCache->nextXid);
  
  			/*
! 			 * Startup commit log, commit timestamp and subtrans only.
! 			 * MultiXact has already been started up and other SLRUs are not
! 			 * maintained during recovery and need not be started yet.
  			 */
  			StartupCLOG();
+ 			StartupCommitTs();
  			StartupSUBTRANS(oldestActiveXID);
  
  			/*
***************
*** 6751,6762 **** StartupXLOG(void)
  	LWLockRelease(ProcArrayLock);
  
  	/*
! 	 * Start up the commit log and subtrans, if not already done for hot
! 	 * standby.
  	 */
  	if (standbyState == STANDBY_DISABLED)
  	{
  		StartupCLOG();
  		StartupSUBTRANS(oldestActiveXID);
  	}
  
--- 6761,6773 ----
  	LWLockRelease(ProcArrayLock);
  
  	/*
! 	 * Start up the commit log, commit timestamp and subtrans, if not already
! 	 * done for hot standby.
  	 */
  	if (standbyState == STANDBY_DISABLED)
  	{
  		StartupCLOG();
+ 		StartupCommitTs();
  		StartupSUBTRANS(oldestActiveXID);
  	}
  
***************
*** 6792,6797 **** StartupXLOG(void)
--- 6803,6814 ----
  	XLogReportParameters();
  
  	/*
+ 	 * Local WAL inserts enabled, so it's time to finish initialization
+ 	 * of commit timestamp.
+ 	 */
+ 	CompleteCommitTsInitialization();
+ 
+ 	/*
  	 * All done.  Allow backends to write WAL.  (Although the bool flag is
  	 * probably atomic in itself, we use the info_lck here to ensure that
  	 * there are no race conditions concerning visibility of other recent
***************
*** 7358,7363 **** ShutdownXLOG(int code, Datum arg)
--- 7375,7381 ----
  		CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
  	}
  	ShutdownCLOG();
+ 	ShutdownCommitTs();
  	ShutdownSUBTRANS();
  	ShutdownMultiXact();
  
***************
*** 7684,7689 **** CreateCheckPoint(int flags)
--- 7702,7711 ----
  	checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
  	LWLockRelease(XidGenLock);
  
+ 	LWLockAcquire(CommitTsControlLock, LW_SHARED);
+ 	checkPoint.oldestCommitTs = ShmemVariableCache->oldestCommitTs;
+ 	LWLockRelease(CommitTsControlLock);
+ 
  	/* Increase XID epoch if we've wrapped around since last checkpoint */
  	checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
  	if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
***************
*** 7961,7966 **** static void
--- 7983,7989 ----
  CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
  {
  	CheckPointCLOG();
+ 	CheckPointCommitTs();
  	CheckPointSUBTRANS();
  	CheckPointMultiXact();
  	CheckPointPredicate();
***************
*** 8389,8395 **** XLogReportParameters(void)
  		MaxConnections != ControlFile->MaxConnections ||
  		max_worker_processes != ControlFile->max_worker_processes ||
  		max_prepared_xacts != ControlFile->max_prepared_xacts ||
! 		max_locks_per_xact != ControlFile->max_locks_per_xact)
  	{
  		/*
  		 * The change in number of backend slots doesn't need to be WAL-logged
--- 8412,8419 ----
  		MaxConnections != ControlFile->MaxConnections ||
  		max_worker_processes != ControlFile->max_worker_processes ||
  		max_prepared_xacts != ControlFile->max_prepared_xacts ||
! 		max_locks_per_xact != ControlFile->max_locks_per_xact ||
! 		track_commit_timestamp != ControlFile->track_commit_timestamp)
  	{
  		/*
  		 * The change in number of backend slots doesn't need to be WAL-logged
***************
*** 8409,8414 **** XLogReportParameters(void)
--- 8433,8439 ----
  			xlrec.max_locks_per_xact = max_locks_per_xact;
  			xlrec.wal_level = wal_level;
  			xlrec.wal_log_hints = wal_log_hints;
+ 			xlrec.track_commit_timestamp = track_commit_timestamp;
  
  			XLogBeginInsert();
  			XLogRegisterData((char *) &xlrec, sizeof(xlrec));
***************
*** 8423,8428 **** XLogReportParameters(void)
--- 8448,8454 ----
  		ControlFile->max_locks_per_xact = max_locks_per_xact;
  		ControlFile->wal_level = wal_level;
  		ControlFile->wal_log_hints = wal_log_hints;
+ 		ControlFile->track_commit_timestamp = track_commit_timestamp;
  		UpdateControlFile();
  	}
  }
***************
*** 8799,8804 **** xlog_redo(XLogReaderState *record)
--- 8825,8832 ----
  		ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;
  		ControlFile->wal_level = xlrec.wal_level;
  		ControlFile->wal_log_hints = wal_log_hints;
+ 		/* take the value from the WAL record, not from this server's GUC */
+ 		ControlFile->track_commit_timestamp = xlrec.track_commit_timestamp;
  
  		/*
  		 * Update minRecoveryPoint to ensure that if recovery is aborted, we
*** a/src/backend/access/transam/xloginsert.c
--- b/src/backend/access/transam/xloginsert.c
***************
*** 299,305 **** XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
   * Add data to the WAL record that's being constructed.
   *
   * The data is appended to the "main chunk", available at replay with
!  * XLogGetRecData().
   */
  void
  XLogRegisterData(char *data, int len)
--- 299,305 ----
   * Add data to the WAL record that's being constructed.
   *
   * The data is appended to the "main chunk", available at replay with
!  * XLogRecGetData().
   */
  void
  XLogRegisterData(char *data, int len)
*** a/src/backend/commands/vacuum.c
--- b/src/backend/commands/vacuum.c
***************
*** 23,28 ****
--- 23,29 ----
  #include <math.h>
  
  #include "access/clog.h"
+ #include "access/commit_ts.h"
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/htup_details.h"
***************
*** 1071,1080 **** vac_truncate_clog(TransactionId frozenXID,
  		return;
  
  	/*
! 	 * Truncate CLOG to the oldest computed value.  Note we don't truncate
! 	 * multixacts; that will be done by the next checkpoint.
  	 */
  	TruncateCLOG(frozenXID);
  
  	/*
  	 * Update the wrap limit for GetNewTransactionId and creation of new
--- 1072,1083 ----
  		return;
  
  	/*
! 	 * Truncate CLOG and CommitTs to the oldest computed value.
! 	 * Note we don't truncate multixacts; that will be done by the next
! 	 * checkpoint.
  	 */
  	TruncateCLOG(frozenXID);
+ 	TruncateCommitTs(frozenXID);
  
  	/*
  	 * Update the wrap limit for GetNewTransactionId and creation of new
***************
*** 1084,1089 **** vac_truncate_clog(TransactionId frozenXID,
--- 1087,1093 ----
  	 */
  	SetTransactionIdLimit(frozenXID, oldestxid_datoid);
  	SetMultiXactIdLimit(minMulti, minmulti_datoid);
+ 	SetCommitTsLimit(frozenXID);
  }
  
  
*** a/src/backend/libpq/hba.c
--- b/src/backend/libpq/hba.c
***************
*** 1440,1446 **** parse_hba_auth_opt(char *name, char *val, HbaLine *hbaline, int line_num)
  				ereport(LOG,
  						(errcode(ERRCODE_CONFIG_FILE_ERROR),
  						 errmsg("client certificates can only be checked if a root certificate store is available"),
! 						 errhint("Make sure the configuration parameter \"ssl_ca_file\" is set."),
  						 errcontext("line %d of configuration file \"%s\"",
  									line_num, HbaFileName)));
  				return false;
--- 1440,1446 ----
  				ereport(LOG,
  						(errcode(ERRCODE_CONFIG_FILE_ERROR),
  						 errmsg("client certificates can only be checked if a root certificate store is available"),
! 						 errhint("Make sure the configuration parameter \"%s\" is set.", "ssl_ca_file"),
  						 errcontext("line %d of configuration file \"%s\"",
  									line_num, HbaFileName)));
  				return false;
*** a/src/backend/replication/logical/decode.c
--- b/src/backend/replication/logical/decode.c
***************
*** 133,138 **** LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
--- 133,139 ----
  		case RM_SEQ_ID:
  		case RM_SPGIST_ID:
  		case RM_BRIN_ID:
+ 		case RM_COMMIT_TS_ID:
  			break;
  		case RM_NEXT_ID:
  			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
*** a/src/backend/storage/ipc/ipci.c
--- b/src/backend/storage/ipc/ipci.c
***************
*** 15,20 ****
--- 15,21 ----
  #include "postgres.h"
  
  #include "access/clog.h"
+ #include "access/commit_ts.h"
  #include "access/heapam.h"
  #include "access/multixact.h"
  #include "access/nbtree.h"
***************
*** 117,122 **** CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
--- 118,124 ----
  		size = add_size(size, ProcGlobalShmemSize());
  		size = add_size(size, XLOGShmemSize());
  		size = add_size(size, CLOGShmemSize());
+ 		size = add_size(size, CommitTsShmemSize());
  		size = add_size(size, SUBTRANSShmemSize());
  		size = add_size(size, TwoPhaseShmemSize());
  		size = add_size(size, BackgroundWorkerShmemSize());
***************
*** 198,203 **** CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
--- 200,206 ----
  	 */
  	XLOGShmemInit();
  	CLOGShmemInit();
+ 	CommitTsShmemInit();
  	SUBTRANSShmemInit();
  	MultiXactShmemInit();
  	InitBufferPool();
*** a/src/backend/storage/lmgr/lwlock.c
--- b/src/backend/storage/lmgr/lwlock.c
***************
*** 29,34 ****
--- 29,35 ----
  #include "postgres.h"
  
  #include "access/clog.h"
+ #include "access/commit_ts.h"
  #include "access/multixact.h"
  #include "access/subtrans.h"
  #include "commands/async.h"
***************
*** 259,264 **** NumLWLocks(void)
--- 260,268 ----
  	/* clog.c needs one per CLOG buffer */
  	numLocks += CLOGShmemBuffers();
  
+ 	/* commit_ts.c needs one per CommitTs buffer */
+ 	numLocks += CommitTsShmemBuffers();
+ 
  	/* subtrans.c needs one per SubTrans buffer */
  	numLocks += NUM_SUBTRANS_BUFFERS;
  
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 26,31 ****
--- 26,32 ----
  #include <syslog.h>
  #endif
  
+ #include "access/commit_ts.h"
  #include "access/gin.h"
  #include "access/transam.h"
  #include "access/twophase.h"
***************
*** 826,831 **** static struct config_bool ConfigureNamesBool[] =
--- 827,841 ----
  		check_bonjour, NULL, NULL
  	},
  	{
+ 		{"track_commit_timestamp", PGC_POSTMASTER, REPLICATION,
+ 			gettext_noop("Collects transaction commit time."),
+ 			NULL
+ 		},
+ 		&track_commit_timestamp,
+ 		false,
+ 		NULL, NULL, NULL
+ 	},
+ 	{
  		{"ssl", PGC_POSTMASTER, CONN_AUTH_SECURITY,
  			gettext_noop("Enables SSL connections."),
  			NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 228,233 ****
--- 228,235 ----
  
  #max_replication_slots = 0	# max number of replication slots
  				# (change requires restart)
+ #track_commit_timestamp = off	# collect timestamp of transaction commit
+ 				# (change requires restart)
  
  # - Master Server -
  
*** a/src/bin/initdb/initdb.c
--- b/src/bin/initdb/initdb.c
***************
*** 186,191 **** static const char *subdirs[] = {
--- 186,192 ----
  	"pg_xlog",
  	"pg_xlog/archive_status",
  	"pg_clog",
+ 	"pg_commit_ts",
  	"pg_dynshmem",
  	"pg_notify",
  	"pg_serial",
*** a/src/bin/pg_controldata/pg_controldata.c
--- b/src/bin/pg_controldata/pg_controldata.c
***************
*** 270,275 **** main(int argc, char *argv[])
--- 270,277 ----
  		   ControlFile.checkPointCopy.oldestMulti);
  	printf(_("Latest checkpoint's oldestMulti's DB: %u\n"),
  		   ControlFile.checkPointCopy.oldestMultiDB);
+ 	printf(_("Latest checkpoint's oldestCommitTs:   %u\n"),
+ 		   ControlFile.checkPointCopy.oldestCommitTs);
  	printf(_("Time of latest checkpoint:            %s\n"),
  		   ckpttime_str);
  	printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
***************
*** 300,305 **** main(int argc, char *argv[])
--- 302,309 ----
  		   ControlFile.max_prepared_xacts);
  	printf(_("Current max_locks_per_xact setting:   %d\n"),
  		   ControlFile.max_locks_per_xact);
+ 	printf(_("Current track_commit_timestamp setting: %s\n"),
+ 		   ControlFile.track_commit_timestamp ? _("on") : _("off"));
  	printf(_("Maximum data alignment:               %u\n"),
  		   ControlFile.maxAlign);
  	/* we don't print floatFormat since can't say much useful about it */
*** a/src/bin/pg_resetxlog/pg_resetxlog.c
--- b/src/bin/pg_resetxlog/pg_resetxlog.c
***************
*** 63,68 **** static bool guessed = false;	/* T if we had to guess at any values */
--- 63,69 ----
  static const char *progname;
  static uint32 set_xid_epoch = (uint32) -1;
  static TransactionId set_xid = 0;
+ static TransactionId set_commit_ts = 0;
  static Oid	set_oid = 0;
  static MultiXactId set_mxid = 0;
  static MultiXactOffset set_mxoff = (MultiXactOffset) -1;
***************
*** 112,118 **** main(int argc, char *argv[])
  	}
  
  
! 	while ((c = getopt(argc, argv, "D:fl:m:no:O:x:e:")) != -1)
  	{
  		switch (c)
  		{
--- 113,119 ----
  	}
  
  
! 	while ((c = getopt(argc, argv, "c:D:e:fl:m:no:O:x:")) != -1)
  	{
  		switch (c)
  		{
***************
*** 158,163 **** main(int argc, char *argv[])
--- 159,179 ----
  				}
  				break;
  
+ 			case 'c':
+ 				set_commit_ts = strtoul(optarg, &endptr, 0);
+ 				if (endptr == optarg || *endptr != '\0')
+ 				{
+ 					fprintf(stderr, _("%s: invalid argument for option -c\n"), progname);
+ 					fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+ 					exit(1);
+ 				}
+ 				if (set_commit_ts == 0)
+ 				{
+ 					fprintf(stderr, _("%s: transaction ID (-c) must not be 0\n"), progname);
+ 					exit(1);
+ 				}
+ 				break;
+ 
  			case 'o':
  				set_oid = strtoul(optarg, &endptr, 0);
  				if (endptr == optarg || *endptr != '\0')
***************
*** 345,350 **** main(int argc, char *argv[])
--- 361,369 ----
  		ControlFile.checkPointCopy.oldestXidDB = InvalidOid;
  	}
  
+ 	if (set_commit_ts != 0)
+ 		ControlFile.checkPointCopy.oldestCommitTs = set_commit_ts;
+ 
  	if (set_oid != 0)
  		ControlFile.checkPointCopy.nextOid = set_oid;
  
***************
*** 539,544 **** GuessControlValues(void)
--- 558,564 ----
  
  	ControlFile.wal_level = WAL_LEVEL_MINIMAL;
  	ControlFile.wal_log_hints = false;
+ 	ControlFile.track_commit_timestamp = false;
  	ControlFile.MaxConnections = 100;
  	ControlFile.max_worker_processes = 8;
  	ControlFile.max_prepared_xacts = 0;
***************
*** 621,626 **** PrintControlValues(bool guessed)
--- 641,648 ----
  		   ControlFile.checkPointCopy.oldestMulti);
  	printf(_("Latest checkpoint's oldestMulti's DB: %u\n"),
  		   ControlFile.checkPointCopy.oldestMultiDB);
+ 	printf(_("Latest checkpoint's oldestCommitTs:   %u\n"),
+ 		   ControlFile.checkPointCopy.oldestCommitTs);
  	printf(_("Maximum data alignment:               %u\n"),
  		   ControlFile.maxAlign);
  	/* we don't print floatFormat since can't say much useful about it */
***************
*** 702,707 **** PrintNewControlValues()
--- 724,735 ----
  		printf(_("NextXID epoch:                        %u\n"),
  			   ControlFile.checkPointCopy.nextXidEpoch);
  	}
+ 
+ 	if (set_commit_ts != 0)
+ 	{
+ 		printf(_("oldestCommitTs:                       %u\n"),
+ 			   ControlFile.checkPointCopy.oldestCommitTs);
+ 	}
  }
  
  
***************
*** 739,744 **** RewriteControlFile(void)
--- 767,773 ----
  	 */
  	ControlFile.wal_level = WAL_LEVEL_MINIMAL;
  	ControlFile.wal_log_hints = false;
+ 	ControlFile.track_commit_timestamp = false;
  	ControlFile.MaxConnections = 100;
  	ControlFile.max_worker_processes = 8;
  	ControlFile.max_prepared_xacts = 0;
***************
*** 1099,1104 **** usage(void)
--- 1128,1134 ----
  	printf(_("%s resets the PostgreSQL transaction log.\n\n"), progname);
  	printf(_("Usage:\n  %s [OPTION]... {[-D] DATADIR}\n\n"), progname);
  	printf(_("Options:\n"));
+ 	printf(_("  -c XID           set the oldest transaction with retrievable commit timestamp\n"));
  	printf(_("  -e XIDEPOCH      set next transaction ID epoch\n"));
  	printf(_("  -f               force update to be done\n"));
  	printf(_("  -l XLOGFILE      force minimum WAL starting location for new transaction log\n"));
*** /dev/null
--- b/src/include/access/commit_ts.h
***************
*** 0 ****
--- 1,70 ----
+ /*
+  * commit_ts.h
+  *
+  * PostgreSQL commit timestamp manager
+  *
+  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * src/include/access/commit_ts.h
+  */
+ #ifndef COMMIT_TS_H
+ #define COMMIT_TS_H
+ 
+ #include "access/xlog.h"
+ #include "datatype/timestamp.h"
+ #include "utils/guc.h"
+ 
+ 
+ extern PGDLLIMPORT bool	track_commit_timestamp;
+ 
+ extern bool check_track_commit_timestamp(bool *newval, void **extra,
+ 							 GucSource source);
+ 
+ typedef uint32 CommitTsNodeId;
+ #define InvalidCommitTsNodeId 0
+ 
+ extern void CommitTsSetDefaultNodeId(CommitTsNodeId nodeid);
+ extern CommitTsNodeId CommitTsGetDefaultNodeId(void);
+ extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
+ 							   TransactionId *subxids, TimestampTz timestamp,
+ 							   CommitTsNodeId nodeid, bool do_xlog);
+ extern void TransactionIdGetCommitTsData(TransactionId xid,
+ 							 TimestampTz *ts, CommitTsNodeId *nodeid);
+ extern TransactionId GetLatestCommitTsData(TimestampTz *ts,
+ 					  CommitTsNodeId *nodeid);
+ 
+ extern Size CommitTsShmemBuffers(void);
+ extern Size CommitTsShmemSize(void);
+ extern void CommitTsShmemInit(void);
+ extern void BootStrapCommitTs(void);
+ extern void StartupCommitTs(void);
+ extern void CompleteCommitTsInitialization(void);
+ extern void ShutdownCommitTs(void);
+ extern void CheckPointCommitTs(void);
+ extern void ExtendCommitTs(TransactionId newestXact);
+ extern void TruncateCommitTs(TransactionId oldestXact);
+ extern void SetCommitTsLimit(TransactionId oldestXact);
+ 
+ /* XLOG stuff */
+ #define COMMIT_TS_ZEROPAGE		0x00
+ #define COMMIT_TS_TRUNCATE		0x10
+ #define COMMIT_TS_SETTS			0x20
+ 
+ typedef struct xl_commit_ts_set
+ {
+ 	TimestampTz		timestamp;	/* commit timestamp to record */
+ 	CommitTsNodeId	nodeid;		/* node identifier to record */
+ 	TransactionId	mainxid;	/* Xid of the main transaction */
+ 	/* subxact Xids follow */
+ } xl_commit_ts_set;
+ 
+ #define SizeOfCommitTsSet	(offsetof(xl_commit_ts_set, mainxid) + \
+ 							 sizeof(TransactionId))
+ 
+ 
+ extern void commit_ts_redo(XLogReaderState *record);
+ extern void commit_ts_desc(StringInfo buf, XLogReaderState *record);
+ extern const char *commit_ts_identify(uint8 info);
+ 
+ #endif   /* COMMITTS_H */
*** a/src/include/access/rmgrlist.h
--- b/src/include/access/rmgrlist.h
***************
*** 24,30 ****
   * Changes to this list possibly need a XLOG_PAGE_MAGIC bump.
   */
  
! /* symbol name, textual name, redo, desc, startup, cleanup */
  PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL)
  PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL)
  PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL)
--- 24,30 ----
   * Changes to this list possibly need a XLOG_PAGE_MAGIC bump.
   */
  
! /* symbol name, textual name, redo, desc, identify, startup, cleanup */
  PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL)
  PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL)
  PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL)
***************
*** 43,45 **** PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_start
--- 43,46 ----
  PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL)
  PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup)
  PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
+ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
*** a/src/include/access/transam.h
--- b/src/include/access/transam.h
***************
*** 124,129 **** typedef struct VariableCacheData
--- 124,134 ----
  	Oid			oldestXidDB;	/* database with minimum datfrozenxid */
  
  	/*
+ 	 * These fields are protected by CommitTsControlLock
+ 	 */
+ 	TransactionId oldestCommitTs;
+ 
+ 	/*
  	 * These fields are protected by ProcArrayLock.
  	 */
  	TransactionId latestCompletedXid;	/* newest XID that has committed or
*** a/src/include/access/xlog_internal.h
--- b/src/include/access/xlog_internal.h
***************
*** 186,191 **** typedef struct xl_parameter_change
--- 186,192 ----
  	int			max_locks_per_xact;
  	int			wal_level;
  	bool		wal_log_hints;
+ 	bool		track_commit_timestamp;
  } xl_parameter_change;
  
  /* logs restore point */
*** a/src/include/catalog/catversion.h
--- b/src/include/catalog/catversion.h
***************
*** 53,58 ****
   */
  
  /*							yyyymmddN */
! #define CATALOG_VERSION_NO	201411241
  
  #endif
--- 53,58 ----
   */
  
  /*							yyyymmddN */
! #define CATALOG_VERSION_NO	201411242
  
  #endif
*** a/src/include/catalog/pg_control.h
--- b/src/include/catalog/pg_control.h
***************
*** 46,51 **** typedef struct CheckPoint
--- 46,52 ----
  	MultiXactId oldestMulti;	/* cluster-wide minimum datminmxid */
  	Oid			oldestMultiDB;	/* database with minimum datminmxid */
  	pg_time_t	time;			/* time stamp of checkpoint */
+ 	TransactionId oldestCommitTs; /* oldest Xid with valid commit timestamp */
  
  	/*
  	 * Oldest XID still running. This is only needed to initialize hot standby
***************
*** 177,182 **** typedef struct ControlFileData
--- 178,184 ----
  	int			max_worker_processes;
  	int			max_prepared_xacts;
  	int			max_locks_per_xact;
+ 	bool		track_commit_timestamp;
  
  	/*
  	 * This data is used to check for hardware-architecture compatibility of
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
***************
*** 3017,3022 **** DESCR("view two-phase transactions");
--- 3017,3028 ----
  DATA(insert OID = 3819 (  pg_get_multixact_members PGNSP PGUID 12 1 1000 0 0 f f f f t t v 1 0 2249 "28" "{28,28,25}" "{i,o,o}" "{multixid,xid,mode}" _null_ pg_get_multixact_members _null_ _null_ _null_ ));
  DESCR("view members of a multixactid");
  
+ DATA(insert OID = 3581 ( pg_xact_commit_timestamp PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 1184 "28" _null_ _null_ _null_ _null_ pg_xact_commit_timestamp _null_ _null_ _null_ ));
+ DESCR("get commit timestamp of a transaction");
+ 
+ DATA(insert OID = 3583 ( pg_last_committed_xact PGNSP PGUID 12 1 0 0 0 f f f f t f s 0 0 2249 "" "{28,1184}" "{o,o}" "{xid,timestamp}" _null_ pg_last_committed_xact _null_ _null_ _null_ ));
+ DESCR("get transaction Id and commit timestamp of latest transaction commit");
+ 
  DATA(insert OID = 3537 (  pg_describe_object		PGNSP PGUID 12 1 0 0 0 f f f f t f s 3 0 25 "26 26 23" _null_ _null_ _null_ _null_ pg_describe_object _null_ _null_ _null_ ));
  DESCR("get identification of SQL object");
  
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 127,133 **** extern PGDLLIMPORT LWLockPadded *MainLWLockArray;
  #define AutoFileLock				(&MainLWLockArray[35].lock)
  #define ReplicationSlotAllocationLock	(&MainLWLockArray[36].lock)
  #define ReplicationSlotControlLock		(&MainLWLockArray[37].lock)
! #define NUM_INDIVIDUAL_LWLOCKS		38
  
  /*
   * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
--- 127,136 ----
  #define AutoFileLock				(&MainLWLockArray[35].lock)
  #define ReplicationSlotAllocationLock	(&MainLWLockArray[36].lock)
  #define ReplicationSlotControlLock		(&MainLWLockArray[37].lock)
! #define CommitTsControlLock			(&MainLWLockArray[38].lock)
! #define CommitTsLock				(&MainLWLockArray[39].lock)
! 
! #define NUM_INDIVIDUAL_LWLOCKS		40
  
  /*
   * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
*** a/src/include/utils/builtins.h
--- b/src/include/utils/builtins.h
***************
*** 1187,1192 **** extern Datum pg_prepared_xact(PG_FUNCTION_ARGS);
--- 1187,1196 ----
  /* access/transam/multixact.c */
  extern Datum pg_get_multixact_members(PG_FUNCTION_ARGS);
  
+ /* access/transam/commit_ts.c */
+ extern Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS);
+ extern Datum pg_last_committed_xact(PG_FUNCTION_ARGS);
+ 
  /* catalogs/dependency.c */
  extern Datum pg_describe_object(PG_FUNCTION_ARGS);
  extern Datum pg_identify_object(PG_FUNCTION_ARGS);
*** a/src/test/Makefile
--- b/src/test/Makefile
***************
*** 12,17 **** subdir = src/test
  top_builddir = ../..
  include $(top_builddir)/src/Makefile.global
  
! SUBDIRS = regress isolation
  
  $(recurse)
--- 12,17 ----
  top_builddir = ../..
  include $(top_builddir)/src/Makefile.global
  
! SUBDIRS = regress isolation modules
  
  $(recurse)
*** /dev/null
--- b/src/test/modules/Makefile
***************
*** 0 ****
--- 1,10 ----
+ # src/test/modules/Makefile
+ 
+ subdir = src/test/modules
+ top_builddir = ../../..
+ include $(top_builddir)/src/Makefile.global
+ 
+ SUBDIRS = \
+ 		  commit_ts
+ 
+ $(recurse)
*** /dev/null
--- b/src/test/modules/commit_ts/.gitignore
***************
*** 0 ****
--- 1,5 ----
+ # Generated subdirectories
+ /log/
+ /isolation_output/
+ /regression_output/
+ /tmp_check/
*** /dev/null
--- b/src/test/modules/commit_ts/Makefile
***************
*** 0 ****
--- 1,40 ----
+ # Note: because we don't tell the Makefile there are any regression tests,
+ # we have to clean those result files explicitly
+ EXTRA_CLEAN = $(pg_regress_clean_files) ./regression_output
+ 
+ subdir = src/test/modules/commit_ts
+ top_builddir = ../../../..
+ include $(top_builddir)/src/Makefile.global
+ include $(top_srcdir)/contrib/contrib-global.mk
+ 
+ # installcheck is deliberately a no-op: an existing installation normally
+ # does not have the required track_commit_timestamp setting turned on
+ installcheck:;
+ 
+ check: regresscheck
+ 
+ submake-regress:
+ 	$(MAKE) -C $(top_builddir)/src/test/regress all
+ 
+ submake-test_commit_ts:
+ 	$(MAKE) -C $(top_builddir)/src/test/modules/commit_ts
+ 
+ REGRESSCHECKS=commit_timestamp
+ 
+ regresscheck: all | submake-regress submake-test_commit_ts
+ 	$(MKDIR_P) regression_output
+ 	$(pg_regress_check) \
+ 	    --temp-config $(top_srcdir)/src/test/modules/commit_ts/commit_ts.conf \
+ 	    --temp-install=./tmp_check \
+ 	    --extra-install=src/test/modules/commit_ts \
+ 	    --outputdir=./regression_output \
+ 	    $(REGRESSCHECKS)
+ 
+ regresscheck-install-force: | submake-regress submake-test_commit_ts
+ 	$(pg_regress_installcheck) \
+ 	    --extra-install=src/test/modules/commit_ts \
+ 	    $(REGRESSCHECKS)
+ 
+ # Mark the targets above as phony: none of them names a file to be built
+ .PHONY: submake-test_commit_ts submake-regress check \
+ 	regresscheck regresscheck-install-force
*** /dev/null
--- b/src/test/modules/commit_ts/commit_ts.conf
***************
*** 0 ****
--- 1 ----
+ track_commit_timestamp = on
\ No newline at end of file
\ No newline at end of file
*** /dev/null
--- b/src/test/modules/commit_ts/expected/commit_timestamp.out
***************
*** 0 ****
--- 1,33 ----
+ --
+ -- Commit Timestamp
+ --
+ CREATE TABLE committs_test(id serial, ts timestamptz default now());
+ INSERT INTO committs_test DEFAULT VALUES;
+ INSERT INTO committs_test DEFAULT VALUES;
+ INSERT INTO committs_test DEFAULT VALUES;
+ SELECT id,
+        pg_xact_commit_timestamp(xmin) >= ts,
+        pg_xact_commit_timestamp(xmin) < now(),
+        pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+ FROM committs_test
+ ORDER BY id;
+  id | ?column? | ?column? | ?column? 
+ ----+----------+----------+----------
+   1 | t        | t        | t
+   2 | t        | t        | t
+   3 | t        | t        | t
+ (3 rows)
+ 
+ DROP TABLE committs_test;
+ SELECT pg_xact_commit_timestamp('0'::xid);
+  pg_xact_commit_timestamp 
+ --------------------------
+  
+ (1 row)
+ 
+ SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp < now() FROM pg_last_committed_xact() x;
+  ?column? | ?column? | ?column? 
+ ----------+----------+----------
+  t        | t        | t
+ (1 row)
+ 
*** /dev/null
--- b/src/test/modules/commit_ts/sql/commit_timestamp.sql
***************
*** 0 ****
--- 1,21 ----
+ --
+ -- Commit Timestamp
+ --
+ CREATE TABLE committs_test(id serial, ts timestamptz default now());
+ 
+ INSERT INTO committs_test DEFAULT VALUES;
+ INSERT INTO committs_test DEFAULT VALUES;
+ INSERT INTO committs_test DEFAULT VALUES;
+ 
+ SELECT id,
+        pg_xact_commit_timestamp(xmin) >= ts,
+        pg_xact_commit_timestamp(xmin) < now(),
+        pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+ FROM committs_test
+ ORDER BY id;
+ 
+ DROP TABLE committs_test;
+ 
+ SELECT pg_xact_commit_timestamp('0'::xid);
+ 
+ SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp < now() FROM pg_last_committed_xact() x;
*** /dev/null
--- b/src/test/regress/expected/commit_ts.out
***************
*** 0 ****
--- 1,28 ----
+ --
+ -- Commit Timestamp
+ --
+ SHOW track_commit_timestamp;
+  track_commit_timestamp 
+ ------------------------
+  off
+ (1 row)
+ 
+ CREATE TABLE committs_test(id serial, ts timestamptz default now());
+ INSERT INTO committs_test DEFAULT VALUES;
+ INSERT INTO committs_test DEFAULT VALUES;
+ INSERT INTO committs_test DEFAULT VALUES;
+ SELECT id,
+        pg_xact_commit_timestamp(xmin) >= ts,
+        pg_xact_commit_timestamp(xmin) < now(),
+        pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+ FROM committs_test
+ ORDER BY id;
+ ERROR:  could not get commit timestamp data
+ HINT:   Make sure the configuration parameter "track_commit_timestamp" is set.
+ DROP TABLE committs_test;
+ SELECT pg_xact_commit_timestamp('0'::xid);
+ ERROR:  could not get commit timestamp data
+ HINT:   Make sure the configuration parameter "track_commit_timestamp" is set.
+ SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp < now() FROM pg_last_committed_xact() x;
+ ERROR:  could not get commit timestamp data
+ HINT:   Make sure the configuration parameter "track_commit_timestamp" is set.
*** /dev/null
--- b/src/test/regress/expected/commit_ts_1.out
***************
*** 0 ****
--- 1,39 ----
+ --
+ -- Commit Timestamp
+ --
+ SHOW track_commit_timestamp;
+  track_commit_timestamp 
+ ------------------------
+  on
+ (1 row)
+ 
+ CREATE TABLE committs_test(id serial, ts timestamptz default now());
+ INSERT INTO committs_test DEFAULT VALUES;
+ INSERT INTO committs_test DEFAULT VALUES;
+ INSERT INTO committs_test DEFAULT VALUES;
+ SELECT id,
+        pg_xact_commit_timestamp(xmin) >= ts,
+        pg_xact_commit_timestamp(xmin) < now(),
+        pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+ FROM committs_test
+ ORDER BY id;
+  id | ?column? | ?column? | ?column? 
+ ----+----------+----------+----------
+   1 | t        | t        | t
+   2 | t        | t        | t
+   3 | t        | t        | t
+ (3 rows)
+ 
+ DROP TABLE committs_test;
+ SELECT pg_xact_commit_timestamp('0'::xid);
+  pg_xact_commit_timestamp 
+ --------------------------
+  
+ (1 row)
+ 
+ SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp < now() FROM pg_last_committed_xact() x;
+  ?column? | ?column? | ?column? 
+ ----------+----------+----------
+  t        | t        | t
+ (1 row)
+ 
*** a/src/test/regress/parallel_schedule
--- b/src/test/regress/parallel_schedule
***************
*** 88,94 **** test: brin gin gist spgist privileges security_label collate matview lock replic
  # ----------
  # Another group of parallel tests
  # ----------
! test: alter_generic misc psql async
  
  # rules cannot run concurrently with any test that creates a view
  test: rules
--- 88,94 ----
  # ----------
  # Another group of parallel tests
  # ----------
! test: alter_generic misc psql async commit_ts
  
  # rules cannot run concurrently with any test that creates a view
  test: rules
*** a/src/test/regress/serial_schedule
--- b/src/test/regress/serial_schedule
***************
*** 110,115 **** test: alter_generic
--- 110,116 ----
  test: misc
  test: psql
  test: async
+ test: commit_ts
  test: rules
  test: event_trigger
  test: select_views
*** /dev/null
--- b/src/test/regress/sql/commit_ts.sql
***************
*** 0 ****
--- 1,23 ----
+ --
+ -- Commit Timestamp
+ --
+ SHOW track_commit_timestamp;
+ 
+ CREATE TABLE committs_test(id serial, ts timestamptz default now());
+ 
+ INSERT INTO committs_test DEFAULT VALUES;
+ INSERT INTO committs_test DEFAULT VALUES;
+ INSERT INTO committs_test DEFAULT VALUES;
+ 
+ SELECT id,
+        pg_xact_commit_timestamp(xmin) >= ts,
+        pg_xact_commit_timestamp(xmin) < now(),
+        pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+ FROM committs_test
+ ORDER BY id;
+ 
+ DROP TABLE committs_test;
+ 
+ SELECT pg_xact_commit_timestamp('0'::xid);
+ 
+ SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp < now() FROM pg_last_committed_xact() x;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to