From 1edaedf40d85baad7c6e1d12adae4e5ccc72077e Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Fri, 8 Nov 2024 15:10:48 +0800
Subject: [PATCH v4] Make commit timestamps monotonically increasing

---
 src/backend/access/transam/xact.c |  18 ++++-
 src/backend/access/transam/xlog.c | 129 ++++++++++++++++++++++++++++--
 src/include/access/xact.h         |   2 +
 3 files changed, 140 insertions(+), 9 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b7ebcc2a55..0caaeae913 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -278,7 +278,7 @@ static bool currentCommandIdUsed;
  */
 static TimestampTz xactStartTimestamp;
 static TimestampTz stmtStartTimestamp;
-static TimestampTz xactStopTimestamp;
+TimestampTz xactStopTimestamp;
 
 /*
  * GID to be used for preparing the current transaction.  This is also
@@ -325,6 +325,7 @@ typedef struct SubXactCallbackItem
 
 static SubXactCallbackItem *SubXact_callbacks = NULL;
 
+xl_xact_commit *xlcommitrec = NULL;
 
 /* local function prototypes */
 static void AssignTransactionId(TransactionState s);
@@ -2214,6 +2215,9 @@ StartTransaction(void)
 	if (TransactionTimeout > 0)
 		enable_timeout_after(TRANSACTION_TIMEOUT, TransactionTimeout);
 
+	/* Reset xlcommitrec */
+	xlcommitrec = NULL;
+
 	ShowTransactionState("StartTransaction");
 }
 
@@ -5831,6 +5835,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_twophase xl_twophase;
 	xl_xact_origin xl_origin;
 	uint8		info;
+	XLogRecPtr	result;
 
 	Assert(CritSectionCount > 0);
 
@@ -5974,7 +5979,16 @@ XactLogCommitRecord(TimestampTz commit_time,
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
-	return XLogInsert(RM_XACT_ID, info);
+	/*
+	 * Save the commit xlrec so that we can modify the xactStopTimestamp and
+	 * the xact_time of the xlrec while holding the lock that determines the
+	 * commit-LSN to ensure the commit timestamps are monotonically increasing.
+	 */
+	xlcommitrec = &xlrec;
+	result = XLogInsert(RM_XACT_ID, info);
+	xlcommitrec = NULL;
+
+	return result;
 }
 
 /*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6f58412bca..8efad83df3 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -155,6 +155,12 @@ int			wal_segment_size = DEFAULT_XLOG_SEG_SIZE;
  */
 int			CheckPointSegments;
 
+/*
+ * Whether the xact_time of the commit record was modified to ensure the
+ * commit timestamps are monotonically increasing.
+ */
+static bool	XlogRecordModified = false;
+
 /* Estimated distance between checkpoints, in bytes */
 static double CheckPointDistanceEstimate = 0;
 static double PrevCheckPointDistance = 0;
@@ -551,6 +557,14 @@ typedef struct XLogCtlData
 	XLogRecPtr	lastFpwDisableRecPtr;
 
 	slock_t		info_lck;		/* locks shared variables shown above */
+
+	/*
+	 * This is our shared, logical clock that we use to force
+	 * commit timestamps to be monotonically increasing in
+	 * commit-LSN order. This is protected by the WAL insert
+	 * position spinlock (Insert->insertpos_lck).
+	 */
+	TimestampTz lastXactStopTime;
 } XLogCtlData;
 
 /*
@@ -700,6 +714,7 @@ static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
 								XLogRecData *rdata,
 								XLogRecPtr StartPos, XLogRecPtr EndPos,
 								TimeLineID tli);
+static void XLogRecordCorrectCRC(XLogRecData *rdata);
 static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 									  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
@@ -778,6 +793,12 @@ XLogInsertRecord(XLogRecData *rdata,
 	if (!XLogInsertAllowed())
 		elog(ERROR, "cannot make new WAL entries during recovery");
 
+	/*
+	 * Make sure the flag telling that ReserveXLog...() modified the record is
+	 * false at this point.
+	 */
+	XlogRecordModified = false;
+
 	/*
 	 * Given that we're not in recovery, InsertTimeLineID is set and can't
 	 * change, so we can read it without a lock.
@@ -906,6 +927,15 @@ XLogInsertRecord(XLogRecData *rdata,
 
 	if (inserted)
 	{
+		/*
+		 * If the XLOG record was modified, recalculate the CRC.
+		 */
+		if (XlogRecordModified)
+		{
+			XLogRecordCorrectCRC(rdata);
+			XlogRecordModified = false;
+		}
+
 		/*
 		 * Now that xl_prev has been filled in, calculate CRC of the record
 		 * header.
@@ -1086,6 +1116,25 @@ XLogInsertRecord(XLogRecData *rdata,
 	return EndPos;
 }
 
+/*
+ * Recompute xl_crc over the record data following the XLogRecord header; used
+ * after xact_time was bumped to keep commit timestamps monotonic by LSN.
+ */
+static void
+XLogRecordCorrectCRC(XLogRecData *rdata)
+{
+	XLogRecData *rdt;
+	XLogRecord *rechdr = (XLogRecord *) rdata->data;
+	pg_crc32c	rdata_crc;
+
+	INIT_CRC32C(rdata_crc);
+	COMP_CRC32C(rdata_crc, rdata->data + SizeOfXLogRecord, rdata->len - SizeOfXLogRecord);
+	for (rdt = rdata->next; rdt != NULL; rdt = rdt->next)
+		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+
+	rechdr->xl_crc = rdata_crc;
+}
+
 /*
  * Reserves the right amount of space for a record of given size from the WAL.
  * *StartPos is set to the beginning of the reserved section, *EndPos to
@@ -1112,6 +1161,10 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 	uint64		startbytepos;
 	uint64		endbytepos;
 	uint64		prevbytepos;
+	TimestampTz	orgxacttime = 0;
+
+	if (xlcommitrec)
+		orgxacttime = xlcommitrec->xact_time;
 
 	size = MAXALIGN(size);
 
@@ -1127,21 +1180,56 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 	 * positions (XLogRecPtrs) can be done outside the locked region, and
 	 * because the usable byte position doesn't include any headers, reserving
 	 * X bytes from WAL is almost as simple as "CurrBytePos += X".
+	 *
+	 * Note that for a commit record, we need to ensure that the commit
+	 * timestamp increases monotonically in commit-LSN order to avoid
+	 * inconsistency between the two.
 	 */
-	SpinLockAcquire(&Insert->insertpos_lck);
+	if (xlcommitrec)
+	{
+		SpinLockAcquire(&Insert->insertpos_lck);
 
-	startbytepos = Insert->CurrBytePos;
-	endbytepos = startbytepos + size;
-	prevbytepos = Insert->PrevBytePos;
-	Insert->CurrBytePos = endbytepos;
-	Insert->PrevBytePos = startbytepos;
+		/*
+		 * This is a local transaction. Make sure that the xact_time is higher
+		 * than any timestamp we have seen thus far.
+		 */
+		if (unlikely(XLogCtl->lastXactStopTime >= xlcommitrec->xact_time))
+		{
+			XLogCtl->lastXactStopTime++;
+			xlcommitrec->xact_time = XLogCtl->lastXactStopTime;
+			xactStopTimestamp = XLogCtl->lastXactStopTime;
+		}
+		else
+			XLogCtl->lastXactStopTime = xlcommitrec->xact_time;
 
-	SpinLockRelease(&Insert->insertpos_lck);
+		startbytepos = Insert->CurrBytePos;
+		endbytepos = startbytepos + size;
+		prevbytepos = Insert->PrevBytePos;
+		Insert->CurrBytePos = endbytepos;
+		Insert->PrevBytePos = startbytepos;
+
+		SpinLockRelease(&Insert->insertpos_lck);
+	}
+	else
+	{
+		SpinLockAcquire(&Insert->insertpos_lck);
+
+		startbytepos = Insert->CurrBytePos;
+		endbytepos = startbytepos + size;
+		prevbytepos = Insert->PrevBytePos;
+		Insert->CurrBytePos = endbytepos;
+		Insert->PrevBytePos = startbytepos;
+
+		SpinLockRelease(&Insert->insertpos_lck);
+	}
 
 	*StartPos = XLogBytePosToRecPtr(startbytepos);
 	*EndPos = XLogBytePosToEndRecPtr(endbytepos);
 	*PrevPtr = XLogBytePosToRecPtr(prevbytepos);
 
+	if (xlcommitrec && orgxacttime != xlcommitrec->xact_time)
+		XlogRecordModified = true;
+
 	/*
 	 * Check that the conversions between "usable byte positions" and
 	 * XLogRecPtrs work consistently in both directions.
@@ -1170,12 +1258,20 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 	uint32		size = MAXALIGN(SizeOfXLogRecord);
 	XLogRecPtr	ptr;
 	uint32		segleft;
+	TimestampTz	orgxacttime = 0;
+
+	if (xlcommitrec)
+		orgxacttime = xlcommitrec->xact_time;
 
 	/*
 	 * These calculations are a bit heavy-weight to be done while holding a
 	 * spinlock, but since we're holding all the WAL insertion locks, there
 	 * are no other inserters competing for it. GetXLogInsertRecPtr() does
 	 * compete for it, but that's not called very frequently.
+	 *
+	 * Note that for a commit record, we need to ensure that the commit
+	 * timestamp increases monotonically in commit-LSN order to avoid
+	 * inconsistency between the two.
 	 */
 	SpinLockAcquire(&Insert->insertpos_lck);
 
@@ -1189,6 +1285,22 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 		return false;
 	}
 
+	if (xlcommitrec)
+	{
+		/*
+		 * This is a local transaction. Make sure that the xact_time is higher
+		 * than any timestamp we have seen thus far.
+		 */
+		if (unlikely(XLogCtl->lastXactStopTime >= xlcommitrec->xact_time))
+		{
+			XLogCtl->lastXactStopTime++;
+			xlcommitrec->xact_time = XLogCtl->lastXactStopTime;
+			xactStopTimestamp = XLogCtl->lastXactStopTime;
+		}
+		else
+			XLogCtl->lastXactStopTime = xlcommitrec->xact_time;
+	}
+
 	endbytepos = startbytepos + size;
 	prevbytepos = Insert->PrevBytePos;
 
@@ -1209,6 +1321,9 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 
 	*PrevPtr = XLogBytePosToRecPtr(prevbytepos);
 
+	if (xlcommitrec && orgxacttime != xlcommitrec->xact_time)
+		XlogRecordModified = true;
+
 	Assert(XLogSegmentOffset(*EndPos, wal_segment_size) == 0);
 	Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
 	Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index fb64d7413a..43e4b5aa18 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -431,6 +431,8 @@ typedef struct xl_xact_parsed_abort
 	TimestampTz origin_timestamp;
 } xl_xact_parsed_abort;
 
+extern PGDLLIMPORT xl_xact_commit *xlcommitrec;
+extern PGDLLIMPORT TimestampTz xactStopTimestamp;
 
 /* ----------------
  *		extern definitions
-- 
2.30.0.windows.2

