On 2021-Aug-24, Bossart, Nathan wrote:

> I wonder if we need to move the call to RegisterSegmentBoundary() to
> somewhere before WALInsertLockRelease() for this to work as intended.
> Right now, boundary registration could take place after the flush
> pointer has been advanced, which means GetSafeFlushRecPtr() could
> still return an unsafe position.

Yeah, you're right -- that's a definite risk.  I didn't try to reproduce
a problem with that, but it seems pretty obvious that it can happen.

I didn't have much luck writing a reliable reproducer script.  I was
able to reproduce the problem starting with Ryo Matsumura's script and
attaching a replica; most of the time the replica would recover by
restarting from a streaming position earlier than where the problem
occurred, but a few times it would just get stuck with a WAL segment
containing a bogus record.  With the patch applied, the problem no longer
occurs.

I attach the patch with the change you suggested.

-- 
Álvaro Herrera              Valdivia, Chile  —  https://www.EnterpriseDB.com/
Tom: There seems to be something broken here.
Teodor: I'm in sackcloth and ashes...  Fixed.
        http://archives.postgresql.org/message-id/482d1632.8010...@sigaev.ru
>From 9f39d6f186c3af55db85ee73427a9ed94750939e Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 23 Aug 2021 18:25:48 -0400
Subject: [PATCH v2] Don't stream non-final WAL segments

Avoid setting the physical-stream replication read pointer in the middle
of a WAL record.  This can occur if a record is split in two (or more)
across segment boundaries.  The reason to avoid it is that if we stream
the segment containing the first half, and then we crash before writing
the next segment, the primary will rewrite the tail of the segment with
a new WAL record (having discarded the incomplete record), but the
replica will be stuck trying to replay a broken file (since the next
segment will never contain the now-gone data).

To do this, change streaming replication to retreat the flush pointer
according to registered segment boundaries.
---
 src/backend/access/transam/xlog.c   | 80 +++++++++++++++++++++--------
 src/backend/replication/walsender.c |  2 +-
 src/include/access/xlog.h           |  1 +
 3 files changed, 62 insertions(+), 21 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 24165ab03e..4d010c86dd 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -731,8 +731,10 @@ typedef struct XLogCtlData
 	 */
 	XLogSegNo	lastNotifiedSeg;
 	XLogSegNo	earliestSegBoundary;
+	XLogRecPtr	earliestSegBoundaryStartPtr;
 	XLogRecPtr	earliestSegBoundaryEndPtr;
 	XLogSegNo	latestSegBoundary;
+	XLogRecPtr	latestSegBoundaryStartPtr;
 	XLogRecPtr	latestSegBoundaryEndPtr;
 
 	slock_t		segtrack_lck;	/* locks shared variables shown above */
@@ -932,7 +934,7 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
 						   XLogSegNo *endlogSegNo);
 static void UpdateLastRemovedPtr(char *filename);
 static void ValidateXLOGDirectoryStructure(void);
-static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr pos);
+static void RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr startpos, XLogRecPtr endpos);
 static void CleanupBackupHistory(void);
 static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
 static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
@@ -1022,6 +1024,8 @@ XLogInsertRecord(XLogRecData *rdata,
 							   info == XLOG_SWITCH);
 	XLogRecPtr	StartPos;
 	XLogRecPtr	EndPos;
+	XLogSegNo	StartSeg;
+	XLogSegNo	EndSeg;
 	bool		prevDoPageWrites = doPageWrites;
 
 	/* we assume that all of the record header is in the first chunk */
@@ -1157,6 +1161,30 @@ XLogInsertRecord(XLogRecData *rdata,
 		 */
 	}
 
+	/*
+	 * Before releasing our WAL insertion lock, register that we crossed the
+	 * segment boundary if that occurred.  We need to do it with the lock held
+	 * for GetSafeFlushRecPtr's sake: otherwise it could see the WAL flush
+	 * point advance but not see the registration, which would lead it to
+	 * wrongly conclude that our flush point is safe to use.
+	 */
+	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
+	{
+		XLByteToSeg(StartPos, StartSeg, wal_segment_size);
+		XLByteToSeg(EndPos, EndSeg, wal_segment_size);
+
+		/*
+		 * Register our crossing the segment boundary if that occurred.
+		 *
+		 * Note that we did not use XLByteToPrevSeg() for determining the
+		 * ending segment.  This is so that a record that fits perfectly into
+		 * the end of the segment causes said segment to get marked ready for
+		 * archival immediately.
+		 */
+		if (StartSeg != EndSeg && XLogArchivingActive())
+			RegisterSegmentBoundary(EndSeg, StartPos, EndPos);
+	}
+
 	/*
 	 * Done! Let others know that we're finished.
 	 */
@@ -1168,27 +1196,10 @@ XLogInsertRecord(XLogRecData *rdata,
 
 	/*
 	 * If we crossed page boundary, update LogwrtRqst.Write; if we crossed
-	 * segment boundary, register that and wake up walwriter.
+	 * segment boundary, wake up walwriter.
 	 */
 	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
 	{
-		XLogSegNo	StartSeg;
-		XLogSegNo	EndSeg;
-
-		XLByteToSeg(StartPos, StartSeg, wal_segment_size);
-		XLByteToSeg(EndPos, EndSeg, wal_segment_size);
-
-		/*
-		 * Register our crossing the segment boundary if that occurred.
-		 *
-		 * Note that we did not use XLByteToPrevSeg() for determining the
-		 * ending segment.  This is so that a record that fits perfectly into
-		 * the end of the segment causes the latter to get marked ready for
-		 * archival immediately.
-		 */
-		if (StartSeg != EndSeg && XLogArchivingActive())
-			RegisterSegmentBoundary(EndSeg, EndPos);
-
 		/*
 		 * Advance LogwrtRqst.Write so that it includes new block(s).
 		 *
@@ -4398,7 +4409,7 @@ ValidateXLOGDirectoryStructure(void)
  * to delay until the end segment is known flushed.
  */
 static void
-RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
+RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr startpos, XLogRecPtr endpos)
 {
 	XLogSegNo	segno PG_USED_FOR_ASSERTS_ONLY;
 
@@ -4415,6 +4426,7 @@ RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
 	if (XLogCtl->earliestSegBoundary == MaxXLogSegNo)
 	{
 		XLogCtl->earliestSegBoundary = seg;
+		XLogCtl->earliestSegBoundaryStartPtr = startpos;
 		XLogCtl->earliestSegBoundaryEndPtr = endpos;
 	}
 	else if (seg > XLogCtl->earliestSegBoundary &&
@@ -4422,6 +4434,7 @@ RegisterSegmentBoundary(XLogSegNo seg, XLogRecPtr endpos)
 			  seg > XLogCtl->latestSegBoundary))
 	{
 		XLogCtl->latestSegBoundary = seg;
+		XLogCtl->latestSegBoundaryStartPtr = startpos;
 		XLogCtl->latestSegBoundaryEndPtr = endpos;
 	}
 
@@ -4481,15 +4494,18 @@ NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr)
 	if (keep_latest)
 	{
 		XLogCtl->earliestSegBoundary = XLogCtl->latestSegBoundary;
+		XLogCtl->earliestSegBoundaryStartPtr = XLogCtl->latestSegBoundaryStartPtr;
 		XLogCtl->earliestSegBoundaryEndPtr = XLogCtl->latestSegBoundaryEndPtr;
 	}
 	else
 	{
 		XLogCtl->earliestSegBoundary = MaxXLogSegNo;
+		XLogCtl->earliestSegBoundaryStartPtr = InvalidXLogRecPtr;
 		XLogCtl->earliestSegBoundaryEndPtr = InvalidXLogRecPtr;
 	}
 
 	XLogCtl->latestSegBoundary = MaxXLogSegNo;
+	XLogCtl->latestSegBoundaryStartPtr = InvalidXLogRecPtr;
 	XLogCtl->latestSegBoundaryEndPtr = InvalidXLogRecPtr;
 
 	SpinLockRelease(&XLogCtl->segtrack_lck);
@@ -8776,6 +8792,30 @@ GetFlushRecPtr(void)
 	return LogwrtResult.Flush;
 }
 
+/*
+ * GetSafeFlushRecPtr -- Returns a "safe" flush position.
+ *
+ * Similar to GetFlushRecPtr, except that it avoids reporting a location that
+ * might be overwritten if there's a crash before syncing the next segment.
+ */
+XLogRecPtr
+GetSafeFlushRecPtr(void)
+{
+	XLogRecPtr		flush;
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	flush = XLogCtl->LogwrtResult.Flush;
+	SpinLockRelease(&XLogCtl->info_lck);
+	/* Don't go past the start of a registered segment-crossing record */
+	SpinLockAcquire(&XLogCtl->segtrack_lck);
+	if (XLogCtl->earliestSegBoundary != MaxXLogSegNo &&
+		XLogCtl->earliestSegBoundaryStartPtr < flush)
+		flush = XLogCtl->earliestSegBoundaryStartPtr;
+	SpinLockRelease(&XLogCtl->segtrack_lck);
+
+	return flush;
+}
+
 /*
  * GetLastImportantRecPtr -- Returns the LSN of the last important record
  * inserted. All records not explicitly marked as unimportant are considered
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3ca2a11389..4c98fecdce 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2650,7 +2650,7 @@ XLogSendPhysical(void)
 		 * primary: if the primary subsequently crashes and restarts, standbys
 		 * must not have applied any WAL that got lost on the primary.
 		 */
-		SendRqstPtr = GetFlushRecPtr();
+		SendRqstPtr = GetSafeFlushRecPtr();
 	}
 
 	/*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 6b6ae81c2d..1af59c36d4 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -313,6 +313,7 @@ extern void GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p)
 extern XLogRecPtr GetRedoRecPtr(void);
 extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
+extern XLogRecPtr GetSafeFlushRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 extern void RemovePromoteSignalFiles(void);
 extern void NotifySegmentsReadyForArchive(XLogRecPtr flushRecPtr);
-- 
2.20.1

Reply via email to