Thanks for looking into this.

On Thu, Feb 22, 2024 at 1:54 AM Jeff Davis <pg...@j-davis.com> wrote:
>
> > 3.
> > @@ -6371,7 +6373,9 @@ GetFlushRecPtr(TimeLineID *insertTLI)
> >  If at all, a read
> > barrier is warranted here, we can use atomic read with full barrier
>
> I don't think we need a full barrier but I'm fine with using
> pg_atomic_read_membarrier_u64() if it's better for whatever reason.

For the sake of clarity and correctness, I've used
pg_atomic_read_membarrier_u64 everywhere for reading
XLogCtl->LogwrtResult.Write and XLogCtl->LogwrtResult.Flush.

> > 5. I guess we'd better use pg_atomic_read_u64_impl and
> > pg_atomic_compare_exchange_u64_impl in
> > pg_atomic_monotonic_advance_u64
> > to reduce one level of function indirections.
>
> Aren't they inlined?

Yes, all of them are inlined. However, the convention in that header
seems to be to call the XXX_impl functions when implementing the
exposed functions, so using pg_atomic_read_u64_impl and
pg_atomic_compare_exchange_u64_impl doesn't sound bad IMV.

> > 6.
> > + * Full barrier semantics (even when value is unchanged).
> >
> > +    currval = pg_atomic_read_u64(ptr);
> > +    if (currval >= target_)
> > +    {
> > +        pg_memory_barrier();
>
> I don't think they are exactly equivalent: in the current patch, the
> first pg_atomic_read_u64() could be reordered with earlier reads,
> whereas it could not be if using pg_atomic_read_membarrier_u64(). I'm
> not sure whether that could create a performance problem or not.

I left it as-is.
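To make the semantics concrete, here is a minimal standalone sketch of
the monotonic-advance idea using C11 atomics. This is an illustrative
model only, not the PostgreSQL implementation; seq_cst operations stand
in for pg_memory_barrier and the full-barrier compare-exchange:

```c
#include <stdatomic.h>
#include <stdint.h>

/*
 * Toy model of pg_atomic_monotonic_advance_u64: advance *ptr to at
 * least target without ever moving it backwards, with full-barrier
 * semantics even on the path where the value is already past target.
 */
static void
monotonic_advance_u64(_Atomic uint64_t *ptr, uint64_t target)
{
    uint64_t cur = atomic_load(ptr);

    if (cur >= target)
    {
        /* No store needed, but still act as a full memory barrier. */
        atomic_thread_fence(memory_order_seq_cst);
        return;
    }

    while (cur < target)
    {
        /*
         * On failure, cur is refreshed with the current value, so we
         * re-check whether a concurrent caller already advanced past
         * target; if so, we must not move the value backwards.
         */
        if (atomic_compare_exchange_strong(ptr, &cur, target))
            break;
    }
}
```

The early-return fence is the point under discussion above: the initial
plain load can be reordered with earlier reads, which a
pg_atomic_read_membarrier_u64-style load would prevent.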

> > 9.
> > +    copyptr = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Copy);
> > +    if (startptr + count > copyptr)
> > +        ereport(WARNING,
> > +                (errmsg("request to read past end of generated WAL;
> > request %X/%X, current position %X/%X",
> > +                        LSN_FORMAT_ARGS(startptr + count),
> > LSN_FORMAT_ARGS(copyptr))));
> >
> > Any specific reason for this to be a WARNING rather than an ERROR?
>
> Good question. WaitXLogInsertionsToFinish() uses a LOG level message
> for the same situation. They should probably be the same log level, and
> I would think it would be either PANIC or WARNING. I have no idea why
> LOG was chosen.

WaitXLogInsertionsToFinish clamps upto (after emitting the LOG message)
so that it never waits past the current insert position, even if a
caller asks about WAL that doesn't yet exist. And the comment there
says "Here we just assume that to mean that all WAL that has been
reserved needs to be finished."

In contrast, WALReadFromBuffers effectively requires callers to call
WaitXLogInsertionsToFinish first (IOW, it asks callers to request only
WAL that exists on the server). Therefore, an ERROR seems a reasonable
choice to me, since a PANIC sounds rather strong, affecting all the
postgres processes.

FWIW, raising a PANIC instead of LOG when requested to flush past the
end of WAL in WaitXLogInsertionsToFinish seems good to me. The CF bot
doesn't complain -
https://github.com/BRupireddy2/postgres/tree/be_harsh_when_request_to_flush_past_end_of_WAL_WIP.

> 0001:
>
> * The comments on the two versions of the functions are redundant, and
> the style in that header seems to be to omit the comment from the u64
> version.

Removed comments atop 64-bit version.

> * I'm not sure the AssertPointerAlignment is needed in the u32 version?

I borrowed them from pg_atomic_read_u32 and
pg_atomic_compare_exchange_u32, which assert pointer alignment in the
same way before calling the XXX_impl versions. I don't see any problem
with keeping them.

> 0002:
>
> * All updates to the non-shared LogwrtResult should update both values.
> It's confusing to update those local values independently, because it
> violates the invariant that LogwrtResult.Flush <= LogwrtResult.Write.
>
> > 2. I guess we need to update both the Write and Flush local copies in
> > AdvanceXLInsertBuffer.
>
> I agree. Whenever we update the non-shared LogwrtResult, let's update
> the whole thing. Otherwise it's confusing.

Yes, it's done that way now with a macro XLogUpdateLocalLogWrtResult
using pg_atomic_read_membarrier_u64 to read both Write and Flush ptrs.

> * pg_memory_barrier() is not needed right before a spinlock

Got rid of it, since we now read both the Write and Flush local copies
with pg_atomic_read_membarrier_u64.

> * As mentioned above, I think GetFlushRecPtr() needs two read barriers.
> Also, I think the same for GetXLogWriteRecPtr().

As mentioned above, this is handled by the XLogUpdateLocalLogWrtResult
macro, which uses pg_atomic_read_membarrier_u64 to read both the Write
and Flush ptrs.

> * In general, for any place using both Write and Flush, I think Flush
> should be loaded first, followed by a read barrier, followed by a load
> of the Write pointer.

Why read Flush first rather than Write? I think it's enough to do
{read Write, read barrier, read Flush}. This works because Write is
always advanced before Flush, monotonically and with full barriers, so
we don't get reordering issues between the readers and writers, no? Am
I missing anything here?

> And I think in most of those places there should
> be a read barrier before the load of Flush, too, to avoid a stale value
> in places that might matter.

Yes, using pg_atomic_read_membarrier_u64 for both Write and Flush makes it easy.
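As a rough C11 analogue of that primitive (illustrative only; the
function name here is mine, not PostgreSQL's):

```c
#include <stdatomic.h>
#include <stdint.h>

/*
 * Toy model of a full-barrier read in the style of
 * pg_atomic_read_membarrier_u64: a seq_cst load cannot be reordered
 * with surrounding seq_cst operations, so reading Write and Flush
 * through it keeps each load from drifting before earlier reads or
 * past later ones.
 */
static uint64_t
read_with_barrier_u64(_Atomic uint64_t *ptr)
{
    return atomic_load_explicit(ptr, memory_order_seq_cst);
}
```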

> 0003:
>
> * We need to maintain the invariant that Copy >= Write >= Flush. I
> believe that's always satisfied, because the
> XLogWaitInsertionsToFinish() is always called before XLogWrite(). But
> we should add an assert or runtime check of this invariant somewhere.

Yes, that invariant is already maintained by the server. Although I
don't fully agree that it's needed, I added an assertion to
WaitXLogInsertionsToFinish after updating XLogCtl->LogwrtResult.Copy.
The CF bot is happy with it -
https://github.com/BRupireddy2/postgres/tree/atomic_LogwrtResult_v13.

Please see the attached v13 patch set for further review.

--
Bharath Rupireddy
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
From d9bca4b50c604e3ffe9a7c8128509203f103c7fb Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 4 Mar 2024 14:53:28 +0000
Subject: [PATCH v13 1/3] Add monotonic advancement functions for atomics

This commit adds functions for monotonically advancing the given
atomic variable with full barrier semantics.

An upcoming commit uses the 64-bit monotonic advance function, but this
commit also adds the 32-bit version for the sake of completeness.
---
 src/include/port/atomics.h | 56 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index ff47782cdb..53e1a337a4 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -438,6 +438,35 @@ pg_atomic_sub_fetch_u32(volatile pg_atomic_uint32 *ptr, int32 sub_)
 	return pg_atomic_sub_fetch_u32_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline void
+pg_atomic_monotonic_advance_u32(volatile pg_atomic_uint32 *ptr, uint32 target_)
+{
+	uint32		currval;
+
+	AssertPointerAlignment(ptr, 4);
+
+	currval = pg_atomic_read_u32_impl(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return;
+	}
+
+	AssertPointerAlignment(&currval, 4);
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u32_impl(ptr, &currval, target_))
+			break;
+	}
+}
+
 /* ----
  * The 64 bit operations have the same semantics as their 32bit counterparts
  * if they are available. Check the corresponding 32bit function for
@@ -570,6 +599,33 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64_impl(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return;
+	}
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(&currval, 8);
+#endif
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64_impl(ptr, &currval, target_))
+			break;
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
-- 
2.34.1

From bc9219e7dfb47e77c08ff6f8602320d8844dff69 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 4 Mar 2024 14:54:18 +0000
Subject: [PATCH v13 2/3] Make XLogCtl->LogwrtResult accessible with atomics

Currently, access to LogwrtResult is protected by a spinlock.  This
becomes severely contended in some scenarios, such as with a largish
replication flock: walsenders all calling GetFlushRecPtr repeatedly
cause the processor heat up to the point where eggs can be fried on top.

This can be reduced to a non-problem by replacing XLogCtl->LogwrtResult
with a struct containing a pair of atomically accessed variables. Do so.
In a few places, we can adjust the exact location where the locals are
updated to account for the fact that we no longer need the spinlock.
---
 src/backend/access/transam/xlog.c | 106 ++++++++++++++----------------
 src/tools/pgindent/typedefs.list  |   1 +
 2 files changed, 52 insertions(+), 55 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 20a5f86209..ffbf93690e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -290,16 +290,13 @@ static bool doPageWrites;
  *
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
- * LogwrtResult indicates the byte positions we have already written/fsynced.
- * These structs are identical but are declared separately to indicate their
- * slightly different functions.
+ * LogwrtResult indicates the byte positions we have already written/fsynced.
+ * These structs are similar but are declared separately to indicate their
+ * slightly different functions; in addition, the latter is read and written
+ * using atomic operations.
  *
- * To read XLogCtl->LogwrtResult, you must hold either info_lck or
- * WALWriteLock.  To update it, you need to hold both locks.  The point of
- * this arrangement is that the value can be examined by code that already
- * holds WALWriteLock without needing to grab info_lck as well.  In addition
- * to the shared variable, each backend has a private copy of LogwrtResult,
- * which is updated when convenient.
+ * In addition to the shared variable, each backend has a private copy of
+ * LogwrtResult, which is updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
  * (protected by info_lck), but we don't need to cache any copies of it.
@@ -322,6 +319,12 @@ static bool doPageWrites;
  *----------
  */
 
+typedef struct XLogwrtAtomic
+{
+	pg_atomic_uint64 Write;		/* last byte + 1 written out */
+	pg_atomic_uint64 Flush;		/* last byte + 1 flushed */
+} XLogwrtAtomic;
+
 typedef struct XLogwrtRqst
 {
 	XLogRecPtr	Write;			/* last byte + 1 to write out */
@@ -334,6 +337,13 @@ typedef struct XLogwrtResult
 	XLogRecPtr	Flush;			/* last byte + 1 flushed */
 } XLogwrtResult;
 
+/*
+ * Update local copy of shared XLogCtl->LogwrtResult.
+ */
+#define XLogUpdateLocalLogWrtResult() \
+		LogwrtResult.Write = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Write); \
+		LogwrtResult.Flush = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Flush);
+
 /*
  * Inserting to WAL is protected by a small fixed number of WAL insertion
  * locks. To insert to the WAL, you must hold one of the locks - it doesn't
@@ -457,6 +467,8 @@ typedef struct XLogCtlData
 {
 	XLogCtlInsert Insert;
 
+	XLogwrtAtomic LogwrtResult;
+
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
@@ -473,12 +485,6 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogwrtResult LogwrtResult;
-
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
 	 *
@@ -608,7 +614,7 @@ static ControlFileData *ControlFile = NULL;
 static int	UsableBytesInSegment;
 
 /*
- * Private, possibly out-of-date copy of shared LogwrtResult.
+ * Private, possibly out-of-date copy of shared XLogCtl->LogwrtResult.
  * See discussion above.
  */
 static XLogwrtResult LogwrtResult = {0, 0};
@@ -958,9 +964,8 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
+		XLogUpdateLocalLogWrtResult();
 	}
 
 	/*
@@ -1977,12 +1982,12 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
 			SpinLockRelease(&XLogCtl->info_lck);
+			XLogUpdateLocalLogWrtResult();
 
 			/*
 			 * Now that we have an up-to-date LogwrtResult value, see if we
@@ -2002,7 +2007,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
+				XLogUpdateLocalLogWrtResult();
 				if (LogwrtResult.Write >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
@@ -2286,7 +2291,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	XLogUpdateLocalLogWrtResult();
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2501,6 +2506,10 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 
 	Assert(npages == 0);
 
+	/* Publish current write result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write,
+									LogwrtResult.Write);
+
 	/*
 	 * If asked to flush, do so
 	 */
@@ -2537,22 +2546,9 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		LogwrtResult.Flush = LogwrtResult.Write;
 	}
 
-	/*
-	 * Update shared-memory status
-	 *
-	 * We make sure that the shared 'request' values do not fall behind the
-	 * 'result' values.  This is not absolutely essential, but it saves some
-	 * code in a couple of places.
-	 */
-	{
-		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
-		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
-			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
-		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
-			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
-		SpinLockRelease(&XLogCtl->info_lck);
-	}
+	/* Publish current flush result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush,
+									LogwrtResult.Flush);
 }
 
 /*
@@ -2569,12 +2565,12 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	prevAsyncXactLSN;
 
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	sleeping = XLogCtl->WalWriterSleeping;
 	prevAsyncXactLSN = XLogCtl->asyncXactLSN;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
 	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogWrtResult();
 
 	/*
 	 * If somebody else already called this function with a more aggressive
@@ -2781,8 +2777,8 @@ XLogFlush(XLogRecPtr record)
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
+		XLogUpdateLocalLogWrtResult();
 
 		/* done already? */
 		if (record <= LogwrtResult.Flush)
@@ -2812,7 +2808,7 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
+		XLogUpdateLocalLogWrtResult();
 		if (record <= LogwrtResult.Flush)
 		{
 			LWLockRelease(WALWriteLock);
@@ -2936,9 +2932,9 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogWrtResult();
 
 	/* back off to last completed page boundary */
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
@@ -3024,7 +3020,7 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	XLogUpdateLocalLogWrtResult();
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -3112,9 +3108,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogWrtResult();
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -4907,6 +4901,9 @@ XLOGShmemInit(void)
 	XLogCtl->InstallXLogFileSegmentActive = false;
 	XLogCtl->WalWriterSleeping = false;
 
+	pg_atomic_init_u64(&XLogCtl->LogwrtResult.Write, InvalidXLogRecPtr);
+	pg_atomic_init_u64(&XLogCtl->LogwrtResult.Flush, InvalidXLogRecPtr);
+
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
@@ -5925,10 +5922,13 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
+	/*
+	 * Update local and shared status. This is OK to do without any locks
+	 * because no other process can be reading or writing WAL yet.
+	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
-
-	XLogCtl->LogwrtResult = LogwrtResult;
-
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
@@ -6367,9 +6367,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogWrtResult();
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -9281,9 +9279,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogWrtResult();
 
 	return LogwrtResult.Write;
 }
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 95ae7845d8..63f60df7bf 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3129,6 +3129,7 @@ XLogRedoAction
 XLogSegNo
 XLogSource
 XLogStats
+XLogwrtAtomic
 XLogwrtResult
 XLogwrtRqst
 XPV
-- 
2.34.1

From 9b490b29103987bb236db351bcbd4e2d4ce23514 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Mon, 4 Mar 2024 14:54:51 +0000
Subject: [PATCH v13 3/3] Add Copy pointer to track data copied to WAL buffers

---
 src/backend/access/transam/xlog.c | 34 ++++++++++++++++++++++++++++++-
 1 file changed, 33 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ffbf93690e..2568d39c11 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -321,6 +321,7 @@ static bool doPageWrites;
 
 typedef struct XLogwrtAtomic
 {
+	pg_atomic_uint64 Copy;		/* last byte + 1 copied to WAL buffers */
 	pg_atomic_uint64 Write;		/* last byte + 1 written out */
 	pg_atomic_uint64 Flush;		/* last byte + 1 flushed */
 } XLogwrtAtomic;
@@ -1497,6 +1498,7 @@ static XLogRecPtr
 WaitXLogInsertionsToFinish(XLogRecPtr upto)
 {
 	uint64		bytepos;
+	XLogRecPtr	copyptr;
 	XLogRecPtr	reservedUpto;
 	XLogRecPtr	finishedUpto;
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1505,6 +1507,11 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	if (MyProc == NULL)
 		elog(PANIC, "cannot wait without a PGPROC structure");
 
+	/* check if there's any work to do */
+	copyptr = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Copy);
+	if (upto <= copyptr)
+		return copyptr;
+
 	/* Read the current insert position */
 	SpinLockAcquire(&Insert->insertpos_lck);
 	bytepos = Insert->CurrBytePos;
@@ -1584,6 +1591,19 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 		if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
 			finishedUpto = insertingat;
 	}
+
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Copy, finishedUpto);
+
+#ifdef USE_ASSERT_CHECKING
+	{
+		XLogRecPtr	Copy = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Copy);
+		XLogRecPtr	Write = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Write);
+		XLogRecPtr	Flush = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Flush);
+
+		Assert(Copy >= Write && Write >= Flush);
+	}
+#endif
+
 	return finishedUpto;
 }
 
@@ -1725,13 +1745,23 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
 {
 	char	   *pdst = dstbuf;
 	XLogRecPtr	recptr = startptr;
+	XLogRecPtr	copyptr;
 	Size		nbytes = count;
 
 	if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
 		return 0;
 
 	Assert(!XLogRecPtrIsInvalid(startptr));
-	Assert(startptr + count <= LogwrtResult.Write);
+
+	/*
+	 * Caller should ensure that the requested data has been copied to WAL
+	 * buffers before we try to read it.
+	 */
+	copyptr = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Copy);
+	if (startptr + count > copyptr)
+		ereport(ERROR,
+				(errmsg("request to read past end of generated WAL; request %X/%X, current position %X/%X",
+						LSN_FORMAT_ARGS(startptr + count), LSN_FORMAT_ARGS(copyptr))));
 
 	/*
 	 * Loop through the buffers without a lock. For each buffer, atomically
@@ -4901,6 +4931,7 @@ XLOGShmemInit(void)
 	XLogCtl->InstallXLogFileSegmentActive = false;
 	XLogCtl->WalWriterSleeping = false;
 
+	pg_atomic_init_u64(&XLogCtl->LogwrtResult.Copy, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->LogwrtResult.Write, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->LogwrtResult.Flush, InvalidXLogRecPtr);
 
@@ -5927,6 +5958,7 @@ StartupXLOG(void)
 	 * because no other process can be reading or writing WAL yet.
 	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Copy, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
-- 
2.34.1
