I'm distracted by something else again, so this will have to pass
for v3 for the time being.
--
Álvaro Herrera 39°49'30"S 73°17'W — https://www.EnterpriseDB.com/
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 24165ab03e..43495b8260 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -731,8 +731,10 @@ typedef struct XLogCtlData
*/
XLogSegNo lastNotifiedSeg;
XLogSegNo earliestSegBoundary;
+ XLogRecPtr earliestSegBoundaryStartPtr;
XLogRecPtr earliestSegBoundaryEndPtr;
XLogSegNo latestSegBoundary;
+ XLogRecPtr latestSegBoundaryStartPtr;
XLogRecPtr latestSegBoundaryEndPtr;
slock_t segtrack_lck; /* locks shared variables shown above */
@@ -932,7 +934,7 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
XLogSegNo *endlogSegNo);
static void UpdateLastRemovedPtr(char *filename);
static void ValidateXLOGDirectoryStructure(void);
-static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos);
+static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr startpos, XLogRecPtr endpos);
static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
@@ -1022,6 +1024,8 @@ XLogInsertRecord(XLogRecData *rdata,
info == XLOG_SWITCH);
XLogRecPtr StartPos;
XLogRecPtr EndPos;
+ XLogSegNo StartSeg;
+ XLogSegNo EndSeg;
bool prevDoPageWrites = doPageWrites;
/* we assume that all of the record header is in the first chunk */
@@ -1157,6 +1161,31 @@ XLogInsertRecord(XLogRecData *rdata,
*/
}
+ /*
+ * Before releasing our WAL insertion lock, register that we crossed the
+ * segment boundary if that occurred. We need to do it with the lock held
+ * for GetSafeFlushRecPtr's sake: otherwise it could see the WAL flush
+ * point advance but not see the registration, which would lead it to
+ * wrongly conclude that our flush point is safe to use.
+ */
+ if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+ {
+ XLByteToSeg(StartPos, StartSeg, wal_segment_size);
+ XLByteToSeg(EndPos, EndSeg, wal_segment_size);
+
+ /*
+ * Register our crossing the segment boundary if that occurred.
+ *
+ * Note that we did not use XLByteToPrevSeg() for determining the
+ * ending segment. This is so that a record that fits perfectly into
+ * the end of the segment causes said segment to get marked ready for
+ * archival immediately.
+ */
+ if (StartSeg != EndSeg &&
+ (XLogArchivingActive() || XLogStandbyInfoActive()))
+ RegisterSegmentBoundary(EndSeg, StartPos, EndPos);
+ }
+
/*
* Done! Let others know that we're finished.
*/
@@ -1168,27 +1197,10 @@ XLogInsertRecord(XLogRecData *rdata,
/*
* If we crossed page boundary, update LogwrtRqst.Write; if we crossed
- * segment boundary, register that and wake up walwriter.
+ * segment boundary, wake up walwriter.
*/
if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
{
- XLogSegNo StartSeg;
- XLogSegNo EndSeg;
-
- XLByteToSeg(StartPos, StartSeg, wal_segment_size);
- XLByteToSeg(EndPos, EndSeg, wal_segment_size);
-
- /*
- * Register our crossing the segment boundary if that occurred.
- *
- * Note that we did not use XLByteToPrevSeg() for determining the
- * ending segment. This is so that a record that fits perfectly into
- * the end of the segment causes the latter to get marked ready for
- * archival immediately.
- */
- if (StartSeg != EndSeg && XLogArchivingActive())
- RegisterSegmentBoundary(EndSeg, EndPos);
-
/*
* Advance LogwrtRqst.Write so that it includes new block(s).
*
@@ -2649,7 +2661,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
- if (XLogArchivingActive())
+ if (XLogArchivingActive() || XLogStandbyInfoActive())
NotifySegmentsReadyForArchive(LogwrtResult.Flush);
XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
@@ -2739,7 +2751,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
SpinLockRelease(&XLogCtl->info_lck);
}
- if (XLogArchivingActive())
+ if (XLogArchivingActive() || XLogStandbyInfoActive())
NotifySegmentsReadyForArchive(LogwrtResult.Flush);
}
@@ -4398,7 +4410,7 @@ ValidateXLOGDirectoryStructure(void)
* to delay until the end segment is known flushed.
*/
static void
-RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
+RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr startpos, XLogRecPtr endpos)
{
XLogSegNo segno PG_USED_FOR_ASSERTS_ONLY;
@@ -4415,6 +4427,7 @@ RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
if (XLogCtl->earliestSegBoundary == MaxXLogSegNo)
{
XLogCtl->earliestSegBoundary = seg;
+ XLogCtl->earliestSegBoundaryStartPtr = startpos;
XLogCtl->earliestSegBoundaryEndPtr = endpos;
}
else if (seg > XLogCtl->earliestSegBoundary &&
@@ -4422,6 +4435,7 @@ RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
seg > XLogCtl->latestSegBoundary))
{
XLogCtl->latestSegBoundary = seg;
+ XLogCtl->latestSegBoundaryStartPtr = startpos;
XLogCtl->latestSegBoundaryEndPtr = endpos;
}
@@ -4438,10 +4452,8 @@ void
NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr)
{
XLogSegNo latest_boundary_seg;
- XLogSegNo last_notified;
XLogSegNo flushed_seg;
XLogSegNo seg;
- bool keep_latest;
XLByteToSeg(flushRecPtr, flushed_seg, wal_segment_size);
@@ -4451,13 +4463,17 @@ NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr)
XLogCtl->latestSegBoundaryEndPtr <= flushRecPtr)
{
latest_boundary_seg = XLogCtl->latestSegBoundary;
- keep_latest = false;
+ XLogCtl->earliestSegBoundary = MaxXLogSegNo;
+ XLogCtl->earliestSegBoundaryStartPtr = InvalidXLogRecPtr;
+ XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
}
else if (XLogCtl->earliestSegBoundary <= flushed_seg &&
XLogCtl->earliestSegBoundaryEndPtr <= flushRecPtr)
{
latest_boundary_seg = XLogCtl->earliestSegBoundary;
- keep_latest = true;
+ XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary;
+ XLogCtl->earliestSegBoundaryStartPtr = XLogCtl->latestSegBoundaryStartPtr;
+ XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr;
}
else
{
@@ -4465,41 +4481,38 @@ NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr)
return;
}
- last_notified = XLogCtl->lastNotifiedSeg;
-
- /*
- * Update shared memory and discard segment boundaries that are no longer
- * needed.
- *
- * It is safe to update shared memory before we attempt to create the
- * .ready files. If our calls to XLogArchiveNotifySeg() fail,
- * RemoveOldXlogFiles() will retry it as needed.
- */
- if (last_notified < latest_boundary_seg - 1)
- XLogCtl->lastNotifiedSeg = latest_boundary_seg - 1;
-
- if (keep_latest)
- {
- XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary;
- XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr;
- }
- else
- {
- XLogCtl->earliestSegBoundary = MaxXLogSegNo;
- XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
- }
-
XLogCtl->latestSegBoundary = MaxXLogSegNo;
+ XLogCtl->latestSegBoundaryStartPtr = InvalidXLogRecPtr;
XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr;
- SpinLockRelease(&XLogCtl->segtrack_lck);
+ if (XLogArchivingActive())
+ {
+ XLogSegNo last_notified;
- /*
- * Notify archiver about segments that are ready for archival (by creating
- * the corresponding .ready files).
- */
- for (seg = last_notified + 1; seg < latest_boundary_seg; seg++)
- XLogArchiveNotifySeg(seg, false);
+ last_notified = XLogCtl->lastNotifiedSeg;
+
+ /*
+ * Update shared memory and discard segment boundaries that are no
+ * longer needed.
+ *
+ * It is safe to have updated shared memory before we attempt to
+ * create the .ready files. If our calls to XLogArchiveNotifySeg()
+ * fail, RemoveOldXlogFiles() will retry it as needed.
+ */
+ if (last_notified < latest_boundary_seg - 1)
+ XLogCtl->lastNotifiedSeg = latest_boundary_seg - 1;
+
+ SpinLockRelease(&XLogCtl->segtrack_lck);
+
+ /*
+ * Notify archiver about segments that are ready for archival (by
+ * creating the corresponding .ready files).
+ */
+ for (seg = last_notified + 1; seg < latest_boundary_seg; seg++)
+ XLogArchiveNotifySeg(seg, false);
+ }
+ else
+ SpinLockRelease(&XLogCtl->segtrack_lck);
PgArchWakeup();
}
@@ -8776,6 +8789,30 @@ GetFlushRecPtr(void)
return LogwrtResult.Flush;
}
+/*
+ * GetSafeFlushRecPtr -- Returns a "safe" flush position: like
+ * GetFlushRecPtr, but never past the start of the earliest registered
+ * segment-crossing record, which may be overwritten if we crash before
+ * its end is flushed.
+ */
+XLogRecPtr
+GetSafeFlushRecPtr(void)
+{
+ XLogRecPtr flush;
+
+ SpinLockAcquire(&XLogCtl->info_lck);
+ flush = XLogCtl->LogwrtResult.Flush;
+ SpinLockRelease(&XLogCtl->info_lck);
+
+ SpinLockAcquire(&XLogCtl->segtrack_lck);
+ if (XLogCtl->earliestSegBoundary != MaxXLogSegNo &&
+ XLogCtl->earliestSegBoundaryStartPtr < flush)
+ flush = XLogCtl->earliestSegBoundaryStartPtr;
+ SpinLockRelease(&XLogCtl->segtrack_lck);
+
+ return flush;
+}
+
/*
* GetLastImportantRecPtr -- Returns the LSN of the last important record
* inserted. All records not explicitly marked as unimportant are considered
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..aafb3e91de 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -621,7 +621,7 @@ StartReplication(StartReplicationCmd *cmd)
FlushPtr = GetStandbyFlushRecPtr();
}
else
- FlushPtr = GetFlushRecPtr();
+ FlushPtr = GetSafeFlushRecPtr();
if (cmd->timeline != 0)
{
@@ -2650,7 +2650,7 @@ XLogSendPhysical(void)
* primary: if the primary subsequently crashes and restarts, standbys
* must not have applied any WAL that got lost on the primary.
*/
- SendRqstPtr = GetFlushRecPtr();
+ SendRqstPtr = GetSafeFlushRecPtr();
}
/*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 6b6ae81c2d..1af59c36d4 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -313,6 +313,7 @@ extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p)
extern XLogRecPtr GetRedoRecPtr(void);
extern XLogRecPtr GetInsertRecPtr(void);
extern XLogRecPtr GetFlushRecPtr(void);
+extern XLogRecPtr GetSafeFlushRecPtr(void);
extern XLogRecPtr GetLastImportantRecPtr(void);
extern void RemovePromoteSignalFiles(void);
extern void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr);