On 11/6/24 21:30, Zhijie Hou (Fujitsu) wrote:
Thanks for the patch! I am reading the patch and noticed a few minor things.
1.
+ /*
+ * This is a local transaction. Make sure that the xact_time
+ * higher than any timestamp we have seen thus far.
+ *
+ * TODO: This is not postmaster restart safe. If the local
+ * system clock is further behind other nodes than it takes
+ * for the postmaster to restart (time between it stops
+ * accepting new transactions and time when it becomes ready
+ * to accept new transactions), local transactions will not
+ * be bumped into the future correctly.
+ */
The TODO section mentions other nodes, but I believe the patch currently does
not handle timestamps from other nodes. Should we either remove
this part or add a brief explanation to clarify the relationship with other
nodes?
That TODO is actually obsolete. I understood from Amit Kapila that the
community does assume that NTP synchronization is good enough. And it
indeed is. Even my servers here at home are using a GPS based NTP server
connected to the LAN and are usually in sync by single-digit
microseconds. I removed it.
2.
+/*
+ * Hook function to be called while holding the WAL insert spinlock.
+ * to adjust commit timestamps via Lamport clock if needed.
+ */
The second line could be improved:
"to adjust commit timestamps .." => "It adjusts commit timestamps ..."
How about
/*
* Hook function to be called while holding the WAL insert spinlock.
* It guarantees that commit timestamps are advancing in LSN order.
*/
static void EnsureMonotonicTransactionStopTimestamp(void *data);
Thank you for looking at this and your input. New patch attached.
Best Regards, Jan
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b7ebcc2a55..0406dc44be 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -370,6 +370,12 @@ static void ShowTransactionStateRec(const char *str, TransactionState s);
static const char *BlockStateAsString(TBlockState blockState);
static const char *TransStateAsString(TransState state);
+/*
+ * Hook function to be called while holding the WAL insert spinlock.
+ * It guarantees that commit timestamps are advancing in LSN order.
+ */
+static void EnsureMonotonicTransactionStopTimestamp(void *data);
+
/* ----------------------------------------------------------------
* transaction state accessors
@@ -2214,6 +2220,13 @@ StartTransaction(void)
if (TransactionTimeout > 0)
enable_timeout_after(TRANSACTION_TIMEOUT, TransactionTimeout);
+ /*
+ * Reset XLogReserveInsertHook (function called while holding
+ * the WAL insert spinlock)
+ */
+ XLogReserveInsertHook = NULL;
+ XLogReserveInsertHookData = NULL;
+
ShowTransactionState("StartTransaction");
}
@@ -5831,6 +5844,7 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xact_twophase xl_twophase;
xl_xact_origin xl_origin;
uint8 info;
+ XLogRecPtr result;
Assert(CritSectionCount > 0);
@@ -5974,7 +5988,19 @@ XactLogCommitRecord(TimestampTz commit_time,
/* we allow filtering by xacts */
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
- return XLogInsert(RM_XACT_ID, info);
+ /*
+ * Install our hook for the call to ReserveXLogInsertLocation() so that
+ * we can modify the xactStopTimestamp and the xact_time of the xlrec
+ * while holding the lock that determines the commit-LSN to ensure
+ * the commit timestamps are monotonically increasing.
+ */
+ XLogReserveInsertHook = EnsureMonotonicTransactionStopTimestamp;
+ XLogReserveInsertHookData = (void *)&xlrec;
+ result = XLogInsert(RM_XACT_ID, info);
+ XLogReserveInsertHook = NULL;
+ XLogReserveInsertHookData = NULL;
+
+ return result;
}
/*
@@ -6445,3 +6471,33 @@ xact_redo(XLogReaderState *record)
else
elog(PANIC, "xact_redo: unknown op code %u", info);
}
+
+/*
+ * Hook installed by XactLogCommitRecord(); 'data' is its xl_xact_commit record.
+ * Ensures commit timestamps are monotonically increasing in commit-LSN order.
+ */
+static void
+EnsureMonotonicTransactionStopTimestamp(void *data)
+{
+ xl_xact_commit *xlrec = (xl_xact_commit *)data;
+ TimestampTz logical_clock;
+
+ logical_clock = XLogGetLastTransactionStopTimestamp();
+
+ /*
+ * This is a local transaction. Make sure that its xact_time is
+ * higher than any timestamp we have seen thus far.
+ */
+ if (logical_clock >= xlrec->xact_time)
+ {
+ logical_clock++;
+ xlrec->xact_time = logical_clock;
+ xactStopTimestamp = logical_clock;
+
+ XLogReserveInsertHookModifiedRecord = true; /* XLogInsertRecord() must redo the CRC */
+ }
+ else
+ logical_clock = xlrec->xact_time;
+
+ XLogSetLastTransactionStopTimestamp(logical_clock);
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6f58412bca..bb70a7de09 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -155,6 +155,16 @@ int wal_segment_size = DEFAULT_XLOG_SEG_SIZE;
*/
int CheckPointSegments;
+/*
+ * Hook to be called inside of ReserveXLogInsertLocation() for
+ * operations that need to be performed while holding the WAL
+ * insert spinlock. Currently this is used to guarantee a monotonically
+ * increasing commit-timestamp in LSN order.
+ */
+XLogReserveInsertHookType XLogReserveInsertHook = NULL;
+void *XLogReserveInsertHookData = NULL;
+bool XLogReserveInsertHookModifiedRecord = false;
+
/* Estimated distance between checkpoints, in bytes */
static double CheckPointDistanceEstimate = 0;
static double PrevCheckPointDistance = 0;
@@ -551,6 +561,14 @@ typedef struct XLogCtlData
XLogRecPtr lastFpwDisableRecPtr;
slock_t info_lck; /* locks shared variables shown above */
+
+ /*
+ * This is our shared, logical clock that we use to force
+ * commit timestamps to be monotonically increasing in
+ * commit-LSN order. This is protected by the Wal-insert
+ * spinlock.
+ */
+ TimestampTz lastTransactionStopTimestamp;
} XLogCtlData;
/*
@@ -700,6 +718,7 @@ static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
XLogRecData *rdata,
XLogRecPtr StartPos, XLogRecPtr EndPos,
TimeLineID tli);
+static void XLogRecordCorrectCRC(XLogRecData *rdata);
static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
@@ -778,6 +797,12 @@ XLogInsertRecord(XLogRecData *rdata,
if (!XLogInsertAllowed())
elog(ERROR, "cannot make new WAL entries during recovery");
+ /*
+ * Make sure the flag telling that ReserveXLog...() modified the
+ * record is false at this point.
+ */
+ XLogReserveInsertHookModifiedRecord = false;
+
/*
* Given that we're not in recovery, InsertTimeLineID is set and can't
* change, so we can read it without a lock.
@@ -906,6 +931,16 @@ XLogInsertRecord(XLogRecData *rdata,
if (inserted)
{
+ /*
+ * If our reserve hook modified the XLog Record,
+ * recalculate the CRC.
+ */
+ if (XLogReserveInsertHookModifiedRecord)
+ {
+ XLogRecordCorrectCRC(rdata);
+ XLogReserveInsertHookModifiedRecord = false;
+ }
+
/*
* Now that xl_prev has been filled in, calculate CRC of the record
* header.
@@ -1086,6 +1121,25 @@ XLogInsertRecord(XLogRecData *rdata,
return EndPos;
}
+/*
+ * Recalculate the WAL record's data CRC (everything past the fixed header) in
+ * case the record was altered by the ReserveXLogInsertLocation() hook.
+ */
+static void
+XLogRecordCorrectCRC(XLogRecData *rdata)
+{
+ XLogRecData *rdt;
+ XLogRecord *rechdr = (XLogRecord *)rdata->data;
+ pg_crc32c rdata_crc;
+
+ INIT_CRC32C(rdata_crc);
+ COMP_CRC32C(rdata_crc, rdata->data + SizeOfXLogRecord, rdata->len - SizeOfXLogRecord);
+ for (rdt = rdata->next; rdt != NULL; rdt = rdt->next)
+ COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+
+ rechdr->xl_crc = rdata_crc;
+}
+
/*
* Reserves the right amount of space for a record of given size from the WAL.
* *StartPos is set to the beginning of the reserved section, *EndPos to
@@ -1130,6 +1184,12 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
*/
SpinLockAcquire(&Insert->insertpos_lck);
+ /*
+ * If set call the XLogReserveInsertHook function
+ */
+ if (XLogReserveInsertHook != NULL)
+ XLogReserveInsertHook(XLogReserveInsertHookData);
+
startbytepos = Insert->CurrBytePos;
endbytepos = startbytepos + size;
prevbytepos = Insert->PrevBytePos;
@@ -1189,6 +1249,12 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
return false;
}
+ /*
+ * If set call the XLogReserveInsertHook function
+ */
+ if (XLogReserveInsertHook != NULL)
+ XLogReserveInsertHook(XLogReserveInsertHookData);
+
endbytepos = startbytepos + size;
prevbytepos = Insert->PrevBytePos;
@@ -9510,3 +9576,19 @@ SetWalWriterSleeping(bool sleeping)
XLogCtl->WalWriterSleeping = sleeping;
SpinLockRelease(&XLogCtl->info_lck);
}
+
+/*
+ * Get/set the last-transaction-stop-timestamp in shared memory (XLogCtl).
+ * No locking is done here; caller must hold the WAL-insert spinlock.
+ */
+extern TimestampTz
+XLogGetLastTransactionStopTimestamp(void)
+{
+ return XLogCtl->lastTransactionStopTimestamp;
+}
+
+extern void
+XLogSetLastTransactionStopTimestamp(TimestampTz ts)
+{
+ XLogCtl->lastTransactionStopTimestamp = ts;
+}
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 34ad46c067..187d1dc9bc 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -199,6 +199,17 @@ typedef enum WALAvailability
struct XLogRecData;
struct XLogReaderState;
+/*
+ * Hook called while holding the lock that determines
+ * the LSN order of WAL records. We currently use this to ensure that
+ * commit timestamps are monotonically increasing in their LSN
+ * order.
+ */
+typedef void (*XLogReserveInsertHookType)(void *data);
+extern XLogReserveInsertHookType XLogReserveInsertHook;
+extern void *XLogReserveInsertHookData;
+extern bool XLogReserveInsertHookModifiedRecord;
+
extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
XLogRecPtr fpw_lsn,
uint8 flags,
@@ -270,6 +281,13 @@ extern void SetInstallXLogFileSegmentActive(void);
extern bool IsInstallXLogFileSegmentActive(void);
extern void XLogShutdownWalRcv(void);
+/*
+ * Functions to access the last commit Lamport timestamp held in
+ * XLogCtl.
+ */
+extern TimestampTz XLogGetLastTransactionStopTimestamp(void);
+extern void XLogSetLastTransactionStopTimestamp(TimestampTz tz);
+
/*
* Routines to start, stop, and get status of a base backup.
*/