Attached is v14 which uses a separate spinlock. -- Álvaro Herrera 39°49'30"S 73°17'W — https://www.EnterpriseDB.com/ "No me acuerdo, pero no es cierto. No es cierto, y si fuera cierto, no me acuerdo." (Augusto Pinochet a una corte de justicia)
From 4180334f51a1f343ce5795d0b62f3aa298b472da Mon Sep 17 00:00:00 2001 From: Nathan Bossart <bossa...@amazon.com> Date: Fri, 20 Aug 2021 19:25:31 +0000 Subject: [PATCH v14] Avoid creating archive status ".ready" files too early
WAL records may span multiple segments, but XLogWrite() does not wait for the entire record to be written out to disk before creating archive status files. Instead, as soon as the last WAL page of the segment is written, the archive status file is created, and the archiver may process it. If PostgreSQL crashes before it is able to write and flush the rest of the record (in the next WAL segment), the wrong version of the first segment file lingers in the archive, which causes operations such as point-in-time restores to fail. To fix this, keep track of records that span across segments and ensure that segments are only marked ready-for-archival once such records have been completely written to disk. Author: Nathan Bossart <bossa...@amazon.com> Reviewed-by: Kyotaro Horiguchi <horikyota....@gmail.com> Reviewed-by: Ryo Matsumura <matsumura....@fujitsu.com> Reviewed-by: Andrey Borodin <x4...@yandex-team.ru> Discussion: https://postgr.es/m/cbddfa01-6e40-46bb-9f98-9340f4379...@amazon.com --- src/backend/access/transam/timeline.c | 2 +- src/backend/access/transam/xlog.c | 242 +++++++++++++++++++++-- src/backend/access/transam/xlogarchive.c | 17 +- src/backend/postmaster/walwriter.c | 7 + src/backend/replication/walreceiver.c | 6 +- src/include/access/xlog.h | 1 + src/include/access/xlogarchive.h | 4 +- src/include/access/xlogdefs.h | 1 + 8 files changed, 256 insertions(+), 24 deletions(-) diff --git a/src/backend/access/transam/timeline.c b/src/backend/access/transam/timeline.c index 8d0903c175..acd5c2431d 100644 --- a/src/backend/access/transam/timeline.c +++ b/src/backend/access/transam/timeline.c @@ -452,7 +452,7 @@ writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI, if (XLogArchivingActive()) { TLHistoryFileName(histfname, newTLI); - XLogArchiveNotify(histfname); + XLogArchiveNotify(histfname, true); } } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index e51a7a749d..95f03adef8 100644 --- 
a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -724,6 +724,18 @@ typedef struct XLogCtlData XLogRecPtr lastFpwDisableRecPtr; slock_t info_lck; /* locks shared variables shown above */ + + /* + * Variables used to track cross-segment records. Protected by + * segtrack_lck. + */ + XLogSegNo lastNotifiedSeg; + XLogSegNo earliestSegBoundary; + XLogRecPtr earliestSegBoundaryRecPtr; + XLogSegNo latestSegBoundary; + XLogRecPtr latestSegBoundaryRecPtr; + + slock_t segtrack_lck; /* locks shared variables shown above */ } XLogCtlData; static XLogCtlData *XLogCtl = NULL; @@ -920,6 +932,9 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo, XLogSegNo *endlogSegNo); static void UpdateLastRemovedPtr(char *filename); static void ValidateXLOGDirectoryStructure(void); +static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos); +static bool GetLatestSegmentBoundary(XLogRecPtr flushed, XLogSegNo *latest_boundary_seg); +static void RemoveSegmentBoundariesUpTo(XLogSegNo seg); static void CleanupBackupHistory(void); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); static XLogRecord *ReadRecord(XLogReaderState *xlogreader, @@ -1154,23 +1169,56 @@ XLogInsertRecord(XLogRecData *rdata, END_CRIT_SECTION(); /* - * Update shared LogwrtRqst.Write, if we crossed page boundary. + * If we crossed page boundary, update LogwrtRqst.Write; if we crossed + * segment boundary, register that and wake up walwriter. */ if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ) { + XLogSegNo StartSeg; + XLogSegNo EndSeg; + + XLByteToSeg(StartPos, StartSeg, wal_segment_size); + XLByteToSeg(EndPos, EndSeg, wal_segment_size); + + /* + * Register our crossing the segment boundary if that occurred. + * + * Note that we did not use XLByteToPrevSeg() for determining the + * ending segment. 
This is so that a record that fits perfectly into + * the end of the segment is marked ready for archival as soon as the + * flushed pointer jumps to the next segment. + */ + if (StartSeg != EndSeg && XLogArchivingActive()) + RegisterSegmentBoundary(EndSeg, EndPos); + + /* + * Advance LogwrtRqst.Write so that it includes new block(s). + * + * We do this after registering the segment boundary so that the + * comparison with the flushed pointer below can use the latest value + * known globally. + */ SpinLockAcquire(&XLogCtl->info_lck); - /* advance global request to include new block(s) */ if (XLogCtl->LogwrtRqst.Write < EndPos) XLogCtl->LogwrtRqst.Write = EndPos; /* update local result copy while I have the chance */ LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck); + + /* + * There's a chance that the record was already flushed to disk and we + * missed marking segments as ready for archive. If this happens, we + * nudge the WALWriter, which will take care of notifying segments as + * needed. + */ + if (StartSeg != EndSeg && XLogArchivingActive() && + LogwrtResult.Flush >= EndPos && ProcGlobal->walwriterLatch) + SetLatch(ProcGlobal->walwriterLatch); } /* * If this was an XLOG_SWITCH record, flush the record and the empty - * padding space that fills the rest of the segment, and perform - * end-of-segment actions (eg, notifying archiver). + * padding space that fills the rest of the segment. */ if (isLogSwitch) { @@ -2421,6 +2469,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) /* We should always be inside a critical section here */ Assert(CritSectionCount > 0); + Assert(LWLockHeldByMe(WALWriteLock)); /* * Update local LogwrtResult (caller probably did this already, but...) @@ -2586,11 +2635,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) * later. Doing it here ensures that one and only one backend will * perform this fsync. 
* - * This is also the right place to notify the Archiver that the - * segment is ready to copy to archival storage, and to update the - * timer for archive_timeout, and to signal for a checkpoint if - * too many logfile segments have been used since the last - * checkpoint. + * If WAL archiving is active, we attempt to notify the archiver + * of any segments that are now ready for archival. + * + * This is also the right place to update the timer for + * archive_timeout and to signal for a checkpoint if too many + * logfile segments have been used since the last checkpoint. */ if (finishing_seg) { @@ -2602,7 +2652,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) LogwrtResult.Flush = LogwrtResult.Write; /* end of page */ if (XLogArchivingActive()) - XLogArchiveNotifySeg(openLogSegNo); + NotifySegmentsReadyForArchive(LogwrtResult.Flush); XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL); XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush; @@ -2690,6 +2740,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; SpinLockRelease(&XLogCtl->info_lck); } + + if (XLogArchivingActive()) + NotifySegmentsReadyForArchive(LogwrtResult.Flush); } /* @@ -4328,6 +4381,151 @@ ValidateXLOGDirectoryStructure(void) } } +/* + * RegisterSegmentBoundary + * Record a new segment boundary if needed. + */ +static void +RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos) +{ + XLogSegNo segno PG_USED_FOR_ASSERTS_ONLY; + + /* verify caller computed segment number correctly */ + AssertArg((XLByteToSeg(pos, segno, wal_segment_size), segno == seg)); + + SpinLockAcquire(&XLogCtl->segtrack_lck); + + /* + * If no segment boundaries are registered, store the new segment boundary + * in earliestSegBoundary. Otherwise, store the greater segment boundaries in + * latestSegBoundary. 
+ */ + if (XLogCtl->earliestSegBoundary == MaxXLogSegNo) + { + XLogCtl->earliestSegBoundary = seg; + XLogCtl->earliestSegBoundaryRecPtr = pos; + } + else if (seg > XLogCtl->earliestSegBoundary && + (XLogCtl->latestSegBoundary == MaxXLogSegNo || + seg > XLogCtl->latestSegBoundary)) + { + XLogCtl->latestSegBoundary = seg; + XLogCtl->latestSegBoundaryRecPtr = pos; + } + + SpinLockRelease(&XLogCtl->segtrack_lck); +} + +/* + * GetLatestSegmentBoundary + * + * Look up the latest segment boundary that is less than or equal to the + * given "flushed" pointer. If such a segment is found, + * latest_boundary_seg is populated and true is returned. Otherwise, false + * is returned. + * + * Callers should hold XLogCtl->segtrack_lck when calling this function. + */ +static bool +GetLatestSegmentBoundary(XLogRecPtr flushed, XLogSegNo *latest_boundary_seg) +{ + XLogSegNo flushed_seg; + + Assert(latest_boundary_seg != NULL); + + XLByteToSeg(flushed, flushed_seg, wal_segment_size); + + if (XLogCtl->latestSegBoundary <= flushed_seg && + XLogCtl->latestSegBoundaryRecPtr <= flushed) + { + *latest_boundary_seg = XLogCtl->latestSegBoundary; + return true; + } + + if (XLogCtl->earliestSegBoundary <= flushed_seg && + XLogCtl->earliestSegBoundaryRecPtr <= flushed) + { + *latest_boundary_seg = XLogCtl->earliestSegBoundary; + return true; + } + + return false; +} + +/* + * RemoveSegmentBoundariesUpTo + * + * Remove all segment boundaries with segment numbers up to and including + * seg. + * + * Callers should hold XLogCtl->segtrack_lck when calling this function. 
+ */ +static void +RemoveSegmentBoundariesUpTo(XLogSegNo seg) +{ + if (XLogCtl->latestSegBoundary <= seg) + { + XLogCtl->earliestSegBoundary = MaxXLogSegNo; + XLogCtl->earliestSegBoundaryRecPtr = InvalidXLogRecPtr; + + XLogCtl->latestSegBoundary = MaxXLogSegNo; + XLogCtl->latestSegBoundaryRecPtr = InvalidXLogRecPtr; + } + else if (XLogCtl->earliestSegBoundary <= seg) + { + XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary; + XLogCtl->earliestSegBoundaryRecPtr = XLogCtl->latestSegBoundaryRecPtr; + + XLogCtl->latestSegBoundary = MaxXLogSegNo; + XLogCtl->latestSegBoundaryRecPtr = InvalidXLogRecPtr; + } +} + +/* + * NotifySegmentsReadyForArchive + * + * Mark segments as ready for archival, given that it is safe to do so. + * This function is idempotent. + */ +void +NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr) +{ + XLogSegNo latest_boundary_seg = 0; + + SpinLockAcquire(&XLogCtl->segtrack_lck); + + /* Retrieve the latest segment boundary to use for notifying segments. */ + if (GetLatestSegmentBoundary(flushRecPtr, &latest_boundary_seg)) + { + XLogSegNo last_notified = XLogCtl->lastNotifiedSeg; + + /* + * Update shared memory and discard segment boundaries that are no + * longer needed. + * + * It is safe to update shared memory before we attempt to create the + * .ready files. If our calls to XLogArchiveNotifySeg() fail, + * RemoveOldXlogFiles() will retry it as needed. + */ + if (last_notified < latest_boundary_seg - 1) + XLogCtl->lastNotifiedSeg = latest_boundary_seg - 1; + RemoveSegmentBoundariesUpTo(latest_boundary_seg); + + SpinLockRelease(&XLogCtl->segtrack_lck); + + /* + * Notify archiver about segments that are ready for archival (by + * creating the corresponding .ready files). + */ + for (XLogSegNo seg = last_notified + 1; seg < latest_boundary_seg; seg++) + XLogArchiveNotifySeg(seg, false); + + PgArchWakeup(); + } + else + SpinLockRelease(&XLogCtl->segtrack_lck); +} + /* * Remove previous backup history files. 
This also retries creation of * .ready files for any backup history files for which XLogArchiveNotify @@ -5230,9 +5428,17 @@ XLOGShmemInit(void) SpinLockInit(&XLogCtl->Insert.insertpos_lck); SpinLockInit(&XLogCtl->info_lck); + SpinLockInit(&XLogCtl->segtrack_lck); SpinLockInit(&XLogCtl->ulsn_lck); InitSharedLatch(&XLogCtl->recoveryWakeupLatch); ConditionVariableInit(&XLogCtl->recoveryNotPausedCV); + + /* Initialize stuff for marking segments as ready for archival. */ + XLogCtl->lastNotifiedSeg = MaxXLogSegNo; + XLogCtl->earliestSegBoundary = MaxXLogSegNo; + XLogCtl->earliestSegBoundaryRecPtr = InvalidXLogRecPtr; + XLogCtl->latestSegBoundary = MaxXLogSegNo; + XLogCtl->latestSegBoundaryRecPtr = InvalidXLogRecPtr; } /* @@ -7873,6 +8079,20 @@ StartupXLOG(void) XLogCtl->LogwrtRqst.Write = EndOfLog; XLogCtl->LogwrtRqst.Flush = EndOfLog; + /* + * Initialize XLogCtl->lastNotifiedSeg to the previous WAL file. + */ + if (XLogArchivingActive()) + { + XLogSegNo EndOfLogSeg; + + XLByteToSeg(EndOfLog, EndOfLogSeg, wal_segment_size); + + SpinLockAcquire(&XLogCtl->segtrack_lck); + XLogCtl->lastNotifiedSeg = EndOfLogSeg - 1; + SpinLockRelease(&XLogCtl->segtrack_lck); + } + /* * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE * record before resource manager writes cleanup WAL records or checkpoint @@ -8000,7 +8220,7 @@ StartupXLOG(void) XLogArchiveCleanup(partialfname); durable_rename(origpath, partialpath, ERROR); - XLogArchiveNotify(partialfname); + XLogArchiveNotify(partialfname, true); } } } diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c index 26b023e754..b9c19b2085 100644 --- a/src/backend/access/transam/xlogarchive.c +++ b/src/backend/access/transam/xlogarchive.c @@ -433,7 +433,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname) if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) XLogArchiveForceDone(xlogfname); else - XLogArchiveNotify(xlogfname); + XLogArchiveNotify(xlogfname, true); /* * 
If the existing file was replaced, since walsenders might have it open, @@ -462,9 +462,12 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname) * by the archiver, e.g. we write 0000000100000001000000C6.ready * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6, * then when complete, rename it to 0000000100000001000000C6.done + * + * Optionally, nudge the archiver process so that it'll notice the file we + * create. */ void -XLogArchiveNotify(const char *xlog) +XLogArchiveNotify(const char *xlog, bool nudge) { char archiveStatusPath[MAXPGPATH]; FILE *fd; @@ -489,8 +492,8 @@ XLogArchiveNotify(const char *xlog) return; } - /* Notify archiver that it's got something to do */ - if (IsUnderPostmaster) + /* If caller requested, let archiver know it's got work to do */ + if (nudge) PgArchWakeup(); } @@ -498,12 +501,12 @@ XLogArchiveNotify(const char *xlog) * Convenience routine to notify using segment number representation of filename */ void -XLogArchiveNotifySeg(XLogSegNo segno) +XLogArchiveNotifySeg(XLogSegNo segno, bool nudge) { char xlog[MAXFNAMELEN]; XLogFileName(xlog, ThisTimeLineID, segno, wal_segment_size); - XLogArchiveNotify(xlog); + XLogArchiveNotify(xlog, nudge); } /* @@ -608,7 +611,7 @@ XLogArchiveCheckDone(const char *xlog) return true; /* Retry creation of the .ready file */ - XLogArchiveNotify(xlog); + XLogArchiveNotify(xlog, true); return false; } diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c index 626fae8454..6a1e16edc2 100644 --- a/src/backend/postmaster/walwriter.c +++ b/src/backend/postmaster/walwriter.c @@ -248,6 +248,13 @@ WalWriterMain(void) /* Process any signals received recently */ HandleWalWriterInterrupts(); + /* + * Notify the archiver of any WAL segments that are ready. We do this + * here to handle a race condition where WAL is flushed to disk prior + * to registering the segment boundary. 
+ */ + NotifySegmentsReadyForArchive(GetFlushRecPtr()); + /* * Do what we're here for; then, if XLogBackgroundFlush() found useful * work to do, reset hibernation counter. diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 9a2bc37fd7..60de3be92c 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -622,7 +622,7 @@ WalReceiverMain(void) if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) XLogArchiveForceDone(xlogfname); else - XLogArchiveNotify(xlogfname); + XLogArchiveNotify(xlogfname, true); } recvFile = -1; @@ -760,7 +760,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) XLogArchiveForceDone(fname); else - XLogArchiveNotify(fname); + XLogArchiveNotify(fname, true); pfree(fname); pfree(content); @@ -915,7 +915,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) XLogArchiveForceDone(xlogfname); else - XLogArchiveNotify(xlogfname); + XLogArchiveNotify(xlogfname, true); } recvFile = -1; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 0a8ede700d..6b6ae81c2d 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -315,6 +315,7 @@ extern XLogRecPtr GetInsertRecPtr(void); extern XLogRecPtr GetFlushRecPtr(void); extern XLogRecPtr GetLastImportantRecPtr(void); extern void RemovePromoteSignalFiles(void); +extern void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr); extern bool PromoteIsTriggered(void); extern bool CheckPromoteSignal(void); diff --git a/src/include/access/xlogarchive.h b/src/include/access/xlogarchive.h index 3edd1a976c..935b4cb02d 100644 --- a/src/include/access/xlogarchive.h +++ b/src/include/access/xlogarchive.h @@ -23,8 +23,8 @@ extern bool RestoreArchivedFile(char *path, const char *xlogfname, extern void ExecuteRecoveryCommand(const char *command, const char *commandName, bool failOnSignal); extern void 
KeepFileRestoredFromArchive(const char *path, const char *xlogfname); -extern void XLogArchiveNotify(const char *xlog); -extern void XLogArchiveNotifySeg(XLogSegNo segno); +extern void XLogArchiveNotify(const char *xlog, bool nudge); +extern void XLogArchiveNotifySeg(XLogSegNo segno, bool nudge); extern void XLogArchiveForceDone(const char *xlog); extern bool XLogArchiveCheckDone(const char *xlog); extern bool XLogArchiveIsBusy(const char *xlog); diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index 60348d1850..9b455e88e3 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -46,6 +46,7 @@ typedef uint64 XLogRecPtr; * XLogSegNo - physical log file sequence number. */ typedef uint64 XLogSegNo; +#define MaxXLogSegNo ((XLogSegNo) 0xFFFFFFFFFFFFFFFF) /* * TimeLineID (TLI) - identifies different database histories to prevent -- 2.30.2