On 2021-Sep-15, Kyotaro Horiguchi wrote:

> +             CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch,
> +                                                     flags & XLOG_SET_ABORTED_PARTIAL,
> +                                                     rdata, StartPos, EndPos);
> 
> The new xlog flag XLOG_SET_ABORTED_PARTIAL is used only by
> RM_XLOG_ID/XLOG_OVERWRITE_CONTRECORD records, so the flag value is the
> equivalent of the record type.

In the new version I removed all this; it was wrong.

> +                     if (set_aborted_partial)
> +                             pagehdr->xlp_info |= XLP_FIRST_IS_ABORTED_PARTIAL;
> +
> 
> I'm not sure about the reason for the change from the previous patch
> (I might be missing something), this sets the flag on the *next* page
> of the page where the record starts.  So in the first place we
> shouldn't set the flag there.

You're right, this code is wrong.  And in fact I had already noticed it
yesterday, but much to my embarrassment I forgot to fix it.  Here is a
fixed version, where I moved the flag set back to AdvanceXLInsertBuffer.
I think doing it anywhere else is going to be very painful.  AFAICT we
do the right thing now, but amusingly we don't have any tooling to
verify that the XLP flag is set in the page where we want it.

With this patch we now have two recptrs: the LSN of the broken record,
and the LSN of the missing contrecord.  The latter is where to start
writing WAL after recovery is done, and the former is currently unused
but we could use it to double-check that we're aborting (forgetting) the
correct record.  I didn't try to implement that, but IIUC it is
xlogreader that would have to do that.

-- 
Álvaro Herrera           39°49'30"S 73°17'W  —  https://www.EnterpriseDB.com/
"Si quieres ser creativo, aprende el arte de perder el tiempo"
From f468f058d5b480a9f27f9d1830f3959297accd4d Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Thu, 2 Sep 2021 17:21:46 -0400
Subject: [PATCH v6] Implement FIRST_IS_ABORTED_CONTRECORD

---
 src/backend/access/rmgrdesc/xlogdesc.c  |  11 +++
 src/backend/access/transam/xlog.c       | 112 ++++++++++++++++++++++--
 src/backend/access/transam/xloginsert.c |   1 +
 src/backend/access/transam/xlogreader.c |  38 +++++++-
 src/include/access/xlog_internal.h      |  20 ++++-
 src/include/access/xlogreader.h         |   7 ++
 src/include/catalog/pg_control.h        |   1 +
 7 files changed, 180 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index e6090a9dad..0be382f8a5 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -139,6 +139,14 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec.ThisTimeLineID, xlrec.PrevTimeLineID,
 						 timestamptz_to_str(xlrec.end_time));
 	}
+	else if (info == XLOG_OVERWRITE_CONTRECORD)
+	{
+		xl_overwrite_contrecord xlrec;
+
+		memcpy(&xlrec, rec, sizeof(xl_overwrite_contrecord));
+		appendStringInfo(buf, "lsn %X/%X",
+						 LSN_FORMAT_ARGS(xlrec.overwritten_lsn));
+	}
 }
 
 const char *
@@ -178,6 +186,9 @@ xlog_identify(uint8 info)
 		case XLOG_END_OF_RECOVERY:
 			id = "END_OF_RECOVERY";
 			break;
+		case XLOG_OVERWRITE_CONTRECORD:
+			id = "OVERWRITE_CONTRECORD";
+			break;
 		case XLOG_FPI:
 			id = "FPI";
 			break;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e51a7a749d..a8e587b5c6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -199,6 +199,15 @@ static XLogRecPtr LastRec;
 static XLogRecPtr flushedUpto = 0;
 static TimeLineID receiveTLI = 0;
 
+/*
+ * abortedRecPtr is the start pointer of a broken record at end of WAL when
+ * recovery completes; missingContrecPtr is the location of the first
+ * contrecord that went missing.  See CreateOverwriteContrecordRecord for
+ * details.
+ */
+static XLogRecPtr abortedRecPtr;
+static XLogRecPtr missingContrecPtr;
+
 /*
  * During recovery, lastFullPageWrites keeps track of full_page_writes that
  * the replayed WAL records indicate. It's initialized with full_page_writes
@@ -894,6 +903,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 								TimeLineID prevTLI);
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
+static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
@@ -1120,8 +1130,8 @@ XLogInsertRecord(XLogRecData *rdata,
 		 * All the record data, including the header, is now ready to be
 		 * inserted. Copy the record in the space reserved.
 		 */
-		CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
-							StartPos, EndPos);
+		CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch,
+							rdata, StartPos, EndPos);
 
 		/*
 		 * Unless record is flagged as not important, update LSN of last
@@ -1504,7 +1514,8 @@ checkXLogConsistency(XLogReaderState *record)
  * area in the WAL.
  */
 static void
-CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
+CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
+					XLogRecData *rdata,
 					XLogRecPtr StartPos, XLogRecPtr EndPos)
 {
 	char	   *currpos;
@@ -2246,6 +2257,12 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 		if (!Insert->forcePageWrites)
 			NewPage->xlp_info |= XLP_BKP_REMOVABLE;
 
+		if (missingContrecPtr == NewPageBeginPtr)
+		{
+			NewPage->xlp_info |= XLP_FIRST_IS_ABORTED_PARTIAL;
+			missingContrecPtr = InvalidXLogRecPtr;
+		}
+
 		/*
 		 * If first page of an XLOG segment file, make it a long header.
 		 */
@@ -4394,6 +4411,17 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
 		EndRecPtr = xlogreader->EndRecPtr;
 		if (record == NULL)
 		{
+			/*
+			 * When WAL ends in an incomplete record, keep track of it.  After
+			 * recovery is done, we'll write a record to indicate downstream
+			 * WAL readers that that portion is to be ignored.
+			 */
+			if (!XLogRecPtrIsInvalid(xlogreader->abortedRecPtr))
+			{
+				abortedRecPtr = xlogreader->abortedRecPtr;
+				missingContrecPtr = xlogreader->missingContrecPtr;
+			}
+
 			if (readFile >= 0)
 			{
 				close(readFile);
@@ -7069,6 +7097,9 @@ StartupXLOG(void)
 		InRecovery = true;
 	}
 
+	abortedRecPtr = InvalidXLogRecPtr;
+	missingContrecPtr = InvalidXLogRecPtr;
+
 	/* REDO */
 	if (InRecovery)
 	{
@@ -7655,8 +7686,9 @@ StartupXLOG(void)
 
 	/*
 	 * Kill WAL receiver, if it's still running, before we continue to write
-	 * the startup checkpoint record. It will trump over the checkpoint and
-	 * subsequent records if it's still alive when we start writing WAL.
+	 * the startup checkpoint and aborted-contrecord records. It will trump
+	 * over these records and subsequent ones if it's still alive when we
+	 * start writing WAL.
 	 */
 	XLogShutdownWalRcv();
 
@@ -7689,8 +7721,12 @@ StartupXLOG(void)
 	StandbyMode = false;
 
 	/*
-	 * Re-fetch the last valid or last applied record, so we can identify the
-	 * exact endpoint of what we consider the valid portion of WAL.
+	 * Determine where to start writing WAL next.
+	 *
+	 * When recovery ended in an incomplete record, write a WAL record about
+	 * that and continue after it.  In all other cases, re-fetch the last
+	 * valid or last applied record, so we can identify the exact endpoint of
+	 * what we consider the valid portion of WAL.
 	 */
 	XLogBeginRead(xlogreader, LastRec);
 	record = ReadRecord(xlogreader, PANIC, false);
@@ -7821,6 +7857,18 @@ StartupXLOG(void)
 	XLogCtl->ThisTimeLineID = ThisTimeLineID;
 	XLogCtl->PrevTimeLineID = PrevTimeLineID;
 
+	/*
+	 * Actually, if WAL ended in an incomplete record, skip the parts that
+	 * made it through and start writing after the portion that persisted.
+	 * (It's critical to first write an OVERWRITE_CONTRECORD message, which
+	 * we'll do as soon as we're open for writing new WAL.)
+	 */
+	if (!XLogRecPtrIsInvalid(missingContrecPtr))
+	{
+		Assert(!XLogRecPtrIsInvalid(abortedRecPtr));
+		EndOfLog = missingContrecPtr;
+	}
+
 	/*
 	 * Prepare to write WAL starting at EndOfLog location, and init xlog
 	 * buffer cache using the block containing the last record from the
@@ -7873,13 +7921,23 @@ StartupXLOG(void)
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
+	LocalSetXLogInsertAllowed();
+
+	/* If necessary, write overwrite-contrecord before doing anything else */
+	if (!XLogRecPtrIsInvalid(abortedRecPtr))
+	{
+		Assert(!XLogRecPtrIsInvalid(missingContrecPtr));
+		CreateOverwriteContrecordRecord(abortedRecPtr);
+		abortedRecPtr = InvalidXLogRecPtr;
+		missingContrecPtr = InvalidXLogRecPtr;
+	}
+
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
 	 * record before resource manager writes cleanup WAL records or checkpoint
 	 * record is written.
 	 */
 	Insert->fullPageWrites = lastFullPageWrites;
-	LocalSetXLogInsertAllowed();
 	UpdateFullPageWrites();
 	LocalXLogInsertAllowed = -1;
 
@@ -9365,6 +9423,40 @@ CreateEndOfRecoveryRecord(void)
 	LocalXLogInsertAllowed = -1;	/* return to "check" state */
 }
 
+/*
+ * Write an OVERWRITE_CONTRECORD message, which is used after recovery is
+ * complete to mark the spot where a partial message was lost.  We must not
+ * overwrite the preserved portions of the partial message, because
+ * downstream WAL consumers (physical replicas) may have already received
+ * those parts and would not retreat to overwrite them.
+ */
+static XLogRecPtr
+CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn)
+{
+	xl_overwrite_contrecord xlrec;
+	XLogRecPtr	recptr;
+
+	/* sanity check */
+	if (!RecoveryInProgress())
+		elog(ERROR, "can only be used at end of recovery");
+
+	xlrec.overwritten_lsn = aborted_lsn;
+	/* XXX include the CRC-so-far of the old message? */
+
+	START_CRIT_SECTION();
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xl_overwrite_contrecord));
+
+	recptr = XLogInsert(RM_XLOG_ID, XLOG_OVERWRITE_CONTRECORD);
+
+	XLogFlush(recptr);
+
+	END_CRIT_SECTION();
+
+	return recptr;
+}
+
 /*
  * Flush all data in shared memory to disk, and fsync
  *
@@ -10295,6 +10387,10 @@ xlog_redo(XLogReaderState *record)
 
 		RecoveryRestartPoint(&checkPoint);
 	}
+	else if (info == XLOG_OVERWRITE_CONTRECORD)
+	{
+		/* nothing to do here */
+	}
 	else if (info == XLOG_END_OF_RECOVERY)
 	{
 		xl_end_of_recovery xlrec;
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index e596a0470a..a33cb23ac0 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -409,6 +409,7 @@ XLogRegisterBufData(uint8 block_id, char *data, int len)
  * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
  *	 durability, which allows to avoid triggering WAL archiving and other
  *	 background activity.
+ * - XLOG_INCLUDE_XID, ???
  */
 void
 XLogSetRecordFlags(uint8 flags)
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5cf74e181a..2bcd2bbec7 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -278,6 +278,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 				total_len;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
+	bool		assembled;
 	bool		gotheader;
 	int			readOff;
 
@@ -293,6 +294,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 	state->errormsg_buf[0] = '\0';
 
 	ResetDecoder(state);
+	state->abortedRecPtr = InvalidXLogRecPtr;
+	state->missingContrecPtr = InvalidXLogRecPtr;
 
 	RecPtr = state->EndRecPtr;
 
@@ -319,7 +322,9 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		randAccess = true;
 	}
 
+restart:
 	state->currRecPtr = RecPtr;
+	assembled = false;
 
 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
 	targetRecOff = RecPtr % XLOG_BLCKSZ;
@@ -415,6 +420,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		char	   *buffer;
 		uint32		gotlen;
 
+		assembled = true;
+
 		/*
 		 * Enlarge readRecordBuf as needed.
 		 */
@@ -448,8 +455,23 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 
 			Assert(SizeOfXLogShortPHD <= readOff);
 
-			/* Check that the continuation on next page looks valid */
 			pageHeader = (XLogPageHeader) state->readBuf;
+
+			/*
+			 * If we were expecting a continuation record and got an "aborted
+			 * partial" flag, that means the continuation record was lost.
+			 * Ignore the record we were reading, since we now know it's broken
+			 * and lost forever, and restart the read by assuming the address
+			 * to read is the location where we found this flag.
+			 */
+			if (pageHeader->xlp_info & XLP_FIRST_IS_ABORTED_PARTIAL)
+			{
+				ResetDecoder(state);
+				RecPtr = targetPagePtr;
+				goto restart;
+			}
+
+			/* Check that the continuation on next page looks valid */
 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
 			{
 				report_invalid_record(state,
@@ -551,6 +573,20 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		return NULL;
 
 err:
+	if (assembled)
+	{
+		/*
+		 * We get here when a record that spans multiple pages needs to be
+		 * assembled, but something went wrong -- perhaps a contrecord piece
+		 * was lost.  If caller is WAL replay, it will know where the aborted
+		 * record was and where to direct followup WAL to be written, marking
+		 * the next piece with XLP_FIRST_IS_ABORTED_PARTIAL, which will in
+		 * turn signal downstream WAL consumers that the broken WAL record is
+		 * to be ignored.
+		 */
+		state->abortedRecPtr = RecPtr;
+		state->missingContrecPtr = targetPagePtr;
+	}
 
 	/*
 	 * Invalidate the read state. We might read from a different source after
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 3b5eceff65..0fb44ba37d 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -76,8 +76,20 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
 #define XLP_LONG_HEADER				0x0002
 /* This flag indicates backup blocks starting in this page are optional */
 #define XLP_BKP_REMOVABLE			0x0004
+/*
+ * This flag marks a record that replaces a missing contrecord.
+ * When on WAL replay we expect a continuation record at the start of
+ * a page that is not there, recovery ends but the checkpoint record
+ * that follows is marked with this flag, which indicates WAL readers
+ * that the incomplete record is to be skipped, and that WAL reading
+ * is to be resumed here.  This is useful for downstream consumers of
+ * WAL which have already received (the first half of) the original
+ * broken WAL record, such as via archive_command or physical streaming
+ * replication, which we cannot "rewind".
+ */
+#define XLP_FIRST_IS_ABORTED_PARTIAL 0x0008
 /* All defined flag bits in xlp_info (used for validity checking of header) */
-#define XLP_ALL_FLAGS				0x0007
+#define XLP_ALL_FLAGS				0x000F
 
 #define XLogPageHeaderSize(hdr)		\
 	(((hdr)->xlp_info & XLP_LONG_HEADER) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD)
@@ -249,6 +261,12 @@ typedef struct xl_restore_point
 	char		rp_name[MAXFNAMELEN];
 } xl_restore_point;
 
+/* Overwrite of prior contrecord */
+typedef struct xl_overwrite_contrecord
+{
+	XLogRecPtr	overwritten_lsn;
+} xl_overwrite_contrecord;
+
 /* End of recovery mark, when we don't do an END_OF_RECOVERY checkpoint */
 typedef struct xl_end_of_recovery
 {
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 21d200d3df..5748f7018b 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -175,6 +175,13 @@ struct XLogReaderState
 	XLogRecPtr	ReadRecPtr;		/* start of last record read */
 	XLogRecPtr	EndRecPtr;		/* end+1 of last record read */
 
+	/*
+	 * The start point of a partial record at the end of WAL, and the start
+	 * location of its first contrecord that went missing.
+	 */
+	XLogRecPtr	abortedRecPtr;
+	XLogRecPtr	missingContrecPtr;
+
 
 	/* ----------------------------------------
 	 * Decoded representation of current record
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index e3f48158ce..179efa9f4a 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -76,6 +76,7 @@ typedef struct CheckPoint
 #define XLOG_END_OF_RECOVERY			0x90
 #define XLOG_FPI_FOR_HINT				0xA0
 #define XLOG_FPI						0xB0
+#define XLOG_OVERWRITE_CONTRECORD		0xC0
 
 
 /*
-- 
2.20.1

Reply via email to