OK, this version is much more palatable, because here we verify that the
OVERWRITE_CONTRECORD we replay matches the record that was lost.  Also,
I wrote a test script that creates such a broken record (by the simple
expedient of deleting the WAL file containing the second half while the
server is down); we then create a standby and we can observe that it
replays the sequence correctly.

If you have some time to try your reproducers with this new proposed
fix, I would appreciate it.


Added Matsumura-san to CC, because he was interested in this topic too
per the earlier thread.

-- 
Álvaro Herrera         PostgreSQL Developer  —  https://www.EnterpriseDB.com/
>From 0404d47ca1456771a1be4c8afb2c1bd1e0fa0f08 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 2 Sep 2021 17:21:46 -0400
Subject: [PATCH v7 1/2] Implement FIRST_IS_ABORTED_CONTRECORD

---
 src/backend/access/rmgrdesc/xlogdesc.c  |  11 ++
 src/backend/access/transam/xlog.c       | 153 ++++++++++++++++++++++--
 src/backend/access/transam/xloginsert.c |   1 +
 src/backend/access/transam/xlogreader.c |  54 ++++++++-
 src/include/access/xlog_internal.h      |  20 +++-
 src/include/access/xlogreader.h         |  14 +++
 src/include/catalog/pg_control.h        |   1 +
 7 files changed, 244 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index e6090a9dad..0be382f8a5 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -139,6 +139,14 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec.ThisTimeLineID, xlrec.PrevTimeLineID,
 						 timestamptz_to_str(xlrec.end_time));
 	}
+	else if (info == XLOG_OVERWRITE_CONTRECORD)
+	{
+		xl_overwrite_contrecord xlrec;
+
+		memcpy(&xlrec, rec, sizeof(xl_overwrite_contrecord));
+		appendStringInfo(buf, "lsn %X/%X",
+						 LSN_FORMAT_ARGS(xlrec.overwritten_lsn));
+	}
 }
 
 const char *
@@ -178,6 +186,9 @@ xlog_identify(uint8 info)
 		case XLOG_END_OF_RECOVERY:
 			id = "END_OF_RECOVERY";
 			break;
+		case XLOG_OVERWRITE_CONTRECORD:
+			id = "OVERWRITE_CONTRECORD";
+			break;
 		case XLOG_FPI:
 			id = "FPI";
 			break;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e51a7a749d..b6af3eb258 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -199,6 +199,15 @@ static XLogRecPtr LastRec;
 static XLogRecPtr flushedUpto = 0;
 static TimeLineID receiveTLI = 0;
 
+/*
+ * abortedRecPtr is the start pointer of a broken record at end of WAL when
+ * recovery completes; missingContrecPtr is the location of the first
+ * contrecord that went missing.  See CreateOverwriteContrecordRecord for
+ * details.
+ */
+static XLogRecPtr abortedRecPtr;
+static XLogRecPtr missingContrecPtr;
+
 /*
  * During recovery, lastFullPageWrites keeps track of full_page_writes that
  * the replayed WAL records indicate. It's initialized with full_page_writes
@@ -894,6 +903,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 								TimeLineID prevTLI);
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
+static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
@@ -927,6 +937,7 @@ static XLogRecord *ReadRecord(XLogReaderState *xlogreader,
 static void CheckRecoveryConsistency(void);
 static XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader,
 										XLogRecPtr RecPtr, int whichChkpt, bool report);
+static void VerifyOverwrittenRecord(XLogReaderState *record);
 static bool rescanLatestTimeLine(void);
 static void InitControlFile(uint64 sysidentifier);
 static void WriteControlFile(void);
@@ -1120,8 +1131,8 @@ XLogInsertRecord(XLogRecData *rdata,
 		 * All the record data, including the header, is now ready to be
 		 * inserted. Copy the record in the space reserved.
 		 */
-		CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
-							StartPos, EndPos);
+		CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch,
+							rdata, StartPos, EndPos);
 
 		/*
 		 * Unless record is flagged as not important, update LSN of last
@@ -1504,7 +1515,8 @@ checkXLogConsistency(XLogReaderState *record)
  * area in the WAL.
  */
 static void
-CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
+CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+					XLogRecData *rdata,
 					XLogRecPtr StartPos, XLogRecPtr EndPos)
 {
 	char	   *currpos;
@@ -2246,6 +2258,19 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 		if (!Insert->forcePageWrites)
 			NewPage->xlp_info |= XLP_BKP_REMOVABLE;
 
+		/*
+		 * If a record was found to be broken at the end of recovery, and
+		 * we're going to write on the page where its first contrecord was
+		 * lost, set the XLP_FIRST_IS_ABORTED_PARTIAL flag on the page header.
+		 * This instructs readers to skip the broken record that precedes
+		 * this.
+		 */
+		if (missingContrecPtr == NewPageBeginPtr)
+		{
+			NewPage->xlp_info |= XLP_FIRST_IS_ABORTED_PARTIAL;
+			missingContrecPtr = InvalidXLogRecPtr;
+		}
+
 		/*
 		 * If first page of an XLOG segment file, make it a long header.
 		 */
@@ -4394,6 +4419,17 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
 		EndRecPtr = xlogreader->EndRecPtr;
 		if (record == NULL)
 		{
+			/*
+			 * When WAL ends in an incomplete record, keep track of it.  After
+			 * recovery is done, we'll write a record to indicate downstream
+			 * WAL readers that that portion is to be ignored.
+			 */
+			if (!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
+			{
+				abortedRecPtr = xlogreader->abortedRecPtr;
+				missingContrecPtr = xlogreader->missingContrecPtr;
+			}
+
 			if (readFile >= 0)
 			{
 				close(readFile);
@@ -4516,6 +4552,36 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
 	}
 }
 
+/*
+ * When we find an XLOG_OVERWRITE_CONTRECORD record, verify that it matches
+ * what xlogreader says it had read prior to throwing it all away.
+ */
+static void
+VerifyOverwrittenRecord(XLogReaderState *record)
+{
+	xl_overwrite_contrecord xlrec;
+
+	if (XLogRecGetRmid(record) != RM_XLOG_ID)
+		elog(FATAL, "not XLOG rmgr");
+
+	if ((XLogRecGetInfo(record) & ~XLR_INFO_MASK) != XLOG_OVERWRITE_CONTRECORD)
+		elog(FATAL, "not OVERWRITE_CONTRECORD");
+
+	memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_overwrite_contrecord));
+
+	if (xlrec.overwritten_lsn != record->overwrittenRecPtr)
+		elog(FATAL, "mismatching overwritten LSN %X/%X -> %X/%X",
+			 LSN_FORMAT_ARGS(xlrec.overwritten_lsn),
+			 LSN_FORMAT_ARGS(record->overwrittenRecPtr));
+
+	ereport(LOG,
+			(errmsg("sucessfully skipped missing contrecord at %X/%X",
+					LSN_FORMAT_ARGS(xlrec.overwritten_lsn))));
+
+	/* Verifying the record should only happen once */
+	record->overwrittenRecPtr = InvalidXLogRecPtr;
+}
+
 /*
  * Scan for new timelines that might have appeared in the archive since we
  * started recovery.
@@ -7069,6 +7135,12 @@ StartupXLOG(void)
 		InRecovery = true;
 	}
 
+	/*
+	 * Start recovery assuming that the final record isn't lost.
+	 */
+	abortedRecPtr = InvalidXLogRecPtr;
+	missingContrecPtr = InvalidXLogRecPtr;
+
 	/* REDO */
 	if (InRecovery)
 	{
@@ -7655,8 +7727,9 @@ StartupXLOG(void)
 
 	/*
 	 * Kill WAL receiver, if it's still running, before we continue to write
-	 * the startup checkpoint record. It will trump over the checkpoint and
-	 * subsequent records if it's still alive when we start writing WAL.
+	 * the startup checkpoint and aborted-contrecord records. It will trump
+	 * over these records and subsequent ones if it's still alive when we
+	 * start writing WAL.
 	 */
 	XLogShutdownWalRcv();
 
@@ -7689,8 +7762,12 @@ StartupXLOG(void)
 	StandbyMode = false;
 
 	/*
-	 * Re-fetch the last valid or last applied record, so we can identify the
-	 * exact endpoint of what we consider the valid portion of WAL.
+	 * Determine where to start writing WAL next.
+	 *
+	 * When recovery ended in an incomplete record, write a WAL record about
+	 * that and continue after it.  In all other cases, re-fetch the last
+	 * valid or last applied record, so we can identify the exact endpoint of
+	 * what we consider the valid portion of WAL.
 	 */
 	XLogBeginRead(xlogreader, LastRec);
 	record = ReadRecord(xlogreader, PANIC, false);
@@ -7821,6 +7898,18 @@ StartupXLOG(void)
 	XLogCtl->ThisTimeLineID = ThisTimeLineID;
 	XLogCtl->PrevTimeLineID = PrevTimeLineID;
 
+	/*
+	 * Actually, if WAL ended in an incomplete record, skip the parts that
+	 * made it through and start writing after the portion that persisted.
+	 * (It's critical to first write an OVERWRITE_CONTRECORD message, which
+	 * we'll do as soon as we're open for writing new WAL.)
+	 */
+	if (!XLogRecPtrIsInvalid(missingContrecPtr))
+	{
+		Assert(!XLogRecPtrIsInvalid(abortedRecPtr));
+		EndOfLog = missingContrecPtr;
+	}
+
 	/*
 	 * Prepare to write WAL starting at EndOfLog location, and init xlog
 	 * buffer cache using the block containing the last record from the
@@ -7873,13 +7962,23 @@ StartupXLOG(void)
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
+	LocalSetXLogInsertAllowed();
+
+	/* If necessary, write overwrite-contrecord before doing anything else */
+	if (!XLogRecPtrIsInvalid(abortedRecPtr))
+	{
+		Assert(!XLogRecPtrIsInvalid(missingContrecPtr));
+		CreateOverwriteContrecordRecord(abortedRecPtr);
+		abortedRecPtr = InvalidXLogRecPtr;
+		missingContrecPtr = InvalidXLogRecPtr;
+	}
+
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
 	 * record before resource manager writes cleanup WAL records or checkpoint
 	 * record is written.
 	 */
 	Insert->fullPageWrites = lastFullPageWrites;
-	LocalSetXLogInsertAllowed();
 	UpdateFullPageWrites();
 	LocalXLogInsertAllowed = -1;
 
@@ -9365,6 +9464,40 @@ CreateEndOfRecoveryRecord(void)
 	LocalXLogInsertAllowed = -1;	/* return to "check" state */
 }
 
+/*
+ * Write an OVERWRITE_CONTRECORD message, which is used after recovery is
+ * complete to mark the spot where a partial message was lost.  We must not
+ * overwrite the preserved portions of the partial message, because
+ * downstream WAL consumers (physical replicas) may have already received
+ * those parts and would not retreat to overwrite them.
+ */
+static XLogRecPtr
+CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn)
+{
+	xl_overwrite_contrecord xlrec;
+	XLogRecPtr	recptr;
+
+	/* sanity check */
+	if (!RecoveryInProgress())
+		elog(ERROR, "can only be used at end of recovery");
+
+	xlrec.overwritten_lsn = aborted_lsn;
+	/* XXX include the CRC-so-far of the old message? */
+
+	START_CRIT_SECTION();
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xl_overwrite_contrecord));
+
+	recptr = XLogInsert(RM_XLOG_ID, XLOG_OVERWRITE_CONTRECORD);
+
+	XLogFlush(recptr);
+
+	END_CRIT_SECTION();
+
+	return recptr;
+}
+
 /*
  * Flush all data in shared memory to disk, and fsync
  *
@@ -10295,6 +10428,10 @@ xlog_redo(XLogReaderState *record)
 
 		RecoveryRestartPoint(&checkPoint);
 	}
+	else if (info == XLOG_OVERWRITE_CONTRECORD)
+	{
+		VerifyOverwrittenRecord(record);
+	}
 	else if (info == XLOG_END_OF_RECOVERY)
 	{
 		xl_end_of_recovery xlrec;
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index e596a0470a..a33cb23ac0 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -409,6 +409,7 @@ XLogRegisterBufData(uint8 block_id, char *data, int len)
  * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
  *	 durability, which allows to avoid triggering WAL archiving and other
  *	 background activity.
+ * - XLOG_INCLUDE_XID, ???
  */
 void
 XLogSetRecordFlags(uint8 flags)
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5cf74e181a..b72a8bf9f6 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -39,6 +39,7 @@
 static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
 			pg_attribute_printf(2, 3);
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
+static void RememberOverwrittenRecord(XLogReaderState *state);
 static int	ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
 							 int reqLen);
 static void XLogReaderInvalReadState(XLogReaderState *state);
@@ -278,6 +279,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 				total_len;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
+	bool		assembled;
 	bool		gotheader;
 	int			readOff;
 
@@ -293,6 +295,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 	state->errormsg_buf[0] = '\0';
 
 	ResetDecoder(state);
+	state->abortedRecPtr = InvalidXLogRecPtr;
+	state->missingContrecPtr = InvalidXLogRecPtr;
 
 	RecPtr = state->EndRecPtr;
 
@@ -319,7 +323,9 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		randAccess = true;
 	}
 
+restart:
 	state->currRecPtr = RecPtr;
+	assembled = false;
 
 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
 	targetRecOff = RecPtr % XLOG_BLCKSZ;
@@ -415,6 +421,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		char	   *buffer;
 		uint32		gotlen;
 
+		assembled = true;
+
 		/*
 		 * Enlarge readRecordBuf as needed.
 		 */
@@ -448,8 +456,24 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 
 			Assert(SizeOfXLogShortPHD <= readOff);
 
-			/* Check that the continuation on next page looks valid */
 			pageHeader = (XLogPageHeader) state->readBuf;
+
+			/*
+			 * If we were expecting a continuation record and got an "aborted
+			 * partial" flag, that means the continuation record was lost.
+			 * Ignore the record we were reading, since we now know it's broken
+			 * and lost forever, and restart the read by assuming the address
+			 * to read is the location where we found this flag.
+			 */
+			if (pageHeader->xlp_info & XLP_FIRST_IS_ABORTED_PARTIAL)
+			{
+				RememberOverwrittenRecord(state);
+				ResetDecoder(state);
+				RecPtr = targetPagePtr;
+				goto restart;
+			}
+
+			/* Check that the continuation on next page looks valid */
 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
 			{
 				report_invalid_record(state,
@@ -551,6 +575,20 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		return NULL;
 
 err:
+	if (assembled)
+	{
+		/*
+		 * We get here when a record that spans multiple pages needs to be
+		 * assembled, but something went wrong -- perhaps a contrecord piece
+		 * was lost.  If caller is WAL replay, it will know where the aborted
+		 * record was and where to direct followup WAL to be written, marking
+		 * the next piece with XLP_FIRST_IS_ABORTED_PARTIAL, which will in
+		 * turn signal downstream WAL consumers that the broken WAL record is
+		 * to be ignored.
+		 */
+		state->abortedRecPtr = RecPtr;
+		state->missingContrecPtr = targetPagePtr;
+	}
 
 	/*
 	 * Invalidate the read state. We might read from a different source after
@@ -564,6 +602,20 @@ err:
 	return NULL;
 }
 
+/*
+ * When we find XLP_FIRST_IS_ABORTED_PARTIAL and stop reading a record that
+ * was incompletely reading, keep track of what it is that we're aborting
+ * before throwing away the data we already read.  This allows somebody else to
+ * verify that the next record we're going to find (which should be a
+ * XLOG_OVERWRITE_CONTRECORD record) matches what we throw away.
+ */
+static void
+RememberOverwrittenRecord(XLogReaderState *state)
+{
+	state->overwrittenRecPtr = state->currRecPtr;
+	/* XXX more? could store CRC-so-far, for example */
+}
+
 /*
  * Read a single xlog page including at least [pageptr, reqLen] of valid data
  * via the page_read() callback.
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 3b5eceff65..0fb44ba37d 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -76,8 +76,20 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
 #define XLP_LONG_HEADER				0x0002
 /* This flag indicates backup blocks starting in this page are optional */
 #define XLP_BKP_REMOVABLE			0x0004
+/*
+ * This flag marks a record that replaces a missing contrecord.
+ * When on WAL replay we expect a continuation record at the start of
+ * a page that is not there, recovery ends but the checkpoint record
+ * that follows is marked with this flag, which indicates WAL readers
+ * that the incomplete record is to be skipped, and that WAL reading
+ * is to be resumed here.  This is useful for downstream consumers of
+ * WAL which have already received (the first half of) the original
+ * broken WAL record, such as via archive_command or physical streaming
+ * replication, which we cannot "rewind".
+ */
+#define XLP_FIRST_IS_ABORTED_PARTIAL 0x0008
 /* All defined flag bits in xlp_info (used for validity checking of header) */
-#define XLP_ALL_FLAGS				0x0007
+#define XLP_ALL_FLAGS				0x000F
 
 #define XLogPageHeaderSize(hdr)		\
 	(((hdr)->xlp_info & XLP_LONG_HEADER) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD)
@@ -249,6 +261,12 @@ typedef struct xl_restore_point
 	char		rp_name[MAXFNAMELEN];
 } xl_restore_point;
 
+/* Overwrite of prior contrecord */
+typedef struct xl_overwrite_contrecord
+{
+	XLogRecPtr	overwritten_lsn;
+} xl_overwrite_contrecord;
+
 /* End of recovery mark, when we don't do an END_OF_RECOVERY checkpoint */
 typedef struct xl_end_of_recovery
 {
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 21d200d3df..f75fcf9005 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -144,6 +144,12 @@ typedef struct
 	uint16		data_bufsz;
 } DecodedBkpBlock;
 
+/* Data about a record that was found incomplete */
+typedef struct
+{
+	XLogRecPtr	lsn;
+} XLogOverwrittenRecord;
+
 struct XLogReaderState
 {
 	/*
@@ -175,6 +181,14 @@ struct XLogReaderState
 	XLogRecPtr	ReadRecPtr;		/* start of last record read */
 	XLogRecPtr	EndRecPtr;		/* end+1 of last record read */
 
+	/*
+	 * The start point of a partial record at the end of WAL, and the start
+	 * location of its first contrecord that went missing.
+	 */
+	XLogRecPtr	abortedRecPtr;
+	XLogRecPtr	missingContrecPtr;
+	/* Set when XLP_FIRST_IS_ABORTED_PARTIAL is found */
+	XLogRecPtr	overwrittenRecPtr;
 
 	/* ----------------------------------------
 	 * Decoded representation of current record
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index e3f48158ce..179efa9f4a 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -76,6 +76,7 @@ typedef struct CheckPoint
 #define XLOG_END_OF_RECOVERY			0x90
 #define XLOG_FPI_FOR_HINT				0xA0
 #define XLOG_FPI						0xB0
+#define XLOG_OVERWRITE_CONTRECORD		0xC0
 
 
 /*
-- 
2.20.1

>From 86e4e9c85e2916f11ffb287c566d23652e7bd285 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Wed, 15 Sep 2021 13:10:04 -0300
Subject: [PATCH v7 2/2] test code

---
 src/test/recovery/t/026_aborted_contrecord.pl | 73 +++++++++++++++++++
 1 file changed, 73 insertions(+)
 create mode 100644 src/test/recovery/t/026_aborted_contrecord.pl

diff --git a/src/test/recovery/t/026_aborted_contrecord.pl b/src/test/recovery/t/026_aborted_contrecord.pl
new file mode 100644
index 0000000000..e13f324ae6
--- /dev/null
+++ b/src/test/recovery/t/026_aborted_contrecord.pl
@@ -0,0 +1,73 @@
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test that a WAL record that is partially archived can be recovered from.
+
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More;
+
+plan tests => 3;
+
+my $node = PostgresNode->new('primary');
+$node->init(allows_streaming => 1);
+$node->append_conf('postgresql.conf', 'wal_keep_size=1GB');
+$node->append_conf('postgresql.conf', 'max_locks_per_transaction=1000');
+$node->start;
+
+$node->safe_psql('postgres', 'create table filler (a int)');
+# First, measure how many bytes does the insertion of 1000 rows produce
+my $start_lsn = $node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
+$node->safe_psql('postgres', 'insert into filler select * from generate_series(1, 1000)');
+my $end_lsn = $node->safe_psql('postgres', q{select pg_current_wal_insert_lsn() - '0/0'});
+my $rows_walsize = $end_lsn - $start_lsn;
+
+# Now consume all remaining room in the current WAL segment, leaving
+# space enough only for the start of a largish record.
+$node->safe_psql('postgres', qq{
+WITH setting AS (
+  SELECT setting::int AS wal_segsize
+    FROM pg_settings WHERE name = 'wal_segment_size'
+)
+INSERT INTO filler
+SELECT g FROM setting,
+  generate_series(1, 1000 * (wal_segsize - ((pg_current_wal_insert_lsn() - '0/0') % wal_segsize)) / $rows_walsize) g
+});
+
+my $initfile = $node->safe_psql('postgres', 'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
+$node->safe_psql('postgres', qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))});
+#$node->safe_psql('postgres', qq{create table foo ()});
+my $endfile = $node->safe_psql('postgres', 'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
+ok($initfile != $endfile, "$initfile differs from $endfile");
+
+# Now stop abruptly, to avoid a stop checkpoint.  We can remove the tail file
+# afterwards, and on startup the large message should be overwritten with new
+# contents
+$node->stop('immediate');
+
+unlink $node->basedir . "/pgdata/pg_wal/$endfile" or die "could not unlink ".$node->basedir."/pgdata/pg_wal/$endfile: $!";
+
+# OK, create a standby at this spot.
+$node->backup_fs_cold('backup');
+my $node_standby = PostgresNode->new('standby');
+$node_standby->init_from_backup($node, 'backup', has_streaming => 1);
+
+$node_standby->start;
+$node->start;
+
+$node->safe_psql('postgres', qq{create table foo (a text); insert into foo values ('hello')});
+$node->safe_psql('postgres', qq{SELECT pg_logical_emit_message(true, 'test 026', 'AABBCC')});
+
+my $until_lsn = $node->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
+my $caughtup_query = "SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
+$node_standby->poll_query_until('postgres', $caughtup_query)
+	or die "Timed out while waiting for standby to catch up";
+
+ok($node_standby->safe_psql('postgres', 'select * from foo') eq
+	'hello', 'standby replays past overwritten contrecord');
+
+# Verify message appears in standby's log
+my $log = slurp_file($node_standby->logfile);
+like($log, qr[sucessfully skipped missing contrecord at], "found log line in standby");
-- 
2.20.1

Reply via email to