There was an earlier comment by Andres that asyncXactLSN should also be atomic, to avoid an ugly spinlock interaction with the new atomic-based LogwrtResult. Patch 0002 here is an attempt at doing that; I found that it also needed to change WalWriterSleeping to use atomics, so that XLogSetAsyncXactLSN does not have to grab the spinlock just to read that flag.
-- Álvaro Herrera Valdivia, Chile — https://www.EnterpriseDB.com/ "Learn about compilers. Then everything looks like either a compiler or a database, and now you have two problems but one of them is fun." https://twitter.com/thingskatedid/status/1456027786158776329
>From 0d48422df1a0e738721bfba5cd2d3f1ede5db423 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Tue, 2 Feb 2021 14:03:43 -0300 Subject: [PATCH v5 1/2] make LogwrtResult atomic --- src/backend/access/transam/xlog.c | 192 +++++++++++++++--------------- src/include/port/atomics.h | 29 +++++ src/tools/pgindent/typedefs.list | 1 + 3 files changed, 123 insertions(+), 99 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 221e4cb34f..225ac8bc2d 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -382,16 +382,14 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr; * * LogwrtRqst indicates a byte position that we need to write and/or fsync * the log up to (all records before that point must be written or fsynced). - * LogwrtResult indicates the byte positions we have already written/fsynced. + * LogWriteResult, LogFlushResult indicate the byte positions we have already + * written/fsynced. * These structs are identical but are declared separately to indicate their * slightly different functions. * - * To read XLogCtl->LogwrtResult, you must hold either info_lck or - * WALWriteLock. To update it, you need to hold both locks. The point of - * this arrangement is that the value can be examined by code that already - * holds WALWriteLock without needing to grab info_lck as well. In addition - * to the shared variable, each backend has a private copy of LogwrtResult, - * which is updated when convenient. + * XLogCtl->LogwrtResult is read and written using atomic operations. + * In addition to the shared variable, each backend has a private copy of + * LogwrtResult, each member of which is separately updated when convenient. * * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst * (protected by info_lck), but we don't need to cache any copies of it. 
@@ -414,18 +412,18 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr; *---------- */ +typedef struct XLogwrtAtomic +{ + pg_atomic_uint64 Write; /* last byte + 1 of write position */ + pg_atomic_uint64 Flush; /* last byte + 1 of flush position */ +} XLogwrtAtomic; + typedef struct XLogwrtRqst { XLogRecPtr Write; /* last byte + 1 to write out */ XLogRecPtr Flush; /* last byte + 1 to flush */ } XLogwrtRqst; -typedef struct XLogwrtResult -{ - XLogRecPtr Write; /* last byte + 1 written out */ - XLogRecPtr Flush; /* last byte + 1 flushed */ -} XLogwrtResult; - /* * Inserting to WAL is protected by a small fixed number of WAL insertion * locks. To insert to the WAL, you must hold one of the locks - it doesn't @@ -577,6 +575,7 @@ typedef struct XLogCtlData { XLogCtlInsert Insert; + XLogwrtAtomic LogwrtResult; /* uses atomics */ /* Protected by info_lck: */ XLogwrtRqst LogwrtRqst; XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */ @@ -594,12 +593,6 @@ typedef struct XLogCtlData pg_time_t lastSegSwitchTime; XLogRecPtr lastSegSwitchLSN; - /* - * Protected by info_lck and WALWriteLock (you must hold either lock to - * read it, but both to update) - */ - XLogwrtResult LogwrtResult; - /* * Latest initialized page in the cache (last byte position + 1). * @@ -770,10 +763,11 @@ static ControlFileData *ControlFile = NULL; static int UsableBytesInSegment; /* - * Private, possibly out-of-date copy of shared LogwrtResult. + * Private, possibly out-of-date copy of shared XLogCtl->LogwrtResult. * See discussion above. 
*/ -static XLogwrtResult LogwrtResult = {0, 0}; +static XLogRecPtr LogWriteResult = 0; +static XLogRecPtr LogFlushResult = 0; /* * Codes indicating where we got a WAL file from during recovery, or where @@ -1201,8 +1195,6 @@ XLogInsertRecord(XLogRecData *rdata, /* advance global request to include new block(s) */ if (XLogCtl->LogwrtRqst.Write < EndPos) XLogCtl->LogwrtRqst.Write = EndPos; - /* update local result copy while I have the chance */ - LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck); } @@ -2174,6 +2166,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) * Now that we have the lock, check if someone initialized the page * already. */ + LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write); while (upto >= XLogCtl->InitializedUpTo || opportunistic) { nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo); @@ -2184,7 +2177,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) * already written out. */ OldPageRqstPtr = XLogCtl->xlblocks[nextidx]; - if (LogwrtResult.Write < OldPageRqstPtr) + if (LogWriteResult < OldPageRqstPtr) { /* * Nope, got work to do. If we just want to pre-initialize as much @@ -2193,18 +2186,18 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) if (opportunistic) break; - /* Before waiting, get info_lck and update LogwrtResult */ + /* Advance shared memory write request position */ SpinLockAcquire(&XLogCtl->info_lck); if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr) XLogCtl->LogwrtRqst.Write = OldPageRqstPtr; - LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck); /* - * Now that we have an up-to-date LogwrtResult value, see if we - * still need to write it or if someone else already did. + * Before waiting, update LogWriteResult and see if we still need + * to write it or if someone else already did. 
*/ - if (LogwrtResult.Write < OldPageRqstPtr) + LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write); + if (LogWriteResult < OldPageRqstPtr) { /* * Must acquire write lock. Release WALBufMappingLock first, @@ -2218,8 +2211,8 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); - LogwrtResult = XLogCtl->LogwrtResult; - if (LogwrtResult.Write >= OldPageRqstPtr) + LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write); + if (LogWriteResult >= OldPageRqstPtr) { /* OK, someone wrote it already */ LWLockRelease(WALWriteLock); @@ -2476,7 +2469,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) /* * Update local LogwrtResult (caller probably did this already, but...) */ - LogwrtResult = XLogCtl->LogwrtResult; + LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write); /* * Since successive pages in the xlog cache are consecutively allocated, @@ -2496,9 +2489,9 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) * consider writing. Begin at the buffer containing the next unwritten * page, or last partially written page. */ - curridx = XLogRecPtrToBufIdx(LogwrtResult.Write); + curridx = XLogRecPtrToBufIdx(LogWriteResult); - while (LogwrtResult.Write < WriteRqst.Write) + while (LogWriteResult < WriteRqst.Write) { /* * Make sure we're not ahead of the insert process. 
This could happen @@ -2507,16 +2500,16 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) */ XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx]; - if (LogwrtResult.Write >= EndPtr) + if (LogWriteResult >= EndPtr) elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", - LSN_FORMAT_ARGS(LogwrtResult.Write), + LSN_FORMAT_ARGS(LogWriteResult), LSN_FORMAT_ARGS(EndPtr)); - /* Advance LogwrtResult.Write to end of current buffer page */ - LogwrtResult.Write = EndPtr; - ispartialpage = WriteRqst.Write < LogwrtResult.Write; + /* Advance LogWriteResult to end of current buffer page */ + LogWriteResult = EndPtr; + ispartialpage = WriteRqst.Write < LogWriteResult; - if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo, + if (!XLByteInPrevSeg(LogWriteResult, openLogSegNo, wal_segment_size)) { /* @@ -2526,7 +2519,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) Assert(npages == 0); if (openLogFile >= 0) XLogFileClose(); - XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo, + XLByteToPrevSeg(LogWriteResult, openLogSegNo, wal_segment_size); openLogTLI = tli; @@ -2538,7 +2531,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) /* Make sure we have the current logfile open */ if (openLogFile < 0) { - XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo, + XLByteToPrevSeg(LogWriteResult, openLogSegNo, wal_segment_size); openLogTLI = tli; openLogFile = XLogFileOpen(openLogSegNo, tli); @@ -2550,7 +2543,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) { /* first of group */ startidx = curridx; - startoffset = XLogSegmentOffset(LogwrtResult.Write - XLOG_BLCKSZ, + startoffset = XLogSegmentOffset(LogWriteResult - XLOG_BLCKSZ, wal_segment_size); } npages++; @@ -2561,7 +2554,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) * contiguous in memory), or if we are at the end of the logfile * segment. 
*/ - last_iteration = WriteRqst.Write <= LogwrtResult.Write; + last_iteration = WriteRqst.Write <= LogWriteResult; finishing_seg = !ispartialpage && (startoffset + npages * XLOG_BLCKSZ) >= wal_segment_size; @@ -2652,13 +2645,13 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) /* signal that we need to wakeup walsenders later */ WalSndWakeupRequest(); - LogwrtResult.Flush = LogwrtResult.Write; /* end of page */ + LogFlushResult = LogWriteResult; /* end of page */ if (XLogArchivingActive()) XLogArchiveNotifySeg(openLogSegNo, tli); XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL); - XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush; + XLogCtl->lastSegSwitchLSN = LogFlushResult; /* * Request a checkpoint if we've consumed too much xlog since @@ -2679,7 +2672,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) if (ispartialpage) { /* Only asked to write a partial page */ - LogwrtResult.Write = WriteRqst.Write; + LogWriteResult = WriteRqst.Write; break; } curridx = NextBufIdx(curridx); @@ -2691,12 +2684,14 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) Assert(npages == 0); + /* Publish current write result position */ + pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write, LogWriteResult); + /* * If asked to flush, do so */ - if (LogwrtResult.Flush < WriteRqst.Flush && - LogwrtResult.Flush < LogwrtResult.Write) - + if (LogFlushResult < WriteRqst.Flush && + LogFlushResult < LogWriteResult) { /* * Could get here without iterating above loop, in which case we might @@ -2707,12 +2702,12 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) sync_method != SYNC_METHOD_OPEN_DSYNC) { if (openLogFile >= 0 && - !XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo, + !XLByteInPrevSeg(LogWriteResult, openLogSegNo, wal_segment_size)) XLogFileClose(); if (openLogFile < 0) { - XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo, + XLByteToPrevSeg(LogWriteResult, openLogSegNo, wal_segment_size); openLogTLI = tli; 
openLogFile = XLogFileOpen(openLogSegNo, tli); @@ -2725,23 +2720,23 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) /* signal that we need to wakeup walsenders later */ WalSndWakeupRequest(); - LogwrtResult.Flush = LogwrtResult.Write; + LogFlushResult = LogWriteResult; } + /* Publish current flush result position */ + pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush, LogFlushResult); + /* - * Update shared-memory status - * - * We make sure that the shared 'request' values do not fall behind the + * Make sure that the shared 'request' values do not fall behind the * 'result' values. This is not absolutely essential, but it saves some * code in a couple of places. */ { SpinLockAcquire(&XLogCtl->info_lck); - XLogCtl->LogwrtResult = LogwrtResult; - if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write) - XLogCtl->LogwrtRqst.Write = LogwrtResult.Write; - if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush) - XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; + if (XLogCtl->LogwrtRqst.Write < LogWriteResult) + XLogCtl->LogwrtRqst.Write = LogWriteResult; + if (XLogCtl->LogwrtRqst.Flush < LogFlushResult) + XLogCtl->LogwrtRqst.Flush = LogFlushResult; SpinLockRelease(&XLogCtl->info_lck); } } @@ -2757,8 +2752,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) XLogRecPtr WriteRqstPtr = asyncXactLSN; bool sleeping; + LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush); SpinLockAcquire(&XLogCtl->info_lck); - LogwrtResult = XLogCtl->LogwrtResult; sleeping = XLogCtl->WalWriterSleeping; if (XLogCtl->asyncXactLSN < asyncXactLSN) XLogCtl->asyncXactLSN = asyncXactLSN; @@ -2775,7 +2770,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ; /* if we have already flushed that far, we're done */ - if (WriteRqstPtr <= LogwrtResult.Flush) + if (WriteRqstPtr <= LogFlushResult) return; } @@ -2931,15 +2926,15 @@ XLogFlush(XLogRecPtr record) } /* Quick exit if already known flushed */ - if (record <= 
LogwrtResult.Flush) + if (record <= LogFlushResult) return; #ifdef WAL_DEBUG if (XLOG_DEBUG) elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X", LSN_FORMAT_ARGS(record), - LSN_FORMAT_ARGS(LogwrtResult.Write), - LSN_FORMAT_ARGS(LogwrtResult.Flush)); + LSN_FORMAT_ARGS(LogWriteResult), + LSN_FORMAT_ARGS(LogFlushResult)); #endif START_CRIT_SECTION(); @@ -2948,8 +2943,8 @@ XLogFlush(XLogRecPtr record) * Since fsync is usually a horribly expensive operation, we try to * piggyback as much data as we can on each fsync: if we see any more data * entered into the xlog buffer, we'll write and fsync that too, so that - * the final value of LogwrtResult.Flush is as large as possible. This - * gives us some chance of avoiding another fsync immediately after. + * the final value of LogFlushResult is as large as possible. This gives + * us some chance of avoiding another fsync immediately after. */ /* initialize to given target; may increase below */ @@ -2967,11 +2962,11 @@ XLogFlush(XLogRecPtr record) SpinLockAcquire(&XLogCtl->info_lck); if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write) WriteRqstPtr = XLogCtl->LogwrtRqst.Write; - LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck); /* done already? */ - if (record <= LogwrtResult.Flush) + LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush); + if (record <= LogFlushResult) break; /* @@ -2998,8 +2993,8 @@ XLogFlush(XLogRecPtr record) } /* Got the lock; recheck whether request is satisfied */ - LogwrtResult = XLogCtl->LogwrtResult; - if (record <= LogwrtResult.Flush) + LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush); + if (record <= LogFlushResult) { LWLockRelease(WALWriteLock); break; @@ -3069,11 +3064,11 @@ XLogFlush(XLogRecPtr record) * calls from bufmgr.c are not within critical sections and so we will not * force a restart for a bad LSN on a data page. 
*/ - if (LogwrtResult.Flush < record) + if (LogFlushResult < record) elog(ERROR, "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X", LSN_FORMAT_ARGS(record), - LSN_FORMAT_ARGS(LogwrtResult.Flush)); + LSN_FORMAT_ARGS(LogFlushResult)); } /* @@ -3122,7 +3117,6 @@ XLogBackgroundFlush(void) /* read LogwrtResult and update local state */ SpinLockAcquire(&XLogCtl->info_lck); - LogwrtResult = XLogCtl->LogwrtResult; WriteRqst = XLogCtl->LogwrtRqst; SpinLockRelease(&XLogCtl->info_lck); @@ -3130,8 +3124,10 @@ XLogBackgroundFlush(void) WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ; /* if we have already flushed that far, consider async commit records */ - if (WriteRqst.Write <= LogwrtResult.Flush) + LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush); + if (WriteRqst.Write <= LogFlushResult) { + pg_memory_barrier(); SpinLockAcquire(&XLogCtl->info_lck); WriteRqst.Write = XLogCtl->asyncXactLSN; SpinLockRelease(&XLogCtl->info_lck); @@ -3143,11 +3139,12 @@ XLogBackgroundFlush(void) * holding an open file handle to a logfile that's no longer in use, * preventing the file from being deleted. 
*/ - if (WriteRqst.Write <= LogwrtResult.Flush) + if (WriteRqst.Write <= LogFlushResult) { if (openLogFile >= 0) { - if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo, + LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write); + if (!XLByteInPrevSeg(LogWriteResult, openLogSegNo, wal_segment_size)) { XLogFileClose(); @@ -3162,7 +3159,7 @@ XLogBackgroundFlush(void) */ now = GetCurrentTimestamp(); flushbytes = - WriteRqst.Write / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ; + WriteRqst.Write / XLOG_BLCKSZ - LogFlushResult / XLOG_BLCKSZ; if (WalWriterFlushAfter == 0 || lastflush == 0) { @@ -3197,8 +3194,8 @@ XLogBackgroundFlush(void) elog(LOG, "xlog bg flush request write %X/%X; flush: %X/%X, current is write %X/%X; flush %X/%X", LSN_FORMAT_ARGS(WriteRqst.Write), LSN_FORMAT_ARGS(WriteRqst.Flush), - LSN_FORMAT_ARGS(LogwrtResult.Write), - LSN_FORMAT_ARGS(LogwrtResult.Flush)); + LSN_FORMAT_ARGS(LogWriteResult), + LSN_FORMAT_ARGS(LogFlushResult)); #endif START_CRIT_SECTION(); @@ -3206,9 +3203,10 @@ XLogBackgroundFlush(void) /* now wait for any in-progress insertions to finish and get write lock */ WaitXLogInsertionsToFinish(WriteRqst.Write); LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); - LogwrtResult = XLogCtl->LogwrtResult; - if (WriteRqst.Write > LogwrtResult.Write || - WriteRqst.Flush > LogwrtResult.Flush) + LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write); + LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush); + if (WriteRqst.Write > LogWriteResult || + WriteRqst.Flush > LogFlushResult) { XLogWrite(WriteRqst, insertTLI, flexible); } @@ -3290,16 +3288,14 @@ XLogNeedsFlush(XLogRecPtr record) } /* Quick exit if already known flushed */ - if (record <= LogwrtResult.Flush) + if (record <= LogFlushResult) return false; /* read LogwrtResult and update local state */ - SpinLockAcquire(&XLogCtl->info_lck); - LogwrtResult = XLogCtl->LogwrtResult; - SpinLockRelease(&XLogCtl->info_lck); + LogFlushResult = 
pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush); /* check again */ - if (record <= LogwrtResult.Flush) + if (record <= LogFlushResult) return false; return true; @@ -8086,9 +8082,11 @@ StartupXLOG(void) XLogCtl->InitializedUpTo = EndOfLog; } - LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; + LogWriteResult = LogFlushResult = EndOfLog; - XLogCtl->LogwrtResult = LogwrtResult; + /* XXX OK to write without WALWriteLock? */ + pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog); + pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog); XLogCtl->LogwrtRqst.Write = EndOfLog; XLogCtl->LogwrtRqst.Flush = EndOfLog; @@ -8750,9 +8748,7 @@ GetFlushRecPtr(TimeLineID *insertTLI) { Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE); - SpinLockAcquire(&XLogCtl->info_lck); - LogwrtResult = XLogCtl->LogwrtResult; - SpinLockRelease(&XLogCtl->info_lck); + LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush); /* * If we're writing and flushing WAL, the time line can't be changing, @@ -8761,7 +8757,7 @@ GetFlushRecPtr(TimeLineID *insertTLI) if (insertTLI) *insertTLI = XLogCtl->InsertTimeLineID; - return LogwrtResult.Flush; + return LogFlushResult; } /* @@ -12041,11 +12037,9 @@ GetXLogInsertRecPtr(void) XLogRecPtr GetXLogWriteRecPtr(void) { - SpinLockAcquire(&XLogCtl->info_lck); - LogwrtResult = XLogCtl->LogwrtResult; - SpinLockRelease(&XLogCtl->info_lck); + LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write); - return LogwrtResult.Write; + return LogWriteResult; } /* diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h index 856338f161..4ff9ce9864 100644 --- a/src/include/port/atomics.h +++ b/src/include/port/atomics.h @@ -519,6 +519,35 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_) return pg_atomic_sub_fetch_u64_impl(ptr, sub_); } +/* + * Monotonically advance the given variable using only atomic operations until + * it's at least the target value. 
+ * + * Full barrier semantics (even when value is unchanged). + */ +static inline void +pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_) +{ + uint64 currval; + +#ifndef PG_HAVE_ATOMIC_U64_SIMULATION + AssertPointerAlignment(ptr, 8); +#endif + + currval = pg_atomic_read_u64(ptr); + if (currval >= target_) + { + pg_memory_barrier(); + return; + } + + while (currval < target_) + { + if (pg_atomic_compare_exchange_u64(ptr, &currval, target_)) + break; + } +} + #undef INSIDE_ATOMICS_H #endif /* ATOMICS_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index da6ac8ed83..edc9bfa5ce 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2943,6 +2943,7 @@ XLogRecordBuffer XLogRedoAction XLogSegNo XLogSource +XLogwrtAtomic XLogwrtResult XLogwrtRqst XPVIV -- 2.30.2
>From 7da32bd3b130cab4b1358f8da0014bde154c9ffc Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Mon, 22 Nov 2021 18:13:09 -0300 Subject: [PATCH v5 2/2] more atomics --- src/backend/access/transam/xlog.c | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 225ac8bc2d..57885488da 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -576,11 +576,12 @@ typedef struct XLogCtlData XLogCtlInsert Insert; XLogwrtAtomic LogwrtResult; /* uses atomics */ + pg_atomic_uint64 asyncXactLSN; /* LSN of newest async commit/abort */ + /* Protected by info_lck: */ XLogwrtRqst LogwrtRqst; XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */ FullTransactionId ckptFullXid; /* nextXid of latest checkpoint */ - XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */ XLogRecPtr replicationSlotMinLSN; /* oldest LSN needed by any slot */ XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG segment */ @@ -656,9 +657,9 @@ typedef struct XLogCtlData /* * WalWriterSleeping indicates whether the WAL writer is currently in * low-power mode (and hence should be nudged if an async commit occurs). - * Protected by info_lck. + * Uses atomics. 
*/ - bool WalWriterSleeping; + pg_atomic_uint32 WalWriterSleeping; /* * recoveryWakeupLatch is used to wake up the startup process to continue @@ -2752,12 +2753,13 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) XLogRecPtr WriteRqstPtr = asyncXactLSN; bool sleeping; - LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush); - SpinLockAcquire(&XLogCtl->info_lck); - sleeping = XLogCtl->WalWriterSleeping; - if (XLogCtl->asyncXactLSN < asyncXactLSN) - XLogCtl->asyncXactLSN = asyncXactLSN; - SpinLockRelease(&XLogCtl->info_lck); + /* + * Advance asyncXactLSN before reading WalWriterSleeping. The advance is a + * full barrier, pairing with the one SetWalWriterSleeping's caller must + * issue, so either the WAL writer sees our LSN or we see it sleeping. + */ + pg_atomic_monotonic_advance_u64(&XLogCtl->asyncXactLSN, asyncXactLSN); + sleeping = (bool) pg_atomic_read_u32(&XLogCtl->WalWriterSleeping); /* * If the WALWriter is sleeping, we should kick it to make it come out of @@ -2770,6 +2772,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ; /* if we have already flushed that far, we're done */ + LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush); if (WriteRqstPtr <= LogFlushResult) return; } @@ -3127,10 +3130,8 @@ XLogBackgroundFlush(void) LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush); if (WriteRqst.Write <= LogFlushResult) { - pg_memory_barrier(); - SpinLockAcquire(&XLogCtl->info_lck); - WriteRqst.Write = XLogCtl->asyncXactLSN; - SpinLockRelease(&XLogCtl->info_lck); + pg_read_barrier(); + WriteRqst.Write = pg_atomic_read_u64(&XLogCtl->asyncXactLSN); flexible = false; /* ensure it all gets written */ } @@ -3205,6 +3206,7 @@ XLogBackgroundFlush(void) LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write); LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush); + /* no barrier needed here */ if (WriteRqst.Write > LogWriteResult || WriteRqst.Flush > LogFlushResult) { @@ -5318,7 +5320,10 @@ XLOGShmemInit(void) XLogCtl->SharedHotStandbyActive = false; XLogCtl->InstallXLogFileSegmentActive = false; XLogCtl->SharedPromoteIsTriggered = false; - 
XLogCtl->WalWriterSleeping = false; + pg_atomic_init_u32(&XLogCtl->WalWriterSleeping, (uint32) false); + pg_atomic_init_u64(&XLogCtl->asyncXactLSN, InvalidXLogRecPtr); + pg_atomic_init_u64(&XLogCtl->LogwrtResult.Write, InvalidXLogRecPtr); + pg_atomic_init_u64(&XLogCtl->LogwrtResult.Flush, InvalidXLogRecPtr); SpinLockInit(&XLogCtl->Insert.insertpos_lck); SpinLockInit(&XLogCtl->info_lck); @@ -13256,13 +13261,13 @@ WakeupRecovery(void) /* * Update the WalWriterSleeping flag. + * + * Note there is no memory barrier here. Caller must insert one if needed. */ void SetWalWriterSleeping(bool sleeping) { - SpinLockAcquire(&XLogCtl->info_lck); - XLogCtl->WalWriterSleeping = sleeping; - SpinLockRelease(&XLogCtl->info_lck); + pg_atomic_write_u32(&XLogCtl->WalWriterSleeping, (uint32) sleeping); } /* -- 2.30.2