So I tried this, but -- perhaps not surprisingly -- I can't get it to work properly; the synchronization fails. I suspect I need some barriers, but I tried adding a few (probably some that are not really necessary) and that didn't have the expected effect. Strangely, all the other tests pass for me; only the pg_upgrade test fails.
(The attached patch series is, of course, POC quality at best.) I'll have another look next week. -- Álvaro Herrera 39°49'30"S 73°17'W
>From 90507185357391a661cd856fc231b140079c8d78 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 29 Jan 2021 22:34:23 -0300
Subject: [PATCH v2 1/3] add pg_atomic_monotonic_advance_u64

---
 src/include/port/atomics.h | 38 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index 856338f161..752f39d66e 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -519,6 +519,44 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Advance *ptr to target_, unless it already holds a value greater than or
+ * equal to target_; the stored value therefore never moves backwards.
+ *
+ * Full barrier semantics, whether or not the value is changed.
+ *
+ * Also correct when 64-bit atomics are simulated, since every access to *ptr
+ * goes through the pg_atomic_* API.
+ */
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64(ptr);
+	if (currval >= target_)
+	{
+		/* already there; still provide the promised full barrier */
+		pg_memory_barrier();
+		return;
+	}
+
+	/*
+	 * On failure, pg_atomic_compare_exchange_u64() stores the value it found
+	 * in currval, so *ptr need not be re-read.  Stop once target_ has been
+	 * installed or somebody else has advanced *ptr to or beyond it.
+	 */
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64(ptr, &currval, target_))
+			break;
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
-- 
2.20.1
>From 84a62ac0594f8f0b13c06958cf6e94e2a2723082 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 29 Jan 2021 22:34:04 -0300
Subject: [PATCH v2 2/3] atomics in xlog.c

---
 src/backend/access/transam/xlog.c | 148 ++++++++++++++++++----------------
 1 file changed, 80 insertions(+), 68 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f03bd473e2..8073a92ceb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -413,7 +413,8 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  * which is updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
- * (protected by info_lck), but we don't need to cache any copies of it.
+ * (which uses atomic access, so no locks are needed), but we don't need to
+ * cache any copies of it.
  *
  * info_lck is only held long enough to read/update the protected variables,
  * so it's a plain spinlock.  The other locks are held longer (potentially
@@ -433,17 +434,19 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  *----------
  */
 
-typedef struct XLogwrtRqst
+/* Write/Flush position to be used in shared memory */
+typedef struct XLogwrtAtomic
 {
-	XLogRecPtr	Write;			/* last byte + 1 to write out */
-	XLogRecPtr	Flush;			/* last byte + 1 to flush */
-} XLogwrtRqst;
+	pg_atomic_uint64 Write;		/* last byte + 1 of write position */
+	pg_atomic_uint64 Flush;		/* last byte + 1 of flush position */
+} XLogwrtAtomic;
 
-typedef struct XLogwrtResult
+/* Write/Flush position to be used in process-local memory */
+typedef struct XLogwrt
 {
 	XLogRecPtr	Write;			/* last byte + 1 written out */
 	XLogRecPtr	Flush;			/* last byte + 1 flushed */
-} XLogwrtResult;
+} XLogwrt;
 
 /*
  * Inserting to WAL is protected by a small fixed number of WAL insertion
@@ -596,8 +599,18 @@ typedef struct XLogCtlData
 {
 	XLogCtlInsert Insert;
 
+	/*
+	 * Shared write/flush request and result positions.  They are accessed
+	 * only through atomic operations; after StartupXLOG() sets them, they are
+	 * only moved forward, using pg_atomic_monotonic_advance_u64().
+	 *
+	 * FIXME: XLOGShmemInit() must initialize them with pg_atomic_init_u64()
+	 * before first use, as the atomics API requires.
+	 */
+	XLogwrtAtomic LogwrtRqst;
+	XLogwrtAtomic LogwrtResult;
+
 	/* Protected by info_lck: */
-	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
 	FullTransactionId ckptFullXid;	/* nextXid of latest checkpoint */
 	XLogRecPtr	asyncXactLSN;	/* LSN of newest async commit/abort */
@@ -613,12 +626,6 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogwrtResult LogwrtResult;
-
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
 	 *
@@ -779,7 +786,22 @@ static int	UsableBytesInSegment;
  * Private, possibly out-of-date copy of shared LogwrtResult.
  * See discussion above.
  */
-static XLogwrtResult LogwrtResult = {0, 0};
+static XLogwrt LogwrtResult = {0, 0};
+
+/*
+ * Update a local XLogwrt copy from the shared XLogCtl->LogwrtResult.
+ *
+ * XLogWrite() publishes the new Write position before the new Flush position,
+ * so Flush is read first and Write second, with a read barrier in between.
+ * That way the local copy never shows Flush ahead of Write, as two unordered
+ * reads could.
+ */
+#define RefreshXLogWriteResult(target) \
+	do { \
+		(target).Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush); \
+		pg_read_barrier(); \
+		(target).Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write); \
+	} while (0)
 
 /*
  * Codes indicating where we got a WAL file from during recovery, or where
@@ -910,7 +932,7 @@ static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
 static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
-static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
+static void XLogWrite(XLogwrt WriteRqst, bool flexible);
 static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 								   bool find_free, XLogSegNo max_segno,
 								   bool use_lock);
@@ -1168,13 +1190,10 @@ XLogInsertRecord(XLogRecData *rdata,
 	 */
 	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
 	{
-		SpinLockAcquire(&XLogCtl->info_lck);
 		/* advance global request to include new block(s) */
-		if (XLogCtl->LogwrtRqst.Write < EndPos)
-			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
-		SpinLockRelease(&XLogCtl->info_lck);
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, EndPos);
+		/* update local result copy */
+		RefreshXLogWriteResult(LogwrtResult);
 	}
 
 	/*
@@ -2135,7 +2154,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	int			nextidx;
 	XLogRecPtr	OldPageRqstPtr;
-	XLogwrtRqst WriteRqst;
+	XLogwrt		WriteRqst;
 	XLogRecPtr	NewPageEndPtr = InvalidXLogRecPtr;
 	XLogRecPtr	NewPageBeginPtr;
 	XLogPageHeader NewPage;
@@ -2166,12 +2185,10 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
-			SpinLockAcquire(&XLogCtl->info_lck);
-			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
-				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
-			SpinLockRelease(&XLogCtl->info_lck);
+			/* Before waiting, update LogwrtResult */
+			pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, OldPageRqstPtr);
+
+			RefreshXLogWriteResult(LogwrtResult);
 
 			/*
 			 * Now that we have an up-to-date LogwrtResult value, see if we
@@ -2191,7 +2208,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
+				RefreshXLogWriteResult(LogwrtResult);
 				if (LogwrtResult.Write >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
@@ -2421,7 +2438,7 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
  * write.
  */
 static void
-XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
+XLogWrite(XLogwrt WriteRqst, bool flexible)
 {
 	bool		ispartialpage;
 	bool		last_iteration;
@@ -2438,7 +2455,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	RefreshXLogWriteResult(LogwrtResult);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2675,13 +2692,15 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	 * code in a couple of places.
 	 */
 	{
-		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
-		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
-			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
-		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
-			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
-		SpinLockRelease(&XLogCtl->info_lck);
+		/*
+		 * Publish Write before Flush: RefreshXLogWriteResult() reads them in
+		 * the opposite order, so that readers never see Flush ahead of Write.
+		 */
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write, LogwrtResult.Write);
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush, LogwrtResult.Flush);
+
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, LogwrtResult.Write);
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Flush, LogwrtResult.Flush);
 	}
 }
 
@@ -2696,8 +2715,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
+	RefreshXLogWriteResult(LogwrtResult);
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	sleeping = XLogCtl->WalWriterSleeping;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
@@ -2856,7 +2875,7 @@ void
 XLogFlush(XLogRecPtr record)
 {
 	XLogRecPtr	WriteRqstPtr;
-	XLogwrtRqst WriteRqst;
+	XLogwrt		WriteRqst;
 
 	/*
 	 * During REDO, we are reading not writing WAL.  Therefore, instead of
@@ -2905,11 +2924,12 @@ XLogFlush(XLogRecPtr record)
 		XLogRecPtr	insertpos;
+		XLogRecPtr	rqstwrite;
 
 		/* read LogwrtResult and update local state */
-		SpinLockAcquire(&XLogCtl->info_lck);
-		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
-			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
-		SpinLockRelease(&XLogCtl->info_lck);
+		/* as before, only ever move WriteRqstPtr forward */
+		rqstwrite = pg_atomic_read_u64(&XLogCtl->LogwrtRqst.Write);
+		if (WriteRqstPtr < rqstwrite)
+			WriteRqstPtr = rqstwrite;
+		RefreshXLogWriteResult(LogwrtResult);
 
 		/* done already? */
 		if (record <= LogwrtResult.Flush)
@@ -2939,7 +2959,7 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
+		RefreshXLogWriteResult(LogwrtResult);
 		if (record <= LogwrtResult.Flush)
 		{
 			LWLockRelease(WALWriteLock);
@@ -3044,7 +3064,7 @@ XLogFlush(XLogRecPtr record)
 bool
 XLogBackgroundFlush(void)
 {
-	XLogwrtRqst WriteRqst;
+	XLogwrt		WriteRqst;
 	bool		flexible = true;
 	static TimestampTz lastflush;
 	TimestampTz now;
@@ -3055,10 +3075,9 @@ XLogBackgroundFlush(void)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	WriteRqst = XLogCtl->LogwrtRqst;
-	SpinLockRelease(&XLogCtl->info_lck);
+	RefreshXLogWriteResult(LogwrtResult);
+	WriteRqst.Write = pg_atomic_read_u64(&XLogCtl->LogwrtRqst.Write);
+	WriteRqst.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtRqst.Flush);
 
 	/* back off to last completed page boundary */
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
@@ -3140,7 +3159,7 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	RefreshXLogWriteResult(LogwrtResult);
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -3228,9 +3247,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	RefreshXLogWriteResult(LogwrtResult);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -7779,10 +7796,11 @@ StartupXLOG(void)
 
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
-	XLogCtl->LogwrtResult = LogwrtResult;
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, LogwrtResult.Write);
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, LogwrtResult.Flush);
 
-	XLogCtl->LogwrtRqst.Write = EndOfLog;
-	XLogCtl->LogwrtRqst.Flush = EndOfLog;
+	pg_atomic_write_u64(&XLogCtl->LogwrtRqst.Write, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->LogwrtRqst.Flush, EndOfLog);
 
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
@@ -8484,9 +8502,7 @@ GetInsertRecPtr(void)
 {
 	XLogRecPtr	recptr;
 
-	SpinLockAcquire(&XLogCtl->info_lck);
-	recptr = XLogCtl->LogwrtRqst.Write;
-	SpinLockRelease(&XLogCtl->info_lck);
+	recptr = pg_atomic_read_u64(&XLogCtl->LogwrtRqst.Write);
 
 	return recptr;
 }
@@ -8498,9 +8514,7 @@ GetInsertRecPtr(void)
 XLogRecPtr
 GetFlushRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	RefreshXLogWriteResult(LogwrtResult);
 
 	return LogwrtResult.Flush;
 }
@@ -11649,9 +11663,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	RefreshXLogWriteResult(LogwrtResult);
 
 	return LogwrtResult.Write;
 }
-- 
2.20.1

>From b487979a362f90b866a86b1600791178b5574a75 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 29 Jan 2021 22:34:58 -0300
Subject: [PATCH v2 3/3] add barriers

---
 src/backend/access/transam/xlog.c | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8073a92ceb..458efc5a55 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1192,6 +1192,8 @@ XLogInsertRecord(XLogRecData *rdata,
 	{
 		/* advance global request to include new block(s) */
 		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, EndPos);
+		pg_memory_barrier();
+
 		/* update local result copy */
 		RefreshXLogWriteResult(LogwrtResult);
 	}
@@ -2187,6 +2189,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 
 			/* Before waiting, update LogwrtResult */
 			pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, OldPageRqstPtr);
+			pg_memory_barrier();
 
 			RefreshXLogWriteResult(LogwrtResult);
 
@@ -2701,6 +2704,7 @@ XLogWrite(XLogwrt WriteRqst, bool flexible)
 
 		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, LogwrtResult.Write);
 		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Flush, LogwrtResult.Flush);
+		pg_write_barrier();
 	}
 }
 
@@ -7801,6 +7805,7 @@ StartupXLOG(void)
 
 	pg_atomic_write_u64(&XLogCtl->LogwrtRqst.Write, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->LogwrtRqst.Flush, EndOfLog);
+	pg_write_barrier();
 
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
-- 
2.20.1