From 6d75e271b7475cc853b13ef54d13ba1c0b2fab1d Mon Sep 17 00:00:00 2001
From: Takashi Menjo <takashi.menjou.vg@hco.ntt.co.jp>
Date: Fri, 24 Jan 2020 13:16:27 +0900
Subject: [PATCH 2/3] Non-volatile WAL buffer

This makes the external WAL buffer non-volatile (NVWAL): WAL records become
durable once they are written to the buffer on persistent memory, so they no
longer need to be flushed to WAL segment files at commit time.

Bumps PG_CONTROL_VERSION.
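
For illustration, here is a minimal standalone sketch (not part of the patch)
of the flush-range computation that XLogFlush() performs on the ring-shaped
NVWAL buffer.  flush_range() and drain() are stand-ins for the patch's
nv_flush()/nv_drain() primitives (assumed to wrap libpmem's
pmem_flush()/pmem_drain()); the buffer size and LSNs are arbitrary.

#include <stdio.h>
#include <stdint.h>
#include <stddef.h>

typedef uint64_t XLogRecPtr;

#define NVWAL_SIZE	((uint64_t) 1024)	/* illustrative buffer size */

static char nvwal_pages[NVWAL_SIZE];

/* stand-in for pmem_flush(): queue cache-line flushes for a range */
static void
flush_range(const void *addr, size_t len)
{
	printf("flush %zu bytes at offset %td\n",
		   len, (const char *) addr - nvwal_pages);
}

/* stand-in for pmem_drain(): the store barrier */
static void
drain(void)
{
	printf("drain (store barrier)\n");
}

/* Flush the records in [from, upto) on the ring buffer, handling wrap-around. */
static void
flush_nvwal(XLogRecPtr from, XLogRecPtr upto)
{
	size_t		fromoff = from % NVWAL_SIZE;
	size_t		uptooff = upto % NVWAL_SIZE;
	char	   *frompos = nvwal_pages + fromoff;
	char	   *uptopos = nvwal_pages + uptooff;

	if (upto - from > NVWAL_SIZE)
	{
		/* wrapped around the whole buffer: flush everything */
		flush_range(nvwal_pages, NVWAL_SIZE);
	}
	else
	{
		if (uptopos <= frompos)
		{
			/* range crosses the end of the buffer: flush the tail first */
			flush_range(frompos, NVWAL_SIZE - fromoff);
			fromoff = 0;
			frompos = nvwal_pages;
		}
		flush_range(frompos, uptooff - fromoff);
	}
	drain();
}

int
main(void)
{
	flush_nvwal(1000, 1100);	/* crosses the 1024-byte wrap point */
	return 0;
}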
---
 src/backend/access/transam/xlog.c       | 975 +++++++++++++++++++++---
 src/backend/replication/walsender.c     |  50 ++
 src/bin/pg_controldata/pg_controldata.c |   3 +
 src/include/access/xlog.h               |   6 +
 src/include/catalog/pg_control.h        |  17 +-
 5 files changed, 948 insertions(+), 103 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index eae0c01e3c..ba89d3c158 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -643,6 +643,13 @@ typedef struct XLogCtlData
 	TimeLineID	ThisTimeLineID;
 	TimeLineID	PrevTimeLineID;
 
+	/*
+	 * Used for non-volatile WAL buffer (NVWAL).
+	 *
+	 * All the records up to this LSN are persistent in NVWAL.
+	 */
+	XLogRecPtr	persistentUpTo;
+
 	/*
 	 * SharedRecoveryInProgress indicates if we're still in crash or archive
 	 * recovery.  Protected by info_lck.
@@ -766,11 +773,12 @@ typedef enum
 	XLOG_FROM_ANY = 0,			/* request to read WAL from any source */
 	XLOG_FROM_ARCHIVE,			/* restored using restore_command */
 	XLOG_FROM_PG_WAL,			/* existing file in pg_wal */
+	XLOG_FROM_NVWAL,			/* non-volatile WAL buffer */
 	XLOG_FROM_STREAM			/* streamed from master */
 } XLogSource;
 
 /* human-readable names for XLogSources, for debugging output */
-static const char *xlogSourceNames[] = {"any", "archive", "pg_wal", "stream"};
+static const char *xlogSourceNames[] = {"any", "archive", "pg_wal", "nvwal", "stream"};
 
 /*
  * openLogFile is -1 or a kernel FD for an open log file segment.
@@ -898,6 +906,7 @@ static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 										bool fetching_ckpt, XLogRecPtr tliRecPtr);
 static int	emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
 static void XLogFileClose(void);
+static void PreallocNonVolatileXlogBuffer(void);
 static void PreallocXlogFiles(XLogRecPtr endptr);
 static void RemoveTempXlogFiles(void);
 static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr RedoRecPtr, XLogRecPtr endptr);
@@ -1177,6 +1186,43 @@ XLogInsertRecord(XLogRecData *rdata,
 		}
 	}
 
+	/*
+	 * Request a checkpoint here if non-volatile WAL buffer is used and we
+	 * have consumed too much WAL since the last checkpoint.
+	 *
+	 * We first check whether condition (1) or (2) below holds:
+	 *
+	 * (1) The record was the first one in a certain segment.
+	 * (2) The record was inserted across segments.
+	 *
+	 * We then check the number of the segment that the record was inserted into.
+	 */
+	if (NvwalAvail && inserted &&
+		(StartPos % wal_segment_size == SizeOfXLogLongPHD ||
+		 StartPos / wal_segment_size < EndPos / wal_segment_size))
+	{
+		XLogSegNo	end_segno;
+
+		XLByteToSeg(EndPos, end_segno, wal_segment_size);
+
+		/*
+		 * NOTE: We do not signal walsender here because the inserted record
+		 * has not been drained to the NVWAL buffer yet.
+		 *
+		 * NOTE: We do not signal walarchiver here because the inserted record
+		 * has not been flushed to a segment file.  So we don't need to update
+		 * XLogCtl->lastSegSwitch{Time,LSN}, used only by CheckArchiveTimeout.
+		 */
+
+		/* Two-step checking for speed (see also XLogWrite) */
+		if (IsUnderPostmaster && XLogCheckpointNeeded(end_segno))
+		{
+			(void) GetRedoRecPtr();
+			if (XLogCheckpointNeeded(end_segno))
+				RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
+		}
+	}
+
 #ifdef WAL_DEBUG
 	if (XLOG_DEBUG)
 	{
@@ -2100,6 +2146,15 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 	XLogRecPtr	NewPageBeginPtr;
 	XLogPageHeader NewPage;
 	int			npages = 0;
+	bool		is_firstpage;
+
+	if (NvwalAvail)
+		elog(DEBUG1, "XLogCtl->InitializedUpTo %X/%X; upto %X/%X; opportunistic %s",
+			 (uint32) (XLogCtl->InitializedUpTo >> 32),
+			 (uint32) XLogCtl->InitializedUpTo,
+			 (uint32) (upto >> 32),
+			 (uint32) upto,
+			 opportunistic ? "true" : "false");
 
 	LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
 
@@ -2161,7 +2216,25 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 				{
 					/* Have to write it ourselves */
 					TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
-					WriteRqst.Write = OldPageRqstPtr;
+
+					if (NvwalAvail)
+					{
+						/*
+						 * If we use the non-volatile WAL buffer, it is a
+						 * special but expected case to write the buffer pages
+						 * out to segment files, and for simplicity it is done
+						 * segment by segment.
+						 */
+						XLogRecPtr		OldSegEndPtr;
+
+						OldSegEndPtr = OldPageRqstPtr - XLOG_BLCKSZ + wal_segment_size;
+						Assert(OldSegEndPtr % wal_segment_size == 0);
+
+						WriteRqst.Write = OldSegEndPtr;
+					}
+					else
+						WriteRqst.Write = OldPageRqstPtr;
+
 					WriteRqst.Flush = 0;
 					XLogWrite(WriteRqst, false);
 					LWLockRelease(WALWriteLock);
@@ -2188,7 +2261,20 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 		 * Be sure to re-zero the buffer so that bytes beyond what we've
 		 * written will look like zeroes and not valid XLOG records...
 		 */
-		MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
+		if (NvwalAvail)
+		{
+			/*
+			 * We do not combine MemSet() and pmem_persist() here because
+			 * pmem_persist() may fall back to a slow, strongly-ordered cache
+			 * flush instruction if a weakly-ordered fast one is not supported.
+			 * Instead, we first zero-fill the buffer with
+			 * pmem_memset_persist(), which can leverage fast non-temporal
+			 * store instructions, and make the header persistent later.
+			 */
+			nv_memset_persist(NewPage, 0, XLOG_BLCKSZ);
+		}
+		else
+			MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
 
 		/*
 		 * Fill the new page's header
@@ -2220,7 +2306,8 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 		/*
 		 * If first page of an XLOG segment file, make it a long header.
 		 */
-		if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0)
+		is_firstpage = ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0);
+		if (is_firstpage)
 		{
 			XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
 
@@ -2235,7 +2322,13 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 		 * before the xlblocks update. GetXLogBuffer() reads xlblocks without
 		 * holding a lock.
 		 */
-		pg_write_barrier();
+		if (NvwalAvail)
+		{
+			/* Make the header persistent on PMEM */
+			nv_persist(NewPage, is_firstpage ? SizeOfXLogLongPHD : SizeOfXLogShortPHD);
+		}
+		else
+			pg_write_barrier();
 
 		*((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
 
@@ -2245,6 +2338,13 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 	}
 	LWLockRelease(WALBufMappingLock);
 
+	if (NvwalAvail)
+		elog(DEBUG1, "ControlFile->discardedUpTo %X/%X; XLogCtl->InitializedUpTo %X/%X",
+			 (uint32) (ControlFile->discardedUpTo >> 32),
+			 (uint32) ControlFile->discardedUpTo,
+			 (uint32) (XLogCtl->InitializedUpTo >> 32),
+			 (uint32) XLogCtl->InitializedUpTo);
+
 #ifdef WAL_DEBUG
 	if (XLOG_DEBUG && npages > 0)
 	{
@@ -2616,6 +2716,23 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 		LogwrtResult.Flush = LogwrtResult.Write;
 	}
 
+	/*
+	 * Update discardedUpTo if NVWAL is used.  A new value should not fall
+	 * behind the old one.
+	 */
+	if (NvwalAvail)
+	{
+		Assert(LogwrtResult.Write == LogwrtResult.Flush);
+
+		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+		if (ControlFile->discardedUpTo < LogwrtResult.Write)
+		{
+			ControlFile->discardedUpTo = LogwrtResult.Write;
+			UpdateControlFile();
+		}
+		LWLockRelease(ControlFileLock);
+	}
+
 	/*
 	 * Update shared-memory status
 	 *
@@ -2820,6 +2937,123 @@ XLogFlush(XLogRecPtr record)
 		return;
 	}
 
+	if (NvwalAvail)
+	{
+		XLogRecPtr	FromPos;
+
+		/*
+		 * No page on the NVWAL is to be flushed to segment files.  Instead,
+		 * we wait for all the insertions preceding this one to complete, and
+		 * below we make all those records persistent on the NVWAL.
+		 */
+		record = WaitXLogInsertionsToFinish(record);
+
+		/*
+		 * Check if another backend has already done what we are doing.
+		 *
+		 * We can compare something <= XLogCtl->persistentUpTo without
+		 * holding XLogCtl->info_lck spinlock because persistentUpTo is
+		 * monotonically increasing and can be loaded atomically on every
+		 * platform that supports NVWAL (currently x64 only).
+		 */
+		FromPos = *((volatile XLogRecPtr *) &XLogCtl->persistentUpTo);
+		if (record <= FromPos)
+			return;
+
+		/*
+		 * In a very rare case, we have wrapped around the whole NVWAL.  We
+		 * do not need to care about old pages here because they have already
+		 * been evicted to segment files at record insertion.
+		 *
+		 * In such a case, we flush the whole NVWAL.  We also log a warning
+		 * because it can be a time-consuming operation.
+		 *
+		 * TODO: Advance XLogCtl->persistentUpTo at the end of XLogWrite so
+		 * that the following first if-block can be removed.
+		 */
+		if (record - FromPos > NvwalSize)
+		{
+			elog(WARNING, "flushing the whole NVWAL; FromPos %X/%X; record %X/%X",
+				 (uint32) (FromPos >> 32), (uint32) FromPos,
+				 (uint32) (record >> 32), (uint32) record);
+
+			nv_flush(XLogCtl->pages, NvwalSize);
+		}
+		else
+		{
+			char   *frompos;
+			char   *uptopos;
+			size_t	fromoff;
+			size_t	uptooff;
+
+			/*
+			 * Flush each record that is probably not flushed yet.
+			 *
+			 * We say "probably" for two reasons.  First, a record copied
+			 * with non-temporal store instructions has in effect already been
+			 * "flushed", but we cannot distinguish that case; nv_flush is
+			 * harmless for consistency there.
+			 *
+			 * Second, the target record might have already been evicted to a
+			 * segment file by now.  In this case, too, nv_flush is harmless
+			 * for consistency.
+			 */
+			uptooff = record % NvwalSize;
+			uptopos = XLogCtl->pages + uptooff;
+			fromoff = FromPos % NvwalSize;
+			frompos = XLogCtl->pages + fromoff;
+
+			/* Handle wrap-around of the ring buffer */
+			if (uptopos <= frompos)
+			{
+				nv_flush(frompos, NvwalSize - fromoff);
+				fromoff = 0;
+				frompos = XLogCtl->pages;
+			}
+
+			nv_flush(frompos, uptooff - fromoff);
+		}
+
+		/*
+		 * To guarantee durability ("D" of ACID), we must satisfy the
+		 * following two conditions for each transaction X:
+		 *
+		 *  (1) All the WAL records inserted by X, including the commit record
+		 *      of X, must persist on NVWAL before the server commits X.
+		 *
+		 *  (2) All the WAL records inserted by transactions other than X
+		 *      that have a lower LSN than the commit record just inserted
+		 *      by X must persist on NVWAL before the server commits X.
+		 *
+		 * (1) is satisfied by a store barrier after the commit record of X
+		 * is flushed, because each WAL record of X has already been flushed
+		 * at the end of its insertion.  (2) is satisfied by waiting for all
+		 * record insertions that have a lower LSN than the commit record
+		 * just inserted by X, plus the same store barrier.
+		 *
+		 * Now is the time: issue the store barrier.
+		 */
+		nv_drain();
+
+		/*
+		 * Remember where the last persistent record is.  A new value should
+		 * not fall behind the old one.
+		 */
+		SpinLockAcquire(&XLogCtl->info_lck);
+		if (XLogCtl->persistentUpTo < record)
+			XLogCtl->persistentUpTo = record;
+		SpinLockRelease(&XLogCtl->info_lck);
+
+		/*
+		 * The records up to the returned "record" are now persistent on
+		 * NVWAL.  Signal the walsenders.
+		 */
+		WalSndWakeupRequest();
+		WalSndWakeupProcessRequests();
+
+		return;
+	}
+
 	/* Quick exit if already known flushed */
 	if (record <= LogwrtResult.Flush)
 		return;
@@ -3003,6 +3237,13 @@ XLogBackgroundFlush(void)
 	if (RecoveryInProgress())
 		return false;
 
+	/*
+	 * Quick exit if the NVWAL buffer is used and archiving is not active.  In
+	 * this case, we need no WAL segment files in the pg_wal directory.
+	 */
+	if (NvwalAvail && !XLogArchivingActive())
+		return false;
+
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
 	LogwrtResult = XLogCtl->LogwrtResult;
@@ -3021,6 +3262,18 @@ XLogBackgroundFlush(void)
 		flexible = false;		/* ensure it all gets written */
 	}
 
+	/*
+	 * If NVWAL is used, back off to the last completed segment boundary so
+	 * that buffer pages are written out to files segment by segment.  We do
+	 * this here, after XLogCtl->asyncXactLSN has been loaded, because that
+	 * LSN should be taken into account.
+	 */
+	if (NvwalAvail)
+	{
+		WriteRqst.Write -= WriteRqst.Write % wal_segment_size;
+		flexible = false;		/* ensure it all gets written */
+	}
+
 	/*
 	 * If already known flushed, we're done. Just need to check if we are
 	 * holding an open file handle to a logfile that's no longer in use,
@@ -3047,7 +3300,12 @@ XLogBackgroundFlush(void)
 	flushbytes =
 		WriteRqst.Write / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ;
 
-	if (WalWriterFlushAfter == 0 || lastflush == 0)
+	if (NvwalAvail)
+	{
+		WriteRqst.Flush = WriteRqst.Write;
+		lastflush = now;
+	}
+	else if (WalWriterFlushAfter == 0 || lastflush == 0)
 	{
 		/* first call, or block based limits disabled */
 		WriteRqst.Flush = WriteRqst.Write;
@@ -3106,7 +3364,28 @@ XLogBackgroundFlush(void)
 	 * Great, done. To take some work off the critical path, try to initialize
 	 * as many of the no-longer-needed WAL buffers for future use as we can.
 	 */
-	AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
+	if (NvwalAvail && max_wal_senders == 0)
+	{
+		XLogRecPtr		upto;
+
+		/*
+		 * If NVWAL is used and there is no walsender, nobody will load
+		 * segments from the buffer.  So let's recycle segments up to {where
+		 * we have requested to write and flush} + NvwalSize.
+		 *
+		 * Note that if NVWAL is used and a walsender seems to be running, we
+		 * must do nothing here; the written pages are kept on the buffer so
+		 * that walsenders load them from the buffer, not from the segment
+		 * files.  The buffer pages are eventually recycled by checkpoint.
+		 */
+		Assert(WriteRqst.Write == WriteRqst.Flush);
+		Assert(WriteRqst.Write % wal_segment_size == 0);
+
+		upto = WriteRqst.Write + NvwalSize;
+		AdvanceXLInsertBuffer(upto - XLOG_BLCKSZ, false);
+	}
+	else
+		AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
 
 	/*
 	 * If we determined that we need to write data, but somebody else
@@ -3806,6 +4085,43 @@ XLogFileClose(void)
 	openLogFile = -1;
 }
 
+/*
+ * Preallocate non-volatile XLOG buffers.
+ *
+ * This zeroes the buffers and prepares page headers up to
+ * ControlFile->discardedUpTo + S, where S is the total size of
+ * the non-volatile XLOG buffers.
+ *
+ * It is the caller's responsibility to update ControlFile->discardedUpTo
+ * and to set XLogCtl->InitializedUpTo properly.
+ */
+static void
+PreallocNonVolatileXlogBuffer(void)
+{
+	XLogRecPtr	newupto,
+				InitializedUpTo;
+
+	Assert(NvwalAvail);
+
+	LWLockAcquire(ControlFileLock, LW_SHARED);
+	newupto = ControlFile->discardedUpTo;
+	LWLockRelease(ControlFileLock);
+
+	InitializedUpTo = XLogCtl->InitializedUpTo;
+
+	newupto += NvwalSize;
+	Assert(newupto % wal_segment_size == 0);
+
+	if (newupto <= InitializedUpTo)
+		return;
+
+	/*
+	 * Subtracting XLOG_BLCKSZ is important, because AdvanceXLInsertBuffer
+	 * handles the first argument as the beginning of pages, not the end.
+	 */
+	AdvanceXLInsertBuffer(newupto - XLOG_BLCKSZ, false);
+}
+
 /*
  * Preallocate log files beyond the specified log endpoint.
  *
@@ -4101,8 +4417,11 @@ RemoveXlogFile(const char *segname, XLogRecPtr RedoRecPtr, XLogRecPtr endptr)
 	 * Before deleting the file, see if it can be recycled as a future log
 	 * segment. Only recycle normal files, pg_standby for example can create
 	 * symbolic links pointing to a separate archive directory.
+	 *
+	 * If the NVWAL buffer is used, a log segment file is never recycled
+	 * (that is, we always take the else block).
 	 */
-	if (wal_recycle &&
+	if (!NvwalAvail && wal_recycle &&
 		endlogSegNo <= recycleSegNo &&
 		lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
 		InstallXLogFileSegment(&endlogSegNo, path,
@@ -5336,36 +5655,53 @@ BootStrapXLOG(void)
 	record->xl_crc = crc;
 
 	/* Create first XLOG segment file */
-	use_existent = false;
-	openLogFile = XLogFileInit(1, &use_existent, false);
-
-	/* Write the first page with the initial record */
-	errno = 0;
-	pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_WRITE);
-	if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+	if (NvwalAvail)
 	{
-		/* if write didn't set errno, assume problem is no disk space */
-		if (errno == 0)
-			errno = ENOSPC;
-		ereport(PANIC,
-				(errcode_for_file_access(),
-				 errmsg("could not write bootstrap write-ahead log file: %m")));
+		pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_WRITE);
+		nv_memcpy_nodrain(XLogCtl->pages + wal_segment_size, page, XLOG_BLCKSZ);
+		pgstat_report_wait_end();
+
+		pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC);
+		nv_drain();
+		pgstat_report_wait_end();
+
+		/*
+		 * The rest of the WAL state will be initialized in the startup process.
+		 */
 	}
-	pgstat_report_wait_end();
+	else
+	{
+		use_existent = false;
+		openLogFile = XLogFileInit(1, &use_existent, false);
+
+		/* Write the first page with the initial record */
+		errno = 0;
+		pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_WRITE);
+		if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+		{
+			/* if write didn't set errno, assume problem is no disk space */
+			if (errno == 0)
+				errno = ENOSPC;
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not write bootstrap write-ahead log file: %m")));
+		}
+		pgstat_report_wait_end();
 
-	pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC);
-	if (pg_fsync(openLogFile) != 0)
-		ereport(PANIC,
-				(errcode_for_file_access(),
-				 errmsg("could not fsync bootstrap write-ahead log file: %m")));
-	pgstat_report_wait_end();
+		pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC);
+		if (pg_fsync(openLogFile) != 0)
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not fsync bootstrap write-ahead log file: %m")));
+		pgstat_report_wait_end();
 
-	if (close(openLogFile))
-		ereport(PANIC,
-				(errcode_for_file_access(),
-				 errmsg("could not close bootstrap write-ahead log file: %m")));
+		if (close(openLogFile))
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not close bootstrap write-ahead log file: %m")));
 
-	openLogFile = -1;
+		openLogFile = -1;
+	}
 
 	/* Now create pg_control */
 
@@ -5378,6 +5714,7 @@ BootStrapXLOG(void)
 	ControlFile->checkPoint = checkPoint.redo;
 	ControlFile->checkPointCopy = checkPoint;
 	ControlFile->unloggedLSN = FirstNormalUnloggedLSN;
+	ControlFile->discardedUpTo = (NvwalAvail) ? wal_segment_size : InvalidXLogRecPtr;
 
 	/* Set important parameter values for use when replaying WAL */
 	ControlFile->MaxConnections = MaxConnections;
@@ -5638,35 +5975,41 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
 	 * happens in the middle of a segment, copy data from the last WAL segment
 	 * of the old timeline up to the switch point, to the starting WAL segment
 	 * on the new timeline.
+	 *
+	 * If the non-volatile WAL buffer is used, no new segment file is created;
+	 * data up to the switch point is copied into the NVWAL by StartupXLOG().
 	 */
-	if (endLogSegNo == startLogSegNo)
+	if (!NvwalAvail)
 	{
-		/*
-		 * Make a copy of the file on the new timeline.
-		 *
-		 * Writing WAL isn't allowed yet, so there are no locking
-		 * considerations. But we should be just as tense as XLogFileInit to
-		 * avoid emplacing a bogus file.
-		 */
-		XLogFileCopy(endLogSegNo, endTLI, endLogSegNo,
-					 XLogSegmentOffset(endOfLog, wal_segment_size));
-	}
-	else
-	{
-		/*
-		 * The switch happened at a segment boundary, so just create the next
-		 * segment on the new timeline.
-		 */
-		bool		use_existent = true;
-		int			fd;
+		if (endLogSegNo == startLogSegNo)
+		{
+			/*
+			 * Make a copy of the file on the new timeline.
+			 *
+			 * Writing WAL isn't allowed yet, so there are no locking
+			 * considerations. But we should be just as tense as XLogFileInit to
+			 * avoid emplacing a bogus file.
+			 */
+			XLogFileCopy(endLogSegNo, endTLI, endLogSegNo,
+						 XLogSegmentOffset(endOfLog, wal_segment_size));
+		}
+		else
+		{
+			/*
+			 * The switch happened at a segment boundary, so just create the next
+			 * segment on the new timeline.
+			 */
+			bool		use_existent = true;
+			int			fd;
 
-		fd = XLogFileInit(startLogSegNo, &use_existent, true);
+			fd = XLogFileInit(startLogSegNo, &use_existent, true);
 
-		if (close(fd))
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not close file \"%s\": %m",
-							XLogFileNameP(ThisTimeLineID, startLogSegNo))));
+			if (close(fd))
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not close file \"%s\": %m",
+									XLogFileNameP(ThisTimeLineID, startLogSegNo))));
+		}
 	}
 
 	/*
@@ -6888,6 +7231,11 @@ StartupXLOG(void)
 		InRecovery = true;
 	}
 
+	/* Dump discardedUpTo just before REDO */
+	elog(LOG, "ControlFile->discardedUpTo %X/%X",
+		 (uint32) (ControlFile->discardedUpTo >> 32),
+		 (uint32) ControlFile->discardedUpTo);
+
 	/* REDO */
 	if (InRecovery)
 	{
@@ -7635,10 +7983,88 @@ StartupXLOG(void)
 	Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
 	Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
 
+	if (NvwalAvail)
+	{
+		XLogRecPtr	discardedUpTo;
+
+		discardedUpTo = ControlFile->discardedUpTo;
+		Assert(discardedUpTo == InvalidXLogRecPtr ||
+			   discardedUpTo % wal_segment_size == 0);
+
+		if (discardedUpTo == InvalidXLogRecPtr)
+		{
+			elog(DEBUG1, "brand-new NVWAL");
+
+			/* The following "Tricky point" block initializes the buffer */
+		}
+		else if (EndOfLog <= discardedUpTo)
+		{
+			elog(DEBUG1, "no record on NVWAL has been UNDONE");
+
+			LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+			ControlFile->discardedUpTo = InvalidXLogRecPtr;
+			UpdateControlFile();
+			LWLockRelease(ControlFileLock);
+
+			nv_memset_persist(XLogCtl->pages, 0, NvwalSize);
+
+			/* The following "Tricky point" block initializes the buffer */
+		}
+		else
+		{
+			int			last_idx;
+			int			idx;
+			XLogRecPtr	ptr;
+
+			elog(DEBUG1, "some records on NVWAL have been UNDONE; keep them");
+
+			/*
+			 * Initialize the xlblocks array because we decided to keep the
+			 * UNDONE records on the NVWAL buffer; otherwise, any buffer page
+			 * whose xlblocks entry is 0 (as initialized by XLOGShmemInit)
+			 * would be accidentally cleared by the following AdvanceXLInsertBuffer!
+			 *
+			 * Two cases can be considered:
+			 *
+			 * 1) EndOfLog is on a page boundary (divisible by XLOG_BLCKSZ):
+			 *    Initialize up to (and including) the page containing the
+			 *    last record.  That page should end with EndOfLog.  The next
+			 *    page "N", beginning with EndOfLog, is left untouched
+			 *    because, in the corner case that all the NVWAL buffer pages
+			 *    are already filled, page N is at the same location as the
+			 *    first page "F" beginning with discardedUpTo, and of course
+			 *    we must not overwrite page F.
+			 *
+			 *    In this case, we first get XLogRecPtrToBufIdx(EndOfLog) as
+			 *    last_idx, indicating page N.  Then we go forward from page F
+			 *    up to (but excluding) page N, which has the same index as
+			 *    page F.
+			 *
+			 * 2) EndOfLog is not on a page boundary: Initialize all the pages
+			 *    except the page "L" holding the last record.  Page L will be
+			 *    initialized by the following "Tricky point" block, including
+			 *    its content.
+			 *
+			 * In either case, XLogCtl->InitializedUpTo is initialized in the
+			 * following "Tricky point" if-else block.
+			 */
+
+			last_idx = XLogRecPtrToBufIdx(EndOfLog);
+
+			ptr = discardedUpTo;
+			for (idx = XLogRecPtrToBufIdx(ptr); idx != last_idx;
+				 idx = NextBufIdx(idx))
+			{
+				ptr += XLOG_BLCKSZ;
+				XLogCtl->xlblocks[idx] = ptr;
+			}
+		}
+	}
+
 	/*
-	 * Tricky point here: readBuf contains the *last* block that the LastRec
-	 * record spans, not the one it starts in.  The last block is indeed the
-	 * one we want to use.
+	 * Tricky point here: readBuf contains the *last* block that the
+	 * LastRec record spans, not the one it starts in.  The last block is
+	 * indeed the one we want to use.
 	 */
 	if (EndOfLog % XLOG_BLCKSZ != 0)
 	{
@@ -7658,6 +8084,9 @@ StartupXLOG(void)
 		memcpy(page, xlogreader->readBuf, len);
 		memset(page + len, 0, XLOG_BLCKSZ - len);
 
+		if (NvwalAvail)
+			nv_persist(page, XLOG_BLCKSZ);
+
 		XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ;
 		XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ;
 	}
@@ -7671,12 +8100,54 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
-	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	if (NvwalAvail)
+	{
+		XLogRecPtr	SegBeginPtr;
 
-	XLogCtl->LogwrtResult = LogwrtResult;
+		/*
+		 * If the NVWAL buffer is used, writing records out to segment files
+		 * should be done segment by segment.  So Logwrt{Rqst,Result} (and
+		 * also discardedUpTo) should be multiples of wal_segment_size.
+		 * Let's back them off to the last segment boundary.
+		 */
 
-	XLogCtl->LogwrtRqst.Write = EndOfLog;
-	XLogCtl->LogwrtRqst.Flush = EndOfLog;
+		SegBeginPtr = EndOfLog - (EndOfLog % wal_segment_size);
+		LogwrtResult.Write = LogwrtResult.Flush = SegBeginPtr;
+		XLogCtl->LogwrtResult = LogwrtResult;
+		XLogCtl->LogwrtRqst.Write = SegBeginPtr;
+		XLogCtl->LogwrtRqst.Flush = SegBeginPtr;
+
+		/*
+		 * persistentUpTo does not need to be a multiple of wal_segment_size;
+		 * it should be the drained-up-to LSN.  walsender will use it to load
+		 * records from the NVWAL buffer.
+		 */
+		XLogCtl->persistentUpTo = EndOfLog;
+
+		/* Update discardedUpTo in pg_control if still invalid */
+		if (ControlFile->discardedUpTo == InvalidXLogRecPtr)
+		{
+			LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+			ControlFile->discardedUpTo = SegBeginPtr;
+			UpdateControlFile();
+			LWLockRelease(ControlFileLock);
+		}
+
+		elog(DEBUG1, "EndOfLog: %X/%X",
+			 (uint32) (EndOfLog >> 32), (uint32) EndOfLog);
+
+		elog(DEBUG1, "SegBeginPtr: %X/%X",
+			 (uint32) (SegBeginPtr >> 32), (uint32) SegBeginPtr);
+	}
+	else
+	{
+		LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+
+		XLogCtl->LogwrtResult = LogwrtResult;
+
+		XLogCtl->LogwrtRqst.Write = EndOfLog;
+		XLogCtl->LogwrtRqst.Flush = EndOfLog;
+	}
 
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
@@ -7807,6 +8278,7 @@ StartupXLOG(void)
 				char		origpath[MAXPGPATH];
 				char		partialfname[MAXFNAMELEN];
 				char		partialpath[MAXPGPATH];
+				XLogRecPtr	discardedUpTo;
 
 				XLogFilePath(origpath, EndOfLogTLI, endLogSegNo, wal_segment_size);
 				snprintf(partialfname, MAXFNAMELEN, "%s.partial", origfname);
@@ -7818,6 +8290,53 @@ StartupXLOG(void)
 				 */
 				XLogArchiveCleanup(partialfname);
 
+				/*
+				 * If NVWAL is used in archive recovery as well, write the old
+				 * records out to segment files so that they can be archived.
+				 * Note that we need the WAL-related locks because
+				 * LocalXLogInsertAllowed has already been set to -1.
+				 */
+				discardedUpTo = ControlFile->discardedUpTo;
+				if (NvwalAvail && discardedUpTo != InvalidXLogRecPtr &&
+					discardedUpTo < EndOfLog)
+				{
+					XLogwrtRqst WriteRqst;
+					TimeLineID	thisTLI = ThisTimeLineID;
+					XLogRecPtr	SegBeginPtr =
+						EndOfLog - (EndOfLog % wal_segment_size);
+
+					/*
+					 * XXX Assume that all the records have the same TLI.
+					 */
+					ThisTimeLineID = EndOfLogTLI;
+
+					WriteRqst.Write = EndOfLog;
+					WriteRqst.Flush = 0;
+
+					LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+					XLogWrite(WriteRqst, false);
+
+					/*
+					 * Force back-off to the last segment boundary.
+					 */
+					LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+					ControlFile->discardedUpTo = SegBeginPtr;
+					UpdateControlFile();
+					LWLockRelease(ControlFileLock);
+
+					LogwrtResult.Write = LogwrtResult.Flush = SegBeginPtr;
+
+					SpinLockAcquire(&XLogCtl->info_lck);
+					XLogCtl->LogwrtResult = LogwrtResult;
+					XLogCtl->LogwrtRqst.Write = SegBeginPtr;
+					XLogCtl->LogwrtRqst.Flush = SegBeginPtr;
+					SpinLockRelease(&XLogCtl->info_lck);
+
+					LWLockRelease(WALWriteLock);
+
+					ThisTimeLineID = thisTLI;
+				}
+
 				durable_rename(origpath, partialpath, ERROR);
 				XLogArchiveNotify(partialfname);
 			}
@@ -7827,7 +8346,10 @@ StartupXLOG(void)
 	/*
 	 * Preallocate additional log files, if wanted.
 	 */
-	PreallocXlogFiles(EndOfLog);
+	if (NvwalAvail)
+		PreallocNonVolatileXlogBuffer();
+	else
+		PreallocXlogFiles(EndOfLog);
 
 	/*
 	 * Okay, we're officially UP.
@@ -8371,10 +8893,24 @@ GetInsertRecPtr(void)
 /*
  * GetFlushRecPtr -- Returns the current flush position, ie, the last WAL
  * position known to be fsync'd to disk.
+ *
+ * If NVWAL is used, this returns the last persistent WAL position instead.
  */
 XLogRecPtr
 GetFlushRecPtr(void)
 {
+	if (NvwalAvail)
+	{
+		XLogRecPtr		ret;
+
+		SpinLockAcquire(&XLogCtl->info_lck);
+		LogwrtResult = XLogCtl->LogwrtResult;
+		ret = XLogCtl->persistentUpTo;
+		SpinLockRelease(&XLogCtl->info_lck);
+
+		return ret;
+	}
+
 	SpinLockAcquire(&XLogCtl->info_lck);
 	LogwrtResult = XLogCtl->LogwrtResult;
 	SpinLockRelease(&XLogCtl->info_lck);
@@ -8674,6 +9210,9 @@ CreateCheckPoint(int flags)
 	VirtualTransactionId *vxids;
 	int			nvxids;
 
+	/* for non-volatile WAL buffer */
+	XLogRecPtr	newDiscardedUpTo = 0;
+
 	/*
 	 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
 	 * issued at a different time.
@@ -8985,6 +9524,22 @@ CreateCheckPoint(int flags)
 	 */
 	PriorRedoPtr = ControlFile->checkPointCopy.redo;
 
+	/*
+	 * If the non-volatile WAL buffer is used, discardedUpTo should be updated
+	 * and persisted in the control file, so the new value is calculated
+	 * here.
+	 *
+	 * TODO: Do not copy and paste this code...
+	 */
+	if (NvwalAvail)
+	{
+		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
+		KeepLogSeg(recptr, &_logSegNo);
+		_logSegNo--;
+
+		newDiscardedUpTo = _logSegNo * wal_segment_size;
+	}
+
 	/*
 	 * Update the control file.
 	 */
@@ -8993,6 +9548,16 @@ CreateCheckPoint(int flags)
 		ControlFile->state = DB_SHUTDOWNED;
 	ControlFile->checkPoint = ProcLastRecPtr;
 	ControlFile->checkPointCopy = checkPoint;
+	if (NvwalAvail)
+	{
+		/*
+		 * A new value should not fall behind the old one.
+		 */
+		if (ControlFile->discardedUpTo < newDiscardedUpTo)
+			ControlFile->discardedUpTo = newDiscardedUpTo;
+		else
+			newDiscardedUpTo = ControlFile->discardedUpTo;
+	}
 	ControlFile->time = (pg_time_t) time(NULL);
 	/* crash recovery should always recover to the end of WAL */
 	ControlFile->minRecoveryPoint = InvalidXLogRecPtr;
@@ -9010,6 +9575,44 @@ CreateCheckPoint(int flags)
 	UpdateControlFile();
 	LWLockRelease(ControlFileLock);
 
+	/*
+	 * If we use non-volatile XLOG buffer, update XLogCtl->Logwrt{Rqst,Result}
+	 * so that the XLOG records older than newDiscardedUpTo are treated as
+	 * "already written and flushed."
+	 */
+	if (NvwalAvail)
+	{
+		Assert(newDiscardedUpTo > 0);
+
+		/* Update process-local variables */
+		LogwrtResult.Write = LogwrtResult.Flush = newDiscardedUpTo;
+
+		/*
+		 * Update shared-memory variables. We need both light-weight lock and
+		 * spin lock to update them.
+		 */
+		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+		SpinLockAcquire(&XLogCtl->info_lck);
+
+		/*
+		 * Note that there is a corner case in which process-local
+		 * LogwrtResult falls behind shared XLogCtl->LogwrtResult: the whole
+		 * non-volatile XLOG buffer fills up and some pages are written out
+		 * to segment files between UpdateControlFile and LWLockAcquire above.
+		 *
+		 * TODO: For now, we ignore that case because it can hardly occur.
+		 */
+		XLogCtl->LogwrtResult = LogwrtResult;
+
+		if (XLogCtl->LogwrtRqst.Write < newDiscardedUpTo)
+			XLogCtl->LogwrtRqst.Write = newDiscardedUpTo;
+		if (XLogCtl->LogwrtRqst.Flush < newDiscardedUpTo)
+			XLogCtl->LogwrtRqst.Flush = newDiscardedUpTo;
+
+		SpinLockRelease(&XLogCtl->info_lck);
+		LWLockRelease(WALWriteLock);
+	}
+
 	/* Update shared-memory copy of checkpoint XID/epoch */
 	SpinLockAcquire(&XLogCtl->info_lck);
 	XLogCtl->ckptFullXid = checkPoint.nextFullXid;
@@ -9033,21 +9636,31 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
-	/*
-	 * Delete old log files, those no longer needed for last checkpoint to
-	 * prevent the disk holding the xlog from growing full.
-	 */
-	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
-	KeepLogSeg(recptr, &_logSegNo);
-	_logSegNo--;
-	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
+	if (NvwalAvail)
+		RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
+	else
+	{
+		/*
+		 * Delete old log files, those no longer needed for last checkpoint to
+		 * prevent the disk holding the xlog from growing full.
+		 */
+		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
+		KeepLogSeg(recptr, &_logSegNo);
+		_logSegNo--;
+		RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
+	}
 
 	/*
 	 * Make more log segments if needed.  (Do this after recycling old log
 	 * segments, since that may supply some of the needed files.)
 	 */
 	if (!shutdown)
-		PreallocXlogFiles(recptr);
+	{
+		if (NvwalAvail)
+			PreallocNonVolatileXlogBuffer();
+		else
+			PreallocXlogFiles(recptr);
+	}
 
 	/*
 	 * Truncate pg_subtrans if possible.  We can throw away all data before
@@ -11651,6 +12264,76 @@ CancelBackup(void)
 	}
 }
 
+/*
+ * Is NVWAL used?
+ */
+bool
+IsNvwalAvail(void)
+{
+	return NvwalAvail;
+}
+
+/*
+ * Get a pointer to the *possibly* right location in the NVWAL buffer
+ * containing the target XLogRecPtr; NULL if the target has already been
+ * discarded.
+ *
+ * Note that the target could be discarded by a checkpoint after this
+ * function returns.  The caller should check that the copied record has
+ * the expected LSN.
+ */
+char *
+GetNvwalBuffer(XLogRecPtr target, Size *max_read)
+{
+	Size		off;
+	XLogRecPtr	discardedUpTo;
+
+	LWLockAcquire(ControlFileLock, LW_SHARED);
+	discardedUpTo = ControlFile->discardedUpTo;
+	LWLockRelease(ControlFileLock);
+
+	if (target < discardedUpTo)
+		return NULL;
+
+	off = target % NvwalSize;
+	*max_read = NvwalSize - off;
+	return XLogCtl->pages + off;
+}
+
+/*
+ * Returns the size we can load from NVWAL and sets *nvwalptr to the LSN to load from.
+ */
+Size
+GetLoadableSizeFromNvwal(XLogRecPtr target, Size count, XLogRecPtr *nvwalptr)
+{
+	XLogRecPtr	readUpTo;
+	XLogRecPtr	discardedUpTo;
+
+	Assert(IsNvwalAvail());
+	Assert(nvwalptr != NULL);
+
+	readUpTo = target + count;
+
+	LWLockAcquire(ControlFileLock, LW_SHARED);
+	discardedUpTo = ControlFile->discardedUpTo;
+	LWLockRelease(ControlFileLock);
+
+	/* Check if all the records are on WAL segment files */
+	if (readUpTo <= discardedUpTo)
+		return 0;
+
+	/* Check if all the records are on NVWAL */
+	if (discardedUpTo <= target)
+	{
+		*nvwalptr = target;
+		return count;
+	}
+
+	/* Some on WAL segment files, some on NVWAL */
+	*nvwalptr = discardedUpTo;
+	return (Size) (readUpTo - discardedUpTo);
+}
+
 /*
  * Read the XLOG page containing RecPtr into readBuf (if not read already).
  * Returns number of bytes read, if the page is read successfully, or -1
@@ -11718,7 +12401,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
 
 retry:
 	/* See if we need to retrieve more data */
-	if (readFile < 0 ||
+	if ((readSource != XLOG_FROM_NVWAL && readFile < 0) ||
 		(readSource == XLOG_FROM_STREAM &&
 		 receivedUpto < targetPagePtr + reqLen))
 	{
@@ -11730,10 +12413,68 @@ retry:
 			if (readFile >= 0)
 				close(readFile);
 			readFile = -1;
-			readLen = 0;
-			readSource = 0;
 
-			return -1;
+			/*
+			 * Try the non-volatile WAL buffer as a last resort.
+			 *
+			 * XXX It is not yet supported in standby mode.
+			 */
+			if (NvwalAvail && !StandbyMode && readSource != XLOG_FROM_STREAM)
+			{
+				XLogRecPtr	discardedUpTo;
+
+				elog(DEBUG1, "see if NVWAL has records to be UNDONE");
+
+				discardedUpTo = ControlFile->discardedUpTo;
+				if (discardedUpTo != InvalidXLogRecPtr &&
+					discardedUpTo <= targetPagePtr)
+				{
+					elog(DEBUG1, "recovering NVWAL");
+
+					/* Loading records from non-volatile WAL buffer */
+					currentSource = XLOG_FROM_NVWAL;
+					lastSourceFailed = false;
+
+					/* Report recovery progress in PS display */
+					set_ps_display("recovering NVWAL", false);
+
+					/* Track source of data */
+					readSource = XLOG_FROM_NVWAL;
+					XLogReceiptSource = XLOG_FROM_NVWAL;
+
+					/* Track receipt time */
+					XLogReceiptTime = GetCurrentTimestamp();
+
+					/*
+					 * Construct expectedTLEs.  This is necessary when
+					 * recovering only from NVWAL because its filename does
+					 * not carry any TLI information.
+					 */
+					if (!expectedTLEs)
+					{
+						TimeLineHistoryEntry *entry;
+
+						entry = (TimeLineHistoryEntry *) palloc(sizeof(TimeLineHistoryEntry));
+						entry->tli = recoveryTargetTLI;
+						entry->begin = entry->end = InvalidXLogRecPtr;
+
+						expectedTLEs = list_make1(entry);
+
+						elog(DEBUG1, "expectedTLEs: [%u]", (uint32) recoveryTargetTLI);
+					}
+				}
+			}
+			else
+				elog(DEBUG1, "do not recover NVWAL");
+
+			/* See if the try above succeeded or not */
+			if (readSource != XLOG_FROM_NVWAL)
+			{
+				readLen = 0;
+				readSource = 0;
+
+				return -1;
+			}
 		}
 	}
 
@@ -11741,7 +12482,7 @@ retry:
 	 * At this point, we have the right segment open and if we're streaming we
 	 * know the requested record is in it.
 	 */
-	Assert(readFile != -1);
+	Assert(readFile != -1 || readSource == XLOG_FROM_NVWAL);
 
 	/*
 	 * If the current segment is being streamed from master, calculate how
@@ -11760,41 +12501,60 @@ retry:
 	else
 		readLen = XLOG_BLCKSZ;
 
-	/* Read the requested page */
 	readOff = targetPageOff;
 
-	pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-	r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff);
-	if (r != XLOG_BLCKSZ)
+	if (currentSource == XLOG_FROM_NVWAL)
 	{
-		char		fname[MAXFNAMELEN];
-		int			save_errno = errno;
+		Size		offset = (Size) (targetPagePtr % NvwalSize);
+		char	   *readpos = XLogCtl->pages + offset;
 
+		Assert(readLen == XLOG_BLCKSZ);
+		Assert(offset % XLOG_BLCKSZ == 0);
+
+		/* Load the requested page from non-volatile WAL buffer */
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+		memcpy(readBuf, readpos, readLen);
 		pgstat_report_wait_end();
-		XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size);
-		if (r < 0)
+
+		/* There is no other clue about the TLI... */
+		*readTLI = ((XLogPageHeader) readBuf)->xlp_tli;
+	}
+	else
+	{
+		/* Read the requested page from file */
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+		r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff);
+		if (r != XLOG_BLCKSZ)
 		{
-			errno = save_errno;
-			ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u: %m",
-							fname, readOff)));
+			char		fname[MAXFNAMELEN];
+			int			save_errno = errno;
+
+			pgstat_report_wait_end();
+			XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size);
+			if (r < 0)
+			{
+				errno = save_errno;
+				ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
+						(errcode_for_file_access(),
+						 errmsg("could not read from log segment %s, offset %u: %m",
+								fname, readOff)));
+			}
+			else
+				ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
+								fname, readOff, r, (Size) XLOG_BLCKSZ)));
+			goto next_record_is_invalid;
 		}
-		else
-			ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
-					(errcode(ERRCODE_DATA_CORRUPTED),
-					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-							fname, readOff, r, (Size) XLOG_BLCKSZ)));
-		goto next_record_is_invalid;
+		pgstat_report_wait_end();
+
+		*readTLI = curFileTLI;
 	}
-	pgstat_report_wait_end();
 
 	Assert(targetSegNo == readSegNo);
 	Assert(targetPageOff == readOff);
 	Assert(reqLen <= readLen);
 
-	*readTLI = curFileTLI;
-
 	/*
 	 * Check the page header immediately, so that we can retry immediately if
 	 * it's not valid. This may seem unnecessary, because XLogReadRecord()
@@ -11828,6 +12588,17 @@ retry:
 		goto next_record_is_invalid;
 	}
 
+	/*
+	 * Update curFileTLI on each verified page if the non-volatile WAL buffer
+	 * is used, because NVWAL's filename carries no TimeLineID information.
+	 */
+	if (readSource == XLOG_FROM_NVWAL &&
+		curFileTLI != xlogreader->latestPageTLI)
+	{
+		curFileTLI = xlogreader->latestPageTLI;
+		elog(DEBUG1, "curFileTLI: %u", curFileTLI);
+	}
+
 	return readLen;
 
 next_record_is_invalid:
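
The walsender changes that follow use GetLoadableSizeFromNvwal() above to
split a requested byte range between WAL segment files and the NVWAL buffer.
A standalone sketch of that split (illustration only: simplified integer
types, and the discard horizon is passed in explicitly instead of being read
from ControlFile->discardedUpTo under ControlFileLock):

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Return how many trailing bytes of [target, target + count) can be loaded
 * from the NVWAL buffer, and set *nvwalptr to the LSN to start loading from.
 * Bytes before *nvwalptr must come from WAL segment files.
 */
static uint64_t
loadable_from_nvwal(uint64_t target, uint64_t count,
					uint64_t discardedUpTo, uint64_t *nvwalptr)
{
	uint64_t	readUpTo = target + count;

	/* everything has already been written out to segment files */
	if (readUpTo <= discardedUpTo)
		return 0;

	/* everything is still on the NVWAL buffer */
	if (discardedUpTo <= target)
	{
		*nvwalptr = target;
		return count;
	}

	/* head from segment files, tail from the NVWAL buffer */
	*nvwalptr = discardedUpTo;
	return readUpTo - discardedUpTo;
}

int
main(void)
{
	uint64_t	from = 0;
	uint64_t	n;

	/* request bytes 100..149 while everything below 120 is discarded */
	n = loadable_from_nvwal(100, 50, 120, &from);
	assert(n == 30 && from == 120);		/* 100..119 come from segment files */
	printf("load %llu bytes from NVWAL starting at LSN %llu\n",
		   (unsigned long long) n, (unsigned long long) from);
	return 0;
}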
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 92fa86fc9d..f2992b4a85 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2368,7 +2368,9 @@ XLogRead(char *buf, XLogRecPtr startptr, Size count)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
+	XLogRecPtr	recptr_nvwal = 0;
 	Size		nbytes;
+	Size		nbytes_nvwal = 0;
 	XLogSegNo	segno;
 
 retry:
@@ -2376,6 +2378,13 @@ retry:
 	recptr = startptr;
 	nbytes = count;
 
+	/* Try to load records directly from NVWAL if used */
+	if (IsNvwalAvail())
+	{
+		nbytes_nvwal = GetLoadableSizeFromNvwal(startptr, count, &recptr_nvwal);
+		nbytes = count - nbytes_nvwal;
+	}
+
 	while (nbytes > 0)
 	{
 		uint32		startoff;
@@ -2500,6 +2509,47 @@ retry:
 		p += readbytes;
 	}
 
+	/* Load records directly from NVWAL */
+	while (nbytes_nvwal > 0)
+	{
+		char	   *src;
+		Size		max_read = 0;
+		Size		readbytes;
+
+		Assert(IsNvwalAvail());
+
+		/*
+		 * Get the target address on NVWAL and the size we can load from it
+		 * at once, because the WAL buffer can wrap around and we might have
+		 * to load what we want divided into two or more pieces.
+		 *
+		 * Note that, in a rare case, some records on NVWAL might already
+		 * have been discarded.  We retry in such a case.
+		 */
+		src = GetNvwalBuffer(recptr_nvwal, &max_read);
+		if (src == NULL)
+		{
+			elog(WARNING, "some records on NVWAL had been discarded; retry");
+			goto retry;
+		}
+
+		if (nbytes_nvwal < max_read)
+			readbytes = nbytes_nvwal;
+		else
+			readbytes = max_read;
+
+		memcpy(p, src, readbytes);
+
+		/*
+		 * Update state for load.  Note that we do not need to update sendOff
+		 * because it indicates an offset in a segment file and we do not use
+		 * any segment file inside this loop.
+		 */
+		recptr_nvwal += readbytes;
+		nbytes_nvwal -= readbytes;
+		p += readbytes;
+	}
+
 	/*
 	 * After reading into the buffer, check that what we read was valid. We do
 	 * this after reading, because even though the segment was present when we
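
The loop added to XLogRead() above copies the NVWAL part piecewise because
the buffer is a ring: each GetNvwalBuffer() call returns at most the bytes up
to the end of the buffer.  A standalone sketch of that two-piece copy, with
an illustrative 64-byte buffer and a simplified stand-in for GetNvwalBuffer()
that skips the discard check:

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define NVWAL_SIZE	((uint64_t) 64)		/* illustrative buffer size */

static char nvwal[NVWAL_SIZE];

/* simplified GetNvwalBuffer(): in-buffer address and contiguous bytes left */
static char *
nvwal_buffer(uint64_t lsn, size_t *max_read)
{
	size_t		off = lsn % NVWAL_SIZE;

	*max_read = NVWAL_SIZE - off;
	return nvwal + off;
}

int
main(void)
{
	char		dst[32];
	char	   *p = dst;
	uint64_t	lsn = 60;			/* 4 bytes before the wrap point */
	size_t		left = sizeof(dst);

	while (left > 0)
	{
		size_t		max_read;
		char	   *src = nvwal_buffer(lsn, &max_read);
		size_t		n = (left < max_read) ? left : max_read;

		memcpy(p, src, n);
		printf("copied %zu bytes from buffer offset %zu\n",
			   n, (size_t) (lsn % NVWAL_SIZE));

		lsn += n;
		left -= n;
		p += n;
	}
	return 0;
}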
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index d955b97c0b..a47caefa99 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -280,6 +280,9 @@ main(int argc, char *argv[])
 		   ControlFile->checkPointCopy.oldestCommitTsXid);
 	printf(_("Latest checkpoint's newestCommitTsXid:%u\n"),
 		   ControlFile->checkPointCopy.newestCommitTsXid);
+	printf(_("Discarded up to:                      %X/%X\n"),
+		   (uint32) (ControlFile->discardedUpTo >> 32),
+		   (uint32) ControlFile->discardedUpTo);
 	printf(_("Time of latest checkpoint:            %s\n"),
 		   ckpttime_str);
 	printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index bc09fa104c..14efb904be 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -325,6 +325,12 @@ extern void XLogRequestWalReceiverReply(void);
 extern void assign_max_wal_size(int newval, void *extra);
 extern void assign_checkpoint_completion_target(double newval, void *extra);
 
+extern bool IsNvwalAvail(void);
+extern char *GetNvwalBuffer(XLogRecPtr target, Size *max_read);
+extern Size GetLoadableSizeFromNvwal(XLogRecPtr target,
+									 Size count,
+									 XLogRecPtr *nvwalptr);
+
 /*
  * Routines to start, stop, and get status of a base backup.
  */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index ff98d9e91a..04b1b94645 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -22,7 +22,7 @@
 
 
 /* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION	1201
+#define PG_CONTROL_VERSION	9901
 
 /* Nonce key length, see below */
 #define MOCK_AUTH_NONCE_LEN		32
@@ -132,6 +132,21 @@ typedef struct ControlFileData
 
 	XLogRecPtr	unloggedLSN;	/* current fake LSN value, for unlogged rels */
 
+	/*
+	 * Used for non-volatile WAL buffer (NVWAL).
+	 *
+	 * discardedUpTo is updated to the oldest LSN in the NVWAL when either a
+	 * checkpoint or a restartpoint completes successfully, or when the whole
+	 * NVWAL is filled with WAL records and a new record is being inserted.
+	 * This field tells us that the NVWAL contains WAL records in the range
+	 * [discardedUpTo, discardedUpTo+S), where S is the size of the NVWAL.
+	 * Note that the WAL records whose LSN is less than discardedUpTo remain
+	 * in WAL segment files and may be needed for recovery.
+	 *
+	 * It is set to zero when NVWAL is not used.
+	 */
+	XLogRecPtr	discardedUpTo;
+
 	/*
 	 * These two values determine the minimum point we must recover up to
 	 * before starting up:
-- 
2.20.1

