From 01d0c6e440d344a06475bccbb6d31373957c7ad7 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 27 Apr 2022 10:17:11 +0000
Subject: [PATCH v2] Fix pg_walinspect race against flush LSN.

Instability in the TAP test for pg_walinspect revealed that
pg_get_wal_records_info_till_end_of_wal(x) would try to decode all the
records beginning in the range [x, flush LSN), even though that might
include a partial record at the end of the range.  In that case, an
error would be reported by read_local_xlog_page_no_wait when it tried to
read past the flush LSN.  That caused a test failure only on a BF
animal that had been restarted recently, but could be expected to happen
in the wild quite easily depending on the alignment of various
parameters.

Adjust pg_walinspect to tolerate errors without messages, as a way to
discover the end of the WAL region to decode.

Discussion: https://postgr.es/m/Ymd/e5eeZMNAkrXo%40paquier.xyz
Discussion: https://postgr.es/m/111657.1650910309@sss.pgh.pa.us

Authors: Thomas Munro, Bharath Rupireddy.
---
 contrib/pg_walinspect/pg_walinspect.c  | 64 +++++++++++++-------------
 src/backend/access/transam/xlogutils.c | 12 +++++
 src/include/access/xlogutils.h         |  6 +++
 3 files changed, 51 insertions(+), 31 deletions(-)

diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index bf38863ff1..88b15eb753 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -89,6 +89,7 @@ static XLogReaderState *
 InitXLogReaderState(XLogRecPtr lsn, XLogRecPtr *first_record)
 {
 	XLogReaderState *xlogreader;
+	ReadLocalXLogPageNoWaitPrivate *private_data;
 
 	/*
 	 * Reading WAL below the first page of the first segments isn't allowed.
@@ -100,11 +101,14 @@ InitXLogReaderState(XLogRecPtr lsn, XLogRecPtr *first_record)
 				(errmsg("could not read WAL at LSN %X/%X",
 						LSN_FORMAT_ARGS(lsn))));
 
+	private_data = (ReadLocalXLogPageNoWaitPrivate *)
+						palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
+
 	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
 									XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
 											   .segment_open = &wal_segment_open,
 											   .segment_close = &wal_segment_close),
-									NULL);
+									private_data);
 
 	if (xlogreader == NULL)
 		ereport(ERROR,
@@ -132,7 +136,8 @@ InitXLogReaderState(XLogRecPtr lsn, XLogRecPtr *first_record)
  *
  * We guard against ordinary errors trying to read WAL that hasn't been
  * written yet by limiting end_lsn to the flushed WAL, but that can also
- * encounter errors if the flush pointer falls in the middle of a record.
+ * encounter errors if the flush pointer falls in the middle of a record. In
+ * that case we'll return NULL.
  */
 static XLogRecord *
 ReadNextXLogRecord(XLogReaderState *xlogreader, XLogRecPtr first_record)
@@ -144,16 +149,20 @@ ReadNextXLogRecord(XLogReaderState *xlogreader, XLogRecPtr first_record)
 
 	if (record == NULL)
 	{
+		ReadLocalXLogPageNoWaitPrivate *private_data;
+
+		/* return NULL, if end of WAL is reached */
+		private_data = (ReadLocalXLogPageNoWaitPrivate *)
+							xlogreader->private_data;
+
+		if (private_data->end_of_wal)
+			return NULL;
+
 		if (errormsg)
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read WAL at %X/%X: %s",
 							LSN_FORMAT_ARGS(first_record), errormsg)));
-		else
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read WAL at %X/%X",
-							LSN_FORMAT_ARGS(first_record))));
 	}
 
 	return record;
@@ -246,7 +255,11 @@ pg_get_wal_record_info(PG_FUNCTION_ARGS)
 
 	xlogreader = InitXLogReaderState(lsn, &first_record);
 
-	(void) ReadNextXLogRecord(xlogreader, first_record);
+	if (!ReadNextXLogRecord(xlogreader, first_record))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("could not read WAL at %X/%X",
+						LSN_FORMAT_ARGS(first_record))));
 
 	MemSet(values, 0, sizeof(values));
 	MemSet(nulls, 0, sizeof(nulls));
@@ -254,6 +267,7 @@ pg_get_wal_record_info(PG_FUNCTION_ARGS)
 	GetWALRecordInfo(xlogreader, first_record, values, nulls,
 					 PG_GET_WAL_RECORD_INFO_COLS);
 
+	pfree(xlogreader->private_data);
 	XLogReaderFree(xlogreader);
 
 	tuple = heap_form_tuple(tupdesc, values, nulls);
@@ -327,26 +341,19 @@ GetWALRecordsInfo(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
 	MemSet(values, 0, sizeof(values));
 	MemSet(nulls, 0, sizeof(nulls));
 
-	for (;;)
+	while (ReadNextXLogRecord(xlogreader, first_record) &&
+		   xlogreader->EndRecPtr <= end_lsn)
 	{
-		(void) ReadNextXLogRecord(xlogreader, first_record);
+		GetWALRecordInfo(xlogreader, xlogreader->currRecPtr, values, nulls,
+						 PG_GET_WAL_RECORDS_INFO_COLS);
 
-		if (xlogreader->EndRecPtr <= end_lsn)
-		{
-			GetWALRecordInfo(xlogreader, xlogreader->currRecPtr, values, nulls,
-							 PG_GET_WAL_RECORDS_INFO_COLS);
-
-			tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
-								 values, nulls);
-		}
-
-		/* if we read up to end_lsn, we're done */
-		if (xlogreader->EndRecPtr >= end_lsn)
-			break;
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+							 values, nulls);
 
 		CHECK_FOR_INTERRUPTS();
 	}
 
+	pfree(xlogreader->private_data);
 	XLogReaderFree(xlogreader);
 
 #undef PG_GET_WAL_RECORDS_INFO_COLS
@@ -555,20 +562,15 @@ GetWalStats(FunctionCallInfo fcinfo, XLogRecPtr start_lsn,
 
 	MemSet(&stats, 0, sizeof(stats));
 
-	for (;;)
+	while (ReadNextXLogRecord(xlogreader, first_record) &&
+		   xlogreader->EndRecPtr <= end_lsn)
 	{
-		(void) ReadNextXLogRecord(xlogreader, first_record);
-
-		if (xlogreader->EndRecPtr <= end_lsn)
-			XLogRecStoreStats(&stats, xlogreader);
-
-		/* if we read up to end_lsn, we're done */
-		if (xlogreader->EndRecPtr >= end_lsn)
-			break;
+		XLogRecStoreStats(&stats, xlogreader);
 
 		CHECK_FOR_INTERRUPTS();
 	}
 
+	pfree(xlogreader->private_data);
 	XLogReaderFree(xlogreader);
 
 	MemSet(values, 0, sizeof(values));
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 425702641a..1c52b128fc 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -957,7 +957,19 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr,
 
 			/* If asked, let's not wait for future WAL. */
 			if (!wait_for_wal)
+			{
+				ReadLocalXLogPageNoWaitPrivate *private_data;
+
+				/*
+				 * Inform the caller of read_local_xlog_page_no_wait that the
+				 * end of WAL has been reached so that it can deal with it
+				 * accordingly.
+				 */
+				private_data = (ReadLocalXLogPageNoWaitPrivate *)
+										state->private_data;
+				private_data->end_of_wal = true;
 				break;
+			}
 
 			CHECK_FOR_INTERRUPTS();
 			pg_usleep(1000L);
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 761625acf4..5fcbbc136f 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -75,6 +75,12 @@ typedef enum
 								 * need to be replayed) */
 } XLogRedoAction;
 
+/* Private data for the read_local_xlog_page_no_wait page-read callback. */
+typedef struct ReadLocalXLogPageNoWaitPrivate
+{
+	bool end_of_wal;	/* set to true when the no-wait read hits end of WAL */
+} ReadLocalXLogPageNoWaitPrivate;
+
 extern XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record,
 											uint8 buffer_id, Buffer *buf);
 extern Buffer XLogInitBufferForRedo(XLogReaderState *record, uint8 block_id);
-- 
2.25.1

