I made one final pass over the whole thing to be sure it's all commented
as thoroughly as possible, and changed the names of things to make them
all consistent.  So here's the last version which I intend to push to
all branches soon.  (The only difference in back-branches is that
the xlogreader struct needs to be adjusted to have the new fields at the
bottom.)

One thing to note is that this is an un-downgradeable minor; and of
course people should upgrade standbys before primaries.

On 2021-Sep-17, Alvaro Herrera wrote:

> On 2021-Sep-17, Bossart, Nathan wrote:

> > I see.  IMO feels a bit counterintuitive to validate a partial record
> > that you are ignoring anyway, but I suppose it's still valuable to
> > know when the WAL is badly broken.  It's not expensive, and it doesn't
> > add a ton of complexity, either.
> 
> Yeah, we don't have any WAL record history validation other than the
> verifying the LSN of the previous record; I suppose in this particular
> case you could argue that we shouldn't bother with any validation
> either.  But it seems safer to do it.  It doesn't hurt anything anyway.

BTW we do validate the CRC for all records, but we don't have any means
to validate the CRC of a partial record; so in theory if we don't add
CRC validation of the ignored contents, there would be no validation at
all for that record.  I don't think we care, but it's true that there
would be a big blob in WAL that we don't really know anything about.

-- 
Álvaro Herrera              Valdivia, Chile  —  https://www.EnterpriseDB.com/
Thou shalt study thy libraries and strive not to reinvent them without
cause, that thy code may be short and readable and thy days pleasant
and productive. (7th Commandment for C Programmers)
>From 8893e2073365c88449a118bd340a78241afe8f97 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 21 Sep 2021 12:43:15 -0300
Subject: [PATCH v8 1/3] Document XLOG_INCLUDE_XID a little better
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

I noticed that commit 0bead9af484c left this flag undocumented in
XLogSetRecordFlags, which led me to discover that the flag doesn't
actually do what the one comment on it said it does.  Improve the
situation by adding some more comments.

Backpatch to 14, where the aforementioned commit appears.

Author: Álvaro Herrera <alvhe...@alvh.no-ip.org>
---
 src/backend/access/transam/xloginsert.c | 2 ++
 src/include/access/xlog.h               | 2 +-
 src/include/access/xlogrecord.h         | 5 +++--
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index e596a0470a..e329ae5c2a 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -409,6 +409,8 @@ XLogRegisterBufData(uint8 block_id, char *data, int len)
  * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
  *	 durability, which allows to avoid triggering WAL archiving and other
  *	 background activity.
+ * - XLOG_INCLUDE_XID, a message-passing hack between XLogRecordAssemble
+ *   and XLogResetInsertion.
  */
 void
 XLogSetRecordFlags(uint8 flags)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0a8ede700d..5e2c94a05f 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -212,7 +212,7 @@ extern bool XLOG_DEBUG;
  */
 #define XLOG_INCLUDE_ORIGIN		0x01	/* include the replication origin */
 #define XLOG_MARK_UNIMPORTANT	0x02	/* record not important for durability */
-#define XLOG_INCLUDE_XID		0x04	/* include XID of top-level xact */
+#define XLOG_INCLUDE_XID		0x04	/* WAL-internal message-passing hack */
 
 
 /* Checkpoint statistics */
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index e06ee92a5e..8d1305eae8 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -215,8 +215,9 @@ typedef struct XLogRecordDataHeaderLong
  * Block IDs used to distinguish different kinds of record fragments. Block
  * references are numbered from 0 to XLR_MAX_BLOCK_ID. A rmgr is free to use
  * any ID number in that range (although you should stick to small numbers,
- * because the WAL machinery is optimized for that case). A couple of ID
- * numbers are reserved to denote the "main" data portion of the record.
+ * because the WAL machinery is optimized for that case). A few ID
+ * numbers are reserved to denote the "main" data portion of the record,
+ * as well as replication-supporting transaction metadata.
  *
  * The maximum is currently set at 32, quite arbitrarily. Most records only
  * need a handful of block references, but there are a few exceptions that
-- 
2.20.1

>From 2daa93570c801f21f5abdacc6d8f77bd62bf5d53 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 2 Sep 2021 17:21:46 -0400
Subject: [PATCH v8 2/3] Implement FIRST_IS_ABORTED_CONTRECORD

---
 src/backend/access/rmgrdesc/xlogdesc.c  |  12 ++
 src/backend/access/transam/xlog.c       | 153 +++++++++++++++++++++++-
 src/backend/access/transam/xlogreader.c |  40 ++++++-
 src/include/access/xlog_internal.h      |  11 +-
 src/include/access/xlogreader.h         |   8 ++
 src/include/catalog/pg_control.h        |   1 +
 6 files changed, 218 insertions(+), 7 deletions(-)

diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index e6090a9dad..5bf2346dd9 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -139,6 +139,15 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec.ThisTimeLineID, xlrec.PrevTimeLineID,
 						 timestamptz_to_str(xlrec.end_time));
 	}
+	else if (info == XLOG_OVERWRITE_CONTRECORD)
+	{
+		xl_overwrite_contrecord xlrec;
+
+		memcpy(&xlrec, rec, sizeof(xl_overwrite_contrecord));
+		appendStringInfo(buf, "lsn %X/%X; time %s",
+						 LSN_FORMAT_ARGS(xlrec.overwritten_lsn),
+						 timestamptz_to_str(xlrec.overwrite_time));
+	}
 }
 
 const char *
@@ -178,6 +187,9 @@ xlog_identify(uint8 info)
 		case XLOG_END_OF_RECOVERY:
 			id = "END_OF_RECOVERY";
 			break;
+		case XLOG_OVERWRITE_CONTRECORD:
+			id = "OVERWRITE_CONTRECORD";
+			break;
 		case XLOG_FPI:
 			id = "FPI";
 			break;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e51a7a749d..f5b7090bd8 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -199,6 +199,15 @@ static XLogRecPtr LastRec;
 static XLogRecPtr flushedUpto = 0;
 static TimeLineID receiveTLI = 0;
 
+/*
+ * abortedRecPtr is the start pointer of a broken record at end of WAL when
+ * recovery completes; missingContrecPtr is the location of the first
+ * contrecord that went missing.  See CreateOverwriteContrecordRecord for
+ * details.
+ */
+static XLogRecPtr abortedRecPtr;
+static XLogRecPtr missingContrecPtr;
+
 /*
  * During recovery, lastFullPageWrites keeps track of full_page_writes that
  * the replayed WAL records indicate. It's initialized with full_page_writes
@@ -892,8 +901,11 @@ static void CheckRequiredParameterValues(void);
 static void XLogReportParameters(void);
 static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 								TimeLineID prevTLI);
+static void VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec,
+									  XLogReaderState *state);
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
+static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
@@ -2246,6 +2258,18 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 		if (!Insert->forcePageWrites)
 			NewPage->xlp_info |= XLP_BKP_REMOVABLE;
 
+		/*
+		 * If a record was found to be broken at the end of recovery, and
+		 * we're going to write on the page where its first contrecord was
+		 * lost, set the XLP_FIRST_IS_OVERWRITE_CONTRECORD flag on the page
+		 * header.  See CreateOverwriteContrecordRecord().
+		 */
+		if (missingContrecPtr == NewPageBeginPtr)
+		{
+			NewPage->xlp_info |= XLP_FIRST_IS_OVERWRITE_CONTRECORD;
+			missingContrecPtr = InvalidXLogRecPtr;
+		}
+
 		/*
 		 * If first page of an XLOG segment file, make it a long header.
 		 */
@@ -4394,6 +4418,17 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
 		EndRecPtr = xlogreader->EndRecPtr;
 		if (record == NULL)
 		{
+			/*
+			 * When WAL ends in an incomplete record, keep track of it.  After
+			 * recovery is done, we'll write a record to indicate downstream
+			 * WAL readers that that portion is to be ignored.
+			 */
+			if (!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
+			{
+				abortedRecPtr = xlogreader->abortedRecPtr;
+				missingContrecPtr = xlogreader->missingContrecPtr;
+			}
+
 			if (readFile >= 0)
 			{
 				close(readFile);
@@ -7069,6 +7104,12 @@ StartupXLOG(void)
 		InRecovery = true;
 	}
 
+	/*
+	 * Start recovery assuming that the final record isn't lost.
+	 */
+	abortedRecPtr = InvalidXLogRecPtr;
+	missingContrecPtr = InvalidXLogRecPtr;
+
 	/* REDO */
 	if (InRecovery)
 	{
@@ -7655,8 +7696,9 @@ StartupXLOG(void)
 
 	/*
 	 * Kill WAL receiver, if it's still running, before we continue to write
-	 * the startup checkpoint record. It will trump over the checkpoint and
-	 * subsequent records if it's still alive when we start writing WAL.
+	 * the startup checkpoint and aborted-contrecord records. It will trump
+	 * over these records and subsequent ones if it's still alive when we
+	 * start writing WAL.
 	 */
 	XLogShutdownWalRcv();
 
@@ -7689,8 +7731,12 @@ StartupXLOG(void)
 	StandbyMode = false;
 
 	/*
-	 * Re-fetch the last valid or last applied record, so we can identify the
-	 * exact endpoint of what we consider the valid portion of WAL.
+	 * Determine where to start writing WAL next.
+	 *
+	 * When recovery ended in an incomplete record, write a WAL record about
+	 * that and continue after it.  In all other cases, re-fetch the last
+	 * valid or last applied record, so we can identify the exact endpoint of
+	 * what we consider the valid portion of WAL.
 	 */
 	XLogBeginRead(xlogreader, LastRec);
 	record = ReadRecord(xlogreader, PANIC, false);
@@ -7821,6 +7867,18 @@ StartupXLOG(void)
 	XLogCtl->ThisTimeLineID = ThisTimeLineID;
 	XLogCtl->PrevTimeLineID = PrevTimeLineID;
 
+	/*
+	 * Actually, if WAL ended in an incomplete record, skip the parts that
+	 * made it through and start writing after the portion that persisted.
+	 * (It's critical to first write an OVERWRITE_CONTRECORD message, which
+	 * we'll do as soon as we're open for writing new WAL.)
+	 */
+	if (!XLogRecPtrIsInvalid(missingContrecPtr))
+	{
+		Assert(!XLogRecPtrIsInvalid(abortedRecPtr));
+		EndOfLog = missingContrecPtr;
+	}
+
 	/*
 	 * Prepare to write WAL starting at EndOfLog location, and init xlog
 	 * buffer cache using the block containing the last record from the
@@ -7873,13 +7931,23 @@ StartupXLOG(void)
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
+	LocalSetXLogInsertAllowed();
+
+	/* If necessary, write overwrite-contrecord before doing anything else */
+	if (!XLogRecPtrIsInvalid(abortedRecPtr))
+	{
+		Assert(!XLogRecPtrIsInvalid(missingContrecPtr));
+		CreateOverwriteContrecordRecord(abortedRecPtr);
+		abortedRecPtr = InvalidXLogRecPtr;
+		missingContrecPtr = InvalidXLogRecPtr;
+	}
+
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
 	 * record before resource manager writes cleanup WAL records or checkpoint
 	 * record is written.
 	 */
 	Insert->fullPageWrites = lastFullPageWrites;
-	LocalSetXLogInsertAllowed();
 	UpdateFullPageWrites();
 	LocalXLogInsertAllowed = -1;
 
@@ -9365,6 +9433,54 @@ CreateEndOfRecoveryRecord(void)
 	LocalXLogInsertAllowed = -1;	/* return to "check" state */
 }
 
+/*
+ * Write an OVERWRITE_CONTRECORD message.
+ *
+ * When on WAL replay we expect a continuation record at the start of a page
+ * that is not there, recovery ends and WAL writing resumes at that point.
+ * But it's wrong to resume writing new WAL back at the start of the record
+ * that was broken, because downstream consumers of that WAL (physical
+ * replicas) are not prepared to "rewind".  So the first action after finishing
+ * replay of all valid WAL is to write a record of this type at the point where
+ * the contrecord was missing.  This record type, which also carries a special
+ * flag XLP_FIRST_IS_OVERWRITE_CONTRECORD, is used to let downstream WAL
+ * readers that the incomplete record is to be skipped, and that WAL reading is
+ * to be resumed at this position.  On "replay" of this record, we verify that
+ * the record being skipped is the one that was being read.
+ *
+ * XLP_FIRST_IS_OVERWRITE_CONTRECORD instructs xlogreader to set abortedRecPtr
+ * to the start LSN of the record we were reading, and missingContrecPtr to the
+ * LSN of the contrecord that went missing.  These are used to inform
+ * StartupXLOG about contents of this record and the location where to resume
+ * writing WAL (i.e., where to create this record), respectively.
+ */
+static XLogRecPtr
+CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn)
+{
+	xl_overwrite_contrecord xlrec;
+	XLogRecPtr	recptr;
+
+	/* sanity check */
+	if (!RecoveryInProgress())
+		elog(ERROR, "can only be used at end of recovery");
+
+	xlrec.overwritten_lsn = aborted_lsn;
+	xlrec.overwrite_time = GetCurrentTimestamp();
+
+	START_CRIT_SECTION();
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xl_overwrite_contrecord));
+
+	recptr = XLogInsert(RM_XLOG_ID, XLOG_OVERWRITE_CONTRECORD);
+
+	XLogFlush(recptr);
+
+	END_CRIT_SECTION();
+
+	return recptr;
+}
+
 /*
  * Flush all data in shared memory to disk, and fsync
  *
@@ -10295,6 +10411,13 @@ xlog_redo(XLogReaderState *record)
 
 		RecoveryRestartPoint(&checkPoint);
 	}
+	else if (info == XLOG_OVERWRITE_CONTRECORD)
+	{
+		xl_overwrite_contrecord xlrec;
+
+		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_overwrite_contrecord));
+		VerifyOverwriteContrecord(&xlrec, record);
+	}
 	else if (info == XLOG_END_OF_RECOVERY)
 	{
 		xl_end_of_recovery xlrec;
@@ -10462,6 +10585,26 @@ xlog_redo(XLogReaderState *record)
 	}
 }
 
+/*
+ * Verify the payload of a XLOG_OVERWRITE_CONTRECORD record.
+ */
+static void
+VerifyOverwriteContrecord(xl_overwrite_contrecord *xlrec, XLogReaderState *state)
+{
+	if (xlrec->overwritten_lsn != state->overwrittenRecPtr)
+		elog(FATAL, "mismatching overwritten LSN %X/%X -> %X/%X",
+			 LSN_FORMAT_ARGS(xlrec->overwritten_lsn),
+			 LSN_FORMAT_ARGS(state->overwrittenRecPtr));
+
+	ereport(LOG,
+			(errmsg("sucessfully skipped missing contrecord at %X/%X, overwritten at %s",
+					LSN_FORMAT_ARGS(xlrec->overwritten_lsn),
+					timestamptz_to_str(xlrec->overwrite_time))));
+
+	/* Verifying the record should only happen once */
+	state->overwrittenRecPtr = InvalidXLogRecPtr;
+}
+
 #ifdef WAL_DEBUG
 
 static void
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5cf74e181a..0963837925 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -278,6 +278,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 				total_len;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
+	bool		assembled;
 	bool		gotheader;
 	int			readOff;
 
@@ -293,6 +294,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 	state->errormsg_buf[0] = '\0';
 
 	ResetDecoder(state);
+	state->abortedRecPtr = InvalidXLogRecPtr;
+	state->missingContrecPtr = InvalidXLogRecPtr;
 
 	RecPtr = state->EndRecPtr;
 
@@ -319,7 +322,9 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		randAccess = true;
 	}
 
+restart:
 	state->currRecPtr = RecPtr;
+	assembled = false;
 
 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
 	targetRecOff = RecPtr % XLOG_BLCKSZ;
@@ -415,6 +420,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		char	   *buffer;
 		uint32		gotlen;
 
+		assembled = true;
+
 		/*
 		 * Enlarge readRecordBuf as needed.
 		 */
@@ -448,8 +455,25 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 
 			Assert(SizeOfXLogShortPHD <= readOff);
 
-			/* Check that the continuation on next page looks valid */
 			pageHeader = (XLogPageHeader) state->readBuf;
+
+			/*
+			 * If we were expecting a continuation record and got an
+			 * "overwrite contrecord" flag, that means the continuation record
+			 * was overwritten with a different record.  Restart the read by
+			 * assuming the address to read is the location where we found this
+			 * flag; but keep track of the LSN of the record we were reading,
+			 * for later verification.
+			 */
+			if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
+			{
+				state->overwrittenRecPtr = state->currRecPtr;
+				ResetDecoder(state);
+				RecPtr = targetPagePtr;
+				goto restart;
+			}
+
+			/* Check that the continuation on next page looks valid */
 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
 			{
 				report_invalid_record(state,
@@ -551,6 +575,20 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		return NULL;
 
 err:
+	if (assembled)
+	{
+		/*
+		 * We get here when a record that spans multiple pages needs to be
+		 * assembled, but something went wrong -- perhaps a contrecord piece
+		 * was lost.  If caller is WAL replay, it will know where the aborted
+		 * record was and where to direct followup WAL to be written, marking
+		 * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
+		 * in turn signal downstream WAL consumers that the broken WAL record
+		 * is to be ignored.
+		 */
+		state->abortedRecPtr = RecPtr;
+		state->missingContrecPtr = targetPagePtr;
+	}
 
 	/*
 	 * Invalidate the read state. We might read from a different source after
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 3b5eceff65..406562780b 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -76,8 +76,10 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
 #define XLP_LONG_HEADER				0x0002
 /* This flag indicates backup blocks starting in this page are optional */
 #define XLP_BKP_REMOVABLE			0x0004
+/* Replaces a missing contrecord; see CreateOverwriteContrecordRecord */
+#define XLP_FIRST_IS_OVERWRITE_CONTRECORD 0x0008
 /* All defined flag bits in xlp_info (used for validity checking of header) */
-#define XLP_ALL_FLAGS				0x0007
+#define XLP_ALL_FLAGS				0x000F
 
 #define XLogPageHeaderSize(hdr)		\
 	(((hdr)->xlp_info & XLP_LONG_HEADER) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD)
@@ -249,6 +251,13 @@ typedef struct xl_restore_point
 	char		rp_name[MAXFNAMELEN];
 } xl_restore_point;
 
+/* Overwrite of prior contrecord */
+typedef struct xl_overwrite_contrecord
+{
+	XLogRecPtr	overwritten_lsn;
+	TimestampTz	overwrite_time;
+} xl_overwrite_contrecord;
+
 /* End of recovery mark, when we don't do an END_OF_RECOVERY checkpoint */
 typedef struct xl_end_of_recovery
 {
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 21d200d3df..98624bdc37 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -175,6 +175,14 @@ struct XLogReaderState
 	XLogRecPtr	ReadRecPtr;		/* start of last record read */
 	XLogRecPtr	EndRecPtr;		/* end+1 of last record read */
 
+	/*
+	 * The start point of a partial record at the end of WAL, and the start
+	 * location of its first contrecord that went missing.
+	 */
+	XLogRecPtr	abortedRecPtr;
+	XLogRecPtr	missingContrecPtr;
+	/* Set when XLP_FIRST_IS_OVERWRITE_CONTRECORD is found */
+	XLogRecPtr	overwrittenRecPtr;
 
 	/* ----------------------------------------
 	 * Decoded representation of current record
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index e3f48158ce..179efa9f4a 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -76,6 +76,7 @@ typedef struct CheckPoint
 #define XLOG_END_OF_RECOVERY			0x90
 #define XLOG_FPI_FOR_HINT				0xA0
 #define XLOG_FPI						0xB0
+#define XLOG_OVERWRITE_CONTRECORD		0xC0
 
 
 /*
-- 
2.20.1

>From b49511492818167eee5cdfbd9ba127d7e771edee Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 15 Sep 2021 13:10:04 -0300
Subject: [PATCH v8 3/3] test code

---
 src/test/recovery/t/026_aborted_contrecord.pl | 182 ++++++++++++++++++
 src/test/recovery/t/idiosyncratic_copy        |  20 ++
 2 files changed, 202 insertions(+)
 create mode 100644 src/test/recovery/t/026_aborted_contrecord.pl
 create mode 100755 src/test/recovery/t/idiosyncratic_copy

diff --git a/src/test/recovery/t/026_aborted_contrecord.pl b/src/test/recovery/t/026_aborted_contrecord.pl
new file mode 100644
index 0000000000..e1fc9cb38f
--- /dev/null
+++ b/src/test/recovery/t/026_aborted_contrecord.pl
@@ -0,0 +1,182 @@
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for already-propagated WAL segments ending in incomplete WAL records.
+
+use strict;
+use warnings;
+
+use FindBin;
+use PostgresNode;
+use TestLib;
+use Test::More;
+
+plan tests => 5;
+
+# Test: Create a physical replica that's missing the last WAL file,
+# then restart the primary to create a divergent WAL file and observe
+# that the replica replays the "overwrite contrecord" from that new
+# file.
+
+my $node = PostgresNode->new('primary');
+$node->init(allows_streaming => 1);
+$node->append_conf('postgresql.conf', 'wal_keep_size=1GB');
+$node->start;
+
+$node->safe_psql('postgres', 'create table filler (a int)');
+# First, measure how many bytes does the insertion of 1000 rows produce
+my $start_lsn = $node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
+$node->safe_psql('postgres', 'insert into filler select * from generate_series(1, 1000)');
+my $end_lsn = $node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
+my $rows_walsize = $end_lsn - $start_lsn;
+
+# Now consume all remaining room in the current WAL segment, leaving
+# space enough only for the start of a largish record.
+$node->safe_psql('postgres', qq{
+WITH setting AS (
+  SELECT setting::int AS wal_segsize
+    FROM pg_settings WHERE name = 'wal_segment_size'
+)
+INSERT INTO filler
+SELECT g FROM setting,
+  generate_series(1, 1000 * (wal_segsize - ((pg_current_wal_insert_lsn() - '0/0') % wal_segsize)) / $rows_walsize) g
+});
+
+my $initfile = $node->safe_psql('postgres', 'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
+$node->safe_psql('postgres', qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))});
+#$node->safe_psql('postgres', qq{create table foo ()});
+my $endfile = $node->safe_psql('postgres', 'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
+ok($initfile != $endfile, "$initfile differs from $endfile");
+
+# Now stop abruptly, to avoid a stop checkpoint.  We can remove the tail file
+# afterwards, and on startup the large message should be overwritten with new
+# contents
+$node->stop('immediate');
+
+unlink $node->basedir . "/pgdata/pg_wal/$endfile" or die "could not unlink ".$node->basedir."/pgdata/pg_wal/$endfile: $!";
+
+# OK, create a standby at this spot.
+$node->backup_fs_cold('backup');
+my $node_standby = PostgresNode->new('standby');
+$node_standby->init_from_backup($node, 'backup', has_streaming => 1);
+
+$node_standby->start;
+$node->start;
+
+$node->safe_psql('postgres', qq{create table foo (a text); insert into foo values ('hello')});
+$node->safe_psql('postgres', qq{SELECT pg_logical_emit_message(true, 'test 026', 'AABBCC')});
+
+my $until_lsn = $node->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $caughtup_query = "SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
+$node_standby->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for standby to catch up";
+
+ok($node_standby->safe_psql('postgres', 'select * from foo') eq
+	'hello', 'standby replays past overwritten contrecord');
+
+# Verify message appears in standby's log
+my $log = slurp_file($node_standby->logfile);
+like($log, qr[sucessfully skipped missing contrecord at], "found log line in standby");
+
+$node->stop;
+$node_standby->stop;
+
+
+# Second test: a standby that receives WAL via archive/restore commands;
+# use idiosyncratic_copy as archive_command, to let us block archiving an
+# intermediate WAL file.
+$node = PostgresNode->new('primary2');
+$node->init(has_archiving => 1,
+	extra => ['--wal-segsize=1']);
+
+# Note: consistent use of forward slashes here avoids any escaping problems
+# that arise from use of backslashes. That means we need to double-quote all
+# the paths in the archive_command
+my $perlbin = TestLib::perl2host($^X);
+$perlbin =~ s!\\!/!g if $TestLib::windows_os;
+my $archivedir = $node->archive_dir;
+$archivedir =~ s!\\!/!g if $TestLib::windows_os;
+$node->append_conf('postgresql.conf',
+	qq{
+archive_command = '"$perlbin" "$FindBin::RealBin/idiosyncratic_copy" "%p" "$archivedir/%f"'
+wal_keep_size = 1GB
+max_wal_senders = 2
+wal_level = replica
+});
+# Make sure that Msys perl doesn't complain about difficulty in setting locale
+# when called from the archive_command.
+local $ENV{PERL_BADLANG} = 0;
+$node->start;
+$node->backup('backup');
+
+$node_standby = PostgresNode->new('standby2');
+$node_standby->init_from_backup($node, 'backup', has_restoring => 1);
+
+$node_standby->start;
+
+$node->safe_psql('postgres', 'create table filler (a int)');
+# First, measure how many bytes does the insertion of 1000 rows produce
+$start_lsn = $node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
+$node->safe_psql('postgres', 'insert into filler select * from generate_series(1, 1000)');
+$end_lsn = $node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
+$rows_walsize = $end_lsn - $start_lsn;
+
+# Now consume all remaining room in the current WAL segment, leaving
+# space enough only for the start of a largish record.
+$node->safe_psql('postgres', qq{
+WITH setting AS (
+  SELECT setting::int AS wal_segsize
+    FROM pg_settings WHERE name = 'wal_segment_size'
+)
+INSERT INTO filler
+SELECT g FROM setting,
+  generate_series(1, 1000 * (wal_segsize - ((pg_current_wal_insert_lsn() - '0/0') % wal_segsize)) / $rows_walsize) g
+});
+
+# Now block idiosyncratic_copy from creating the next WAL in the replica
+my $archivedgood = $node->safe_psql('postgres',
+	q{SELECT pg_walfile_name(pg_current_wal_insert_lsn())});
+my $archivedfail = $node->safe_psql('postgres',
+	q{SELECT pg_walfile_name(pg_current_wal_insert_lsn() + setting::integer)
+	from pg_settings where name = 'wal_segment_size'});
+open my $filefail, ">", "$archivedir/$archivedfail.fail"
+	or die "can't open $archivedir/$archivedfail.fail: $!";
+
+my $currlsn = $node->safe_psql('postgres', 'select pg_current_wal_insert_lsn() - 1000');
+
+# Now produce a large WAL record in a transaction that we leave open
+my ($in, $out);
+my $timer = IPC::Run::timeout(180);
+my $h = $node->background_psql('postgres', \$in, \$out, $timer,
+	on_error_stop => 0);
+
+$in .= qq{BEGIN;
+SELECT pg_logical_emit_message(true, 'test 026', repeat('somenoise', 8192));
+};
+$h->pump_nb;
+$node->poll_query_until('postgres',
+	"SELECT last_archived_wal FROM pg_stat_archiver",
+	$archivedgood),
+	or die "Timed out while waiting for segment $archivedgood to be archived";
+
+# Now crash the node with the transaction open
+$node->stop('immediate');
+#$h->finish();
+$node->start;
+$node->safe_psql('postgres', 'create table witness (a int);');
+$node->safe_psql('postgres', 'insert into witness values (42)');
+unlink "$archivedir/$archivedfail.fail" or die "can't unlink $archivedir/$archivedfail.fail: $!";
+$node->safe_psql('postgres', 'select pg_switch_wal()');
+
+$until_lsn = $node->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+$caughtup_query = "SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
+$node_standby->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for standby to catch up";
+
+my $answer = $node_standby->safe_psql('postgres', 'select * from witness');
+is($answer, '42', 'witness tuple appears');
+
+# Verify message appears in standby's log
+$log = slurp_file($node_standby->logfile);
+like($log, qr[sucessfully skipped missing contrecord at], "found log line in standby");
+$node->stop;
+$node_standby->stop;
diff --git a/src/test/recovery/t/idiosyncratic_copy b/src/test/recovery/t/idiosyncratic_copy
new file mode 100755
index 0000000000..f05b1cb943
--- /dev/null
+++ b/src/test/recovery/t/idiosyncratic_copy
@@ -0,0 +1,20 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use File::Copy;
+
+die "wrong number of arguments" if @ARGV != 2;
+my ($source, $target) = @ARGV;
+if ($^O eq 'msys')
+{
+    # make a windows path look like an msys path if necessary
+    $source =~ s!^([A-Za-z]):!'/' . lc($1)!e;
+    $source =~ s!\\!/!g;
+}
+
+die "$0: failed copy of $target" if -f "$target.fail";
+
+copy($source, $target) or die "couldn't copy $source to $target: $!";
+print STDERR "$0: archived $source to $target successfully\n";
-- 
2.20.1

Reply via email to