>From 0cb1f9197350d76ad8ef1fc2115afb7abdfc4fdc Mon Sep 17 00:00:00 2001
From: Takashi Menjo <takashi.menjou.vg@hco.ntt.co.jp>
Date: Wed, 24 Jun 2020 15:07:57 +0900
Subject: [PATCH v3 2/5] Non-volatile WAL buffer

The external WAL buffer now becomes non-volatile.

Bumps PG_CONTROL_VERSION.
---
 src/backend/access/transam/xlog.c            | 1154 ++++++++++++++++--
 src/backend/access/transam/xlogreader.c      |   24 +
 src/bin/pg_controldata/pg_controldata.c      |    3 +
 src/include/access/xlog.h                    |    8 +
 src/include/catalog/pg_control.h             |   17 +-
 src/test/regress/expected/misc_functions.out |   14 +-
 src/test/regress/sql/misc_functions.sql      |   14 +-
 7 files changed, 1097 insertions(+), 137 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0681ba1262..45e05b9498 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -654,6 +654,13 @@ typedef struct XLogCtlData
 	TimeLineID	ThisTimeLineID;
 	TimeLineID	PrevTimeLineID;
 
+	/*
+	 * Used for non-volatile WAL buffer (NVWAL).
+	 *
+	 * All the records up to this LSN are persistent in NVWAL.
+	 */
+	XLogRecPtr	persistentUpTo;
+
 	/*
 	 * SharedRecoveryState indicates if we're still in crash or archive
 	 * recovery.  Protected by info_lck.
@@ -783,11 +790,13 @@ typedef enum
 	XLOG_FROM_ANY = 0,			/* request to read WAL from any source */
 	XLOG_FROM_ARCHIVE,			/* restored using restore_command */
 	XLOG_FROM_PG_WAL,			/* existing file in pg_wal */
-	XLOG_FROM_STREAM			/* streamed from master */
+	XLOG_FROM_NVWAL,			/* non-volatile WAL buffer */
+	XLOG_FROM_STREAM,			/* streamed from master via segment file */
+	XLOG_FROM_STREAM_NVWAL		/* same as above, but via NVWAL */
 } XLogSource;
 
 /* human-readable names for XLogSources, for debugging output */
-static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "stream"};
+static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "nvwal", "stream", "stream_nvwal"};
 
 /*
  * openLogFile is -1 or a kernel FD for an open log file segment.
@@ -922,6 +931,7 @@ static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 										bool fetching_ckpt, XLogRecPtr tliRecPtr);
 static int	emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
 static void XLogFileClose(void);
+static void PreallocNonVolatileXlogBuffer(void);
 static void PreallocXlogFiles(XLogRecPtr endptr);
 static void RemoveTempXlogFiles(void);
 static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr lastredoptr, XLogRecPtr endptr);
@@ -1204,6 +1214,43 @@ XLogInsertRecord(XLogRecData *rdata,
 		}
 	}
 
+	/*
+	 * Request a checkpoint here if non-volatile WAL buffer is used and we
+	 * have consumed too much WAL since the last checkpoint.
+	 *
+	 * We first screen under the condition (1) OR (2) below:
+	 *
+	 * (1) The record was the first one in a certain segment.
+	 * (2) The record was inserted across segments.
+	 *
+	 * We then check the segment number which the record was inserted into.
+	 */
+	if (NvwalAvail && inserted &&
+		(StartPos % wal_segment_size == SizeOfXLogLongPHD ||
+		 StartPos / wal_segment_size < EndPos / wal_segment_size))
+	{
+		XLogSegNo	end_segno;
+
+		XLByteToSeg(EndPos, end_segno, wal_segment_size);
+
+		/*
+		 * NOTE: We do not signal walsender here because the inserted record
+		 * has not been drained (made persistent) on the NVWAL buffer yet.
+		 *
+		 * NOTE: We do not signal walarchiver here because the inserted record
+		 * has not been flushed to a segment file.  So we don't need to update
+		 * XLogCtl->lastSegSwitch{Time,LSN}, used only by CheckArchiveTimeout.
+		 */
+
+		/* Two-step checking for speed (see also XLogWrite) */
+		if (IsUnderPostmaster && XLogCheckpointNeeded(end_segno))
+		{
+			(void) GetRedoRecPtr();
+			if (XLogCheckpointNeeded(end_segno))
+				RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
+		}
+	}
+
 #ifdef WAL_DEBUG
 	if (XLOG_DEBUG)
 	{
@@ -2136,6 +2183,15 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 	XLogRecPtr	NewPageBeginPtr;
 	XLogPageHeader NewPage;
 	int			npages = 0;
+	bool		is_firstpage;
+
+	if (NvwalAvail)
+		elog(DEBUG1, "XLogCtl->InitializedUpTo %X/%X; upto %X/%X; opportunistic %s",
+			 (uint32) (XLogCtl->InitializedUpTo >> 32),
+			 (uint32) XLogCtl->InitializedUpTo,
+			 (uint32) (upto >> 32),
+			 (uint32) upto,
+			 opportunistic ? "true" : "false");
 
 	LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
 
@@ -2197,7 +2253,25 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 				{
 					/* Have to write it ourselves */
 					TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
-					WriteRqst.Write = OldPageRqstPtr;
+
+					if (NvwalAvail)
+					{
+						/*
+						 * If we use non-volatile WAL buffer, it is a special
+						 * but expected case to write the buffer pages out to
+						 * segment files, and for simplicity, it is done in
+						 * segment by segment.
+						 */
+						XLogRecPtr		OldSegEndPtr;
+
+						OldSegEndPtr = OldPageRqstPtr - XLOG_BLCKSZ + wal_segment_size;
+						Assert(OldSegEndPtr % wal_segment_size == 0);
+
+						WriteRqst.Write = OldSegEndPtr;
+					}
+					else
+						WriteRqst.Write = OldPageRqstPtr;
+
 					WriteRqst.Flush = 0;
 					XLogWrite(WriteRqst, false);
 					LWLockRelease(WALWriteLock);
@@ -2224,7 +2298,20 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 		 * Be sure to re-zero the buffer so that bytes beyond what we've
 		 * written will look like zeroes and not valid XLOG records...
 		 */
-		MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
+		if (NvwalAvail)
+		{
+			/*
+			 * We do not combine MemSet() with pmem_persist() here, because
+			 * pmem_persist() may fall back to a slow, strongly-ordered cache
+			 * flush instruction if the fast weakly-ordered one is not
+			 * supported.  Instead, we first zero-fill the buffer with
+			 * pmem_memset_persist(), which can leverage fast non-temporal
+			 * store instructions, then make the header persistent later.
+			 */
+			nv_memset_persist(NewPage, 0, XLOG_BLCKSZ);
+		}
+		else
+			MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
 
 		/*
 		 * Fill the new page's header
@@ -2256,7 +2343,8 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 		/*
 		 * If first page of an XLOG segment file, make it a long header.
 		 */
-		if ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0)
+		is_firstpage = ((XLogSegmentOffset(NewPage->xlp_pageaddr, wal_segment_size)) == 0);
+		if (is_firstpage)
 		{
 			XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
 
@@ -2271,7 +2359,13 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 		 * before the xlblocks update. GetXLogBuffer() reads xlblocks without
 		 * holding a lock.
 		 */
-		pg_write_barrier();
+		if (NvwalAvail)
+		{
+			/* Make the header persistent on PMEM */
+			nv_persist(NewPage, is_firstpage ? SizeOfXLogLongPHD : SizeOfXLogShortPHD);
+		}
+		else
+			pg_write_barrier();
 
 		*((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
 
@@ -2281,6 +2375,13 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 	}
 	LWLockRelease(WALBufMappingLock);
 
+	if (NvwalAvail)
+		elog(DEBUG1, "ControlFile->discardedUpTo %X/%X; XLogCtl->InitializedUpTo %X/%X",
+			 (uint32) (ControlFile->discardedUpTo >> 32),
+			 (uint32) ControlFile->discardedUpTo,
+			 (uint32) (XLogCtl->InitializedUpTo >> 32),
+			 (uint32) XLogCtl->InitializedUpTo);
+
 #ifdef WAL_DEBUG
 	if (XLOG_DEBUG && npages > 0)
 	{
@@ -2662,6 +2763,23 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 		LogwrtResult.Flush = LogwrtResult.Write;
 	}
 
+	/*
+	 * Update discardedUpTo if NVWAL is used.  A new value should not fall
+	 * behind the old one.
+	 */
+	if (NvwalAvail)
+	{
+		Assert(LogwrtResult.Write == LogwrtResult.Flush);
+
+		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+		if (ControlFile->discardedUpTo < LogwrtResult.Write)
+		{
+			ControlFile->discardedUpTo = LogwrtResult.Write;
+			UpdateControlFile();
+		}
+		LWLockRelease(ControlFileLock);
+	}
+
 	/*
 	 * Update shared-memory status
 	 *
@@ -2866,6 +2984,123 @@ XLogFlush(XLogRecPtr record)
 		return;
 	}
 
+	if (NvwalAvail)
+	{
+		XLogRecPtr	FromPos;
+
+		/*
+		 * No page on the NVWAL is to be flushed to segment files.  Instead,
+		 * we wait all the insertions preceding this one complete.  We will
+		 * wait for all the records to be persistent on the NVWAL below.
+		 */
+		record = WaitXLogInsertionsToFinish(record);
+
+		/*
+		 * Check if another backend has already done what we are about to do.
+		 *
+		 * We can compare something <= XLogCtl->persistentUpTo without
+		 * holding XLogCtl->info_lck spinlock because persistentUpTo is
+		 * monotonically increasing and can be loaded atomically on each
+		 * NVWAL-supported platform (now x64 only).
+		 */
+		FromPos = *((volatile XLogRecPtr *) &XLogCtl->persistentUpTo);
+		if (record <= FromPos)
+			return;
+
+		/*
+		 * In a very rare case, we have wrapped around the whole NVWAL.  We do
+		 * not need to care about old pages here because they have already
+		 * been evicted to segment files at record insertion.
+		 *
+		 * In such a case, we flush the whole NVWAL.  We also log it as a
+		 * warning because it can be a time-consuming operation.
+		 *
+		 * TODO Advance XLogCtl->persistentUpTo at the end of XLogWrite, and
+		 * we can remove the following first if-block.
+		 */
+		if (record - FromPos > NvwalSize)
+		{
+			elog(WARNING, "flush whole the NVWAL; FromPos %X/%X; record %X/%X",
+				 (uint32) (FromPos >> 32), (uint32) FromPos,
+				 (uint32) (record >> 32), (uint32) record);
+
+			nv_flush(XLogCtl->pages, NvwalSize);
+		}
+		else
+		{
+			char   *frompos;
+			char   *uptopos;
+			size_t	fromoff;
+			size_t	uptooff;
+
+			/*
+			 * Flush each record that is probably not flushed yet.
+			 *
+			 * We say "probably" for two reasons.  The first is that a record
+			 * copied with non-temporal store instructions has already been
+			 * "flushed", but we cannot tell that here.  Calling nv_flush on it
+			 * does no harm to consistency.
+			 *
+			 * The second reason is that the target record might have already
+			 * been evicted to a segment file by now.  In this case, too,
+			 * nv_flush does no harm to consistency.
+			 */
+			uptooff = record % NvwalSize;
+			uptopos = XLogCtl->pages + uptooff;
+			fromoff = FromPos % NvwalSize;
+			frompos = XLogCtl->pages + fromoff;
+
+			/* Handles rotation */
+			if (uptopos <= frompos)
+			{
+				nv_flush(frompos, NvwalSize - fromoff);
+				fromoff = 0;
+				frompos = XLogCtl->pages;
+			}
+
+			nv_flush(frompos, uptooff - fromoff);
+		}
+
+		/*
+		 * To guarantee durability ("D" of ACID), we should satisfy the
+		 * following two for each transaction X:
+		 *
+		 *  (1) All the WAL records inserted by X, including the commit record
+		 *      of X, should persist on NVWAL before the server commits X.
+		 *
+		 *  (2) All the WAL records inserted by any transactions other than
+		 *      X that have a lower LSN than the commit record just inserted
+		 *      by X should persist on NVWAL before the server commits X.
+		 *
+		 * (1) can be satisfied by a store barrier after the commit record of
+		 * X is flushed, because each WAL record of X has already been flushed
+		 * at the end of its insertion.  (2) can be satisfied by waiting for
+		 * any record insertions that have a lower LSN than the commit record
+		 * just inserted by X, and by a store barrier as well.
+		 *
+		 * Now is the time.  Have a store barrier.
+		 */
+		nv_drain();
+
+		/*
+		 * Remember where the last persistent record is.  A new value should
+		 * not fall behind the old one.
+		 */
+		SpinLockAcquire(&XLogCtl->info_lck);
+		if (XLogCtl->persistentUpTo < record)
+			XLogCtl->persistentUpTo = record;
+		SpinLockRelease(&XLogCtl->info_lck);
+
+		/*
+		 * The records up to the returned "record" are now persistent on
+		 * NVWAL.  Now signal walsenders.
+		 */
+		WalSndWakeupRequest();
+		WalSndWakeupProcessRequests();
+
+		return;
+	}
+
 	/* Quick exit if already known flushed */
 	if (record <= LogwrtResult.Flush)
 		return;
@@ -3049,6 +3284,13 @@ XLogBackgroundFlush(void)
 	if (RecoveryInProgress())
 		return false;
 
+	/*
+	 * Quick exit if NVWAL buffer is used and archiving is not active. In this
+	 * case, we need no WAL segment file in pg_wal directory.
+	 */
+	if (NvwalAvail && !XLogArchivingActive())
+		return false;
+
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
 	LogwrtResult = XLogCtl->LogwrtResult;
@@ -3067,6 +3309,18 @@ XLogBackgroundFlush(void)
 		flexible = false;		/* ensure it all gets written */
 	}
 
+	/*
+	 * If NVWAL is used, back off to the last completed segment boundary
+	 * so that buffer pages are written to files segment by segment.  We do
+	 * so only here, after XLogCtl->asyncXactLSN is loaded, because that
+	 * value should be considered.
+	 */
+	if (NvwalAvail)
+	{
+		WriteRqst.Write -= WriteRqst.Write % wal_segment_size;
+		flexible = false;		/* ensure it all gets written */
+	}
+
 	/*
 	 * If already known flushed, we're done. Just need to check if we are
 	 * holding an open file handle to a logfile that's no longer in use,
@@ -3093,7 +3347,12 @@ XLogBackgroundFlush(void)
 	flushbytes =
 		WriteRqst.Write / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ;
 
-	if (WalWriterFlushAfter == 0 || lastflush == 0)
+	if (NvwalAvail)
+	{
+		WriteRqst.Flush = WriteRqst.Write;
+		lastflush = now;
+	}
+	else if (WalWriterFlushAfter == 0 || lastflush == 0)
 	{
 		/* first call, or block based limits disabled */
 		WriteRqst.Flush = WriteRqst.Write;
@@ -3152,7 +3411,28 @@ XLogBackgroundFlush(void)
 	 * Great, done. To take some work off the critical path, try to initialize
 	 * as many of the no-longer-needed WAL buffers for future use as we can.
 	 */
-	AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
+	if (NvwalAvail && max_wal_senders == 0)
+	{
+		XLogRecPtr		upto;
+
+		/*
+		 * If NVWAL is used and there is no walsender, nobody is to load
+		 * segments on the buffer.  So let's recycle segments up to {where we
+		 * have requested to write and flush} + NvwalSize.
+		 *
+		 * Note that if NVWAL is used and a walsender seems running, we have to
+		 * do nothing; keep the written pages on the buffer for walsenders to be
+		 * loaded from the buffer, not from the segment files.  Note that the
+		 * buffer pages are eventually to be recycled by checkpoint.
+		 */
+		Assert(WriteRqst.Write == WriteRqst.Flush);
+		Assert(WriteRqst.Write % wal_segment_size == 0);
+
+		upto = WriteRqst.Write + NvwalSize;
+		AdvanceXLInsertBuffer(upto - XLOG_BLCKSZ, false);
+	}
+	else
+		AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
 
 	/*
 	 * If we determined that we need to write data, but somebody else
@@ -3885,6 +4165,43 @@ XLogFileClose(void)
 	ReleaseExternalFD();
 }
 
+/*
+ * Preallocate non-volatile XLOG buffers.
+ *
+ * This zeroes buffer pages and prepares their page headers up to
+ * ControlFile->discardedUpTo + NvwalSize (the total size of the
+ * non-volatile XLOG buffers); no-op if already initialized that far.
+ *
+ * It is the caller's responsibility to update ControlFile->discardedUpTo
+ * and to set XLogCtl->InitializedUpTo properly.
+ */
+static void
+PreallocNonVolatileXlogBuffer(void)
+{
+	XLogRecPtr	newupto,
+				InitializedUpTo;
+
+	Assert(NvwalAvail);
+
+	LWLockAcquire(ControlFileLock, LW_SHARED);
+	newupto = ControlFile->discardedUpTo;
+	LWLockRelease(ControlFileLock);
+
+	InitializedUpTo = XLogCtl->InitializedUpTo;
+
+	newupto += NvwalSize;
+	Assert(newupto % wal_segment_size == 0);
+
+	if (newupto <= InitializedUpTo)
+		return;
+
+	/*
+	 * Subtracting XLOG_BLCKSZ is important, because AdvanceXLInsertBuffer
+	 * treats its first argument as the beginning of a page, not the end.
+	 */
+	AdvanceXLInsertBuffer(newupto - XLOG_BLCKSZ, false);
+}
+
 /*
  * Preallocate log files beyond the specified log endpoint.
  *
@@ -4181,8 +4498,11 @@ RemoveXlogFile(const char *segname, XLogRecPtr lastredoptr, XLogRecPtr endptr)
 	 * Before deleting the file, see if it can be recycled as a future log
 	 * segment. Only recycle normal files, pg_standby for example can create
 	 * symbolic links pointing to a separate archive directory.
+	 *
+	 * If NVWAL buffer is used, a log segment file is never to be recycled
+	 * (that is, always go into else block).
 	 */
-	if (wal_recycle &&
+	if (!NvwalAvail && wal_recycle &&
 		endlogSegNo <= recycleSegNo &&
 		lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
 		InstallXLogFileSegment(&endlogSegNo, path,
@@ -4600,6 +4920,7 @@ InitControlFile(uint64 sysidentifier)
 	memcpy(ControlFile->mock_authentication_nonce, mock_auth_nonce, MOCK_AUTH_NONCE_LEN);
 	ControlFile->state = DB_SHUTDOWNED;
 	ControlFile->unloggedLSN = FirstNormalUnloggedLSN;
+	ControlFile->discardedUpTo = (NvwalAvail) ? wal_segment_size : InvalidXLogRecPtr;
 
 	/* Set important parameter values for use when replaying WAL */
 	ControlFile->MaxConnections = MaxConnections;
@@ -5430,41 +5751,58 @@ BootStrapXLOG(void)
 	record->xl_crc = crc;
 
 	/* Create first XLOG segment file */
-	use_existent = false;
-	openLogFile = XLogFileInit(1, &use_existent, false);
+	if (NvwalAvail)
+	{
+		pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_WRITE);
+		nv_memcpy_nodrain(XLogCtl->pages + wal_segment_size, page, XLOG_BLCKSZ);
+		pgstat_report_wait_end();
 
-	/*
-	 * We needn't bother with Reserve/ReleaseExternalFD here, since we'll
-	 * close the file again in a moment.
-	 */
+		pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC);
+		nv_drain();
+		pgstat_report_wait_end();
 
-	/* Write the first page with the initial record */
-	errno = 0;
-	pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_WRITE);
-	if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
-	{
-		/* if write didn't set errno, assume problem is no disk space */
-		if (errno == 0)
-			errno = ENOSPC;
-		ereport(PANIC,
-				(errcode_for_file_access(),
-				 errmsg("could not write bootstrap write-ahead log file: %m")));
+		/*
+		 * Other WAL stuff will be initialized in the startup process.
+		 */
 	}
-	pgstat_report_wait_end();
+	else
+	{
+		use_existent = false;
+		openLogFile = XLogFileInit(1, &use_existent, false);
 
-	pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC);
-	if (pg_fsync(openLogFile) != 0)
-		ereport(PANIC,
-				(errcode_for_file_access(),
-				 errmsg("could not fsync bootstrap write-ahead log file: %m")));
-	pgstat_report_wait_end();
+		/*
+		 * We needn't bother with Reserve/ReleaseExternalFD here, since we'll
+		 * close the file again in a moment.
+		 */
 
-	if (close(openLogFile) != 0)
-		ereport(PANIC,
-				(errcode_for_file_access(),
-				 errmsg("could not close bootstrap write-ahead log file: %m")));
+		/* Write the first page with the initial record */
+		errno = 0;
+		pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_WRITE);
+		if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+		{
+			/* if write didn't set errno, assume problem is no disk space */
+			if (errno == 0)
+				errno = ENOSPC;
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not write bootstrap write-ahead log file: %m")));
+		}
+		pgstat_report_wait_end();
 
-	openLogFile = -1;
+		pgstat_report_wait_start(WAIT_EVENT_WAL_BOOTSTRAP_SYNC);
+		if (pg_fsync(openLogFile) != 0)
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not fsync bootstrap write-ahead log file: %m")));
+		pgstat_report_wait_end();
+
+		if (close(openLogFile) != 0)
+			ereport(PANIC,
+					(errcode_for_file_access(),
+					 errmsg("could not close bootstrap write-ahead log file: %m")));
+
+		openLogFile = -1;
+	}
 
 	/* Now create pg_control */
 	InitControlFile(sysidentifier);
@@ -5718,41 +6056,47 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
 	 * happens in the middle of a segment, copy data from the last WAL segment
 	 * of the old timeline up to the switch point, to the starting WAL segment
 	 * on the new timeline.
+	 *
+	 * If non-volatile WAL buffer is used, no new segment file is created. Data
+	 * up to the switch point will be copied into NVWAL buffer by StartupXLOG().
 	 */
-	if (endLogSegNo == startLogSegNo)
-	{
-		/*
-		 * Make a copy of the file on the new timeline.
-		 *
-		 * Writing WAL isn't allowed yet, so there are no locking
-		 * considerations. But we should be just as tense as XLogFileInit to
-		 * avoid emplacing a bogus file.
-		 */
-		XLogFileCopy(endLogSegNo, endTLI, endLogSegNo,
-					 XLogSegmentOffset(endOfLog, wal_segment_size));
-	}
-	else
+	if (!NvwalAvail)
 	{
-		/*
-		 * The switch happened at a segment boundary, so just create the next
-		 * segment on the new timeline.
-		 */
-		bool		use_existent = true;
-		int			fd;
+		if (endLogSegNo == startLogSegNo)
+		{
+			/*
+			 * Make a copy of the file on the new timeline.
+			 *
+			 * Writing WAL isn't allowed yet, so there are no locking
+			 * considerations. But we should be just as tense as XLogFileInit to
+			 * avoid emplacing a bogus file.
+			 */
+			XLogFileCopy(endLogSegNo, endTLI, endLogSegNo,
+						 XLogSegmentOffset(endOfLog, wal_segment_size));
+		}
+		else
+		{
+			/*
+			 * The switch happened at a segment boundary, so just create the next
+			 * segment on the new timeline.
+			 */
+			bool		use_existent = true;
+			int			fd;
 
-		fd = XLogFileInit(startLogSegNo, &use_existent, true);
+			fd = XLogFileInit(startLogSegNo, &use_existent, true);
 
-		if (close(fd) != 0)
-		{
-			char		xlogfname[MAXFNAMELEN];
-			int			save_errno = errno;
+			if (close(fd) != 0)
+			{
+				char		xlogfname[MAXFNAMELEN];
+				int			save_errno = errno;
 
-			XLogFileName(xlogfname, ThisTimeLineID, startLogSegNo,
-						 wal_segment_size);
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not close file \"%s\": %m", xlogfname)));
+				XLogFileName(xlogfname, ThisTimeLineID, startLogSegNo,
+							 wal_segment_size);
+				errno = save_errno;
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not close file \"%s\": %m", xlogfname)));
+			}
 		}
 	}
 
@@ -7009,6 +7353,11 @@ StartupXLOG(void)
 		InRecovery = true;
 	}
 
+	/* Dump discardedUpTo just before REDO */
+	elog(LOG, "ControlFile->discardedUpTo %X/%X",
+		 (uint32) (ControlFile->discardedUpTo >> 32),
+		 (uint32) ControlFile->discardedUpTo);
+
 	/* REDO */
 	if (InRecovery)
 	{
@@ -7795,10 +8144,88 @@ StartupXLOG(void)
 	Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
 	Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
 
+	if (NvwalAvail)
+	{
+		XLogRecPtr	discardedUpTo;
+
+		discardedUpTo = ControlFile->discardedUpTo;
+		Assert(discardedUpTo == InvalidXLogRecPtr ||
+			   discardedUpTo % wal_segment_size == 0);
+
+		if (discardedUpTo == InvalidXLogRecPtr)
+		{
+			elog(DEBUG1, "brand-new NVWAL");
+
+			/* The following "Tricky point" is to initialize the buffer */
+		}
+		else if (EndOfLog <= discardedUpTo)
+		{
+			elog(DEBUG1, "no record on NVWAL has been UNDONE");
+
+			LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+			ControlFile->discardedUpTo = InvalidXLogRecPtr;
+			UpdateControlFile();
+			LWLockRelease(ControlFileLock);
+
+			nv_memset_persist(XLogCtl->pages, 0, NvwalSize);
+
+			/* The following "Tricky point" is to initialize the buffer */
+		}
+		else
+		{
+			int			last_idx;
+			int			idx;
+			XLogRecPtr	ptr;
+
+			elog(DEBUG1, "some records on NVWAL have been UNDONE; keep them");
+
+			/*
+			 * Initialize xlblock array because we decided to keep UNDONE
+			 * records on NVWAL buffer; or each page on the buffer that meets
+			 * xlblocks == 0 (initialized as so by XLOGShmemInit) is to be
+			 * accidentally cleared by the following AdvanceXLInsertBuffer!
+			 *
+			 * Two cases can be considered:
+			 *
+			 * 1) EndOfLog is on a page boundary (divisible by XLOG_BLCKSZ):
+			 *    Initialize up to (and including) the page containing the last
+			 *    record.  That page should end with EndOfLog.  The one more
+			 *    next page "N" beginning with EndOfLog is to be untouched
+			 *    because, in such a very corner case that all the NVWAL
+			 *    buffer pages are already filled, page N is on the same
+			 *    location as the first page "F" beginning with discardedUpTo.
+			 *    Of course we should not overwrite the page F.
+			 *
+			 *    In this case, we first get XLogRecPtrToBufIdx(EndOfLog) as
+			 *    last_idx, indicating the page N.  Then, we go forward from
+			 *    the page F up to (but excluding) page N that have the same
+			 *    index as the page F.
+			 *
+			 * 2) EndOfLog is not on a page boundary:  Initialize all the pages
+			 *    but the page "L" having the last record. The page L is to be
+			 *    initialized by the following "Tricky point", including its
+			 *    content.
+			 *
+			 * In either case, XLogCtl->InitializedUpTo is to be initialized in
+			 * the following "Tricky" if-else block.
+			 */
+
+			last_idx = XLogRecPtrToBufIdx(EndOfLog);
+
+			ptr = discardedUpTo;
+			for (idx = XLogRecPtrToBufIdx(ptr); idx != last_idx;
+				 idx = NextBufIdx(idx))
+			{
+				ptr += XLOG_BLCKSZ;
+				XLogCtl->xlblocks[idx] = ptr;
+			}
+		}
+	}
+
 	/*
-	 * Tricky point here: readBuf contains the *last* block that the LastRec
-	 * record spans, not the one it starts in.  The last block is indeed the
-	 * one we want to use.
+	 * Tricky point here: readBuf contains the *last* block that the
+	 * LastRec record spans, not the one it starts in.  The last block is
+	 * indeed the one we want to use.
 	 */
 	if (EndOfLog % XLOG_BLCKSZ != 0)
 	{
@@ -7818,6 +8245,9 @@ StartupXLOG(void)
 		memcpy(page, xlogreader->readBuf, len);
 		memset(page + len, 0, XLOG_BLCKSZ - len);
 
+		if (NvwalAvail)
+			nv_persist(page, XLOG_BLCKSZ);
+
 		XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ;
 		XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ;
 	}
@@ -7831,12 +8261,54 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
-	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	if (NvwalAvail)
+	{
+		XLogRecPtr	SegBeginPtr;
+
+		/*
+		 * If NVWAL buffer is used, writing records out to segment files should
+		 * be done segment by segment.  So Logwrt{Rqst,Result} (and also
+		 * discardedUpTo) should be a multiple of wal_segment_size.  Let's
+		 * back them off to the last segment boundary.
+		 */
+
+		SegBeginPtr = EndOfLog - (EndOfLog % wal_segment_size);
+		LogwrtResult.Write = LogwrtResult.Flush = SegBeginPtr;
+		XLogCtl->LogwrtResult = LogwrtResult;
+		XLogCtl->LogwrtRqst.Write = SegBeginPtr;
+		XLogCtl->LogwrtRqst.Flush = SegBeginPtr;
+
+		/*
+		 * persistentUpTo does not need to be multiple of wal_segment_size,
+		 * and should be drained-up-to LSN. walsender will use it to load
+		 * records from NVWAL buffer.
+		 */
+		XLogCtl->persistentUpTo = EndOfLog;
+
+		/* Update discardedUpTo in pg_control if still invalid */
+		if (ControlFile->discardedUpTo == InvalidXLogRecPtr)
+		{
+			LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+			ControlFile->discardedUpTo = SegBeginPtr;
+			UpdateControlFile();
+			LWLockRelease(ControlFileLock);
+		}
+
+		elog(DEBUG1, "EndOfLog: %X/%X",
+			 (uint32) (EndOfLog >> 32), (uint32) EndOfLog);
 
-	XLogCtl->LogwrtResult = LogwrtResult;
+		elog(DEBUG1, "SegBeginPtr: %X/%X",
+			 (uint32) (SegBeginPtr >> 32), (uint32) SegBeginPtr);
+	}
+	else
+	{
+		LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
-	XLogCtl->LogwrtRqst.Write = EndOfLog;
-	XLogCtl->LogwrtRqst.Flush = EndOfLog;
+		XLogCtl->LogwrtResult = LogwrtResult;
+
+		XLogCtl->LogwrtRqst.Write = EndOfLog;
+		XLogCtl->LogwrtRqst.Flush = EndOfLog;
+	}
 
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
@@ -7967,6 +8439,7 @@ StartupXLOG(void)
 				char		origpath[MAXPGPATH];
 				char		partialfname[MAXFNAMELEN];
 				char		partialpath[MAXPGPATH];
+				XLogRecPtr	discardedUpTo;
 
 				XLogFilePath(origpath, EndOfLogTLI, endLogSegNo, wal_segment_size);
 				snprintf(partialfname, MAXFNAMELEN, "%s.partial", origfname);
@@ -7978,6 +8451,53 @@ StartupXLOG(void)
 				 */
 				XLogArchiveCleanup(partialfname);
 
+				/*
+				 * If NVWAL is also used for archival recovery, write old
+				 * records out to segment files to archive them.  Note that we
+				 * need locks related to WAL because LocalXLogInsertAllowed
+				 * already got to -1.
+				 */
+				discardedUpTo = ControlFile->discardedUpTo;
+				if (NvwalAvail && discardedUpTo != InvalidXLogRecPtr &&
+					discardedUpTo < EndOfLog)
+				{
+					XLogwrtRqst WriteRqst;
+					TimeLineID	thisTLI = ThisTimeLineID;
+					XLogRecPtr	SegBeginPtr =
+						EndOfLog - (EndOfLog % wal_segment_size);
+
+					/*
+					 * XXX Assume that all the records have the same TLI.
+					 */
+					ThisTimeLineID = EndOfLogTLI;
+
+					WriteRqst.Write = EndOfLog;
+					WriteRqst.Flush = 0;
+
+					LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+					XLogWrite(WriteRqst, false);
+
+					/*
+					 * Force back-off to the last segment boundary.
+					 */
+					LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+					ControlFile->discardedUpTo = SegBeginPtr;
+					UpdateControlFile();
+					LWLockRelease(ControlFileLock);
+
+					LogwrtResult.Write = LogwrtResult.Flush = SegBeginPtr;
+
+					SpinLockAcquire(&XLogCtl->info_lck);
+					XLogCtl->LogwrtResult = LogwrtResult;
+					XLogCtl->LogwrtRqst.Write = SegBeginPtr;
+					XLogCtl->LogwrtRqst.Flush = SegBeginPtr;
+					SpinLockRelease(&XLogCtl->info_lck);
+
+					LWLockRelease(WALWriteLock);
+
+					ThisTimeLineID = thisTLI;
+				}
+
 				durable_rename(origpath, partialpath, ERROR);
 				XLogArchiveNotify(partialfname);
 			}
@@ -7987,7 +8507,10 @@ StartupXLOG(void)
 	/*
 	 * Preallocate additional log files, if wanted.
 	 */
-	PreallocXlogFiles(EndOfLog);
+	if (NvwalAvail)
+		PreallocNonVolatileXlogBuffer();
+	else
+		PreallocXlogFiles(EndOfLog);
 
 	/*
 	 * Okay, we're officially UP.
@@ -8550,10 +9073,24 @@ GetInsertRecPtr(void)
 /*
  * GetFlushRecPtr -- Returns the current flush position, ie, the last WAL
  * position known to be fsync'd to disk.
+ *
+ * If NVWAL is used, this returns the last persistent WAL position instead.
  */
 XLogRecPtr
 GetFlushRecPtr(void)
 {
+	if (NvwalAvail)
+	{
+		XLogRecPtr		ret;
+
+		SpinLockAcquire(&XLogCtl->info_lck);
+		LogwrtResult = XLogCtl->LogwrtResult;
+		ret = XLogCtl->persistentUpTo;
+		SpinLockRelease(&XLogCtl->info_lck);
+
+		return ret;
+	}
+
 	SpinLockAcquire(&XLogCtl->info_lck);
 	LogwrtResult = XLogCtl->LogwrtResult;
 	SpinLockRelease(&XLogCtl->info_lck);
@@ -8853,6 +9390,9 @@ CreateCheckPoint(int flags)
 	VirtualTransactionId *vxids;
 	int			nvxids;
 
+	/* for non-volatile WAL buffer */
+	XLogRecPtr	newDiscardedUpTo = 0;
+
 	/*
 	 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
 	 * issued at a different time.
@@ -9164,6 +9704,22 @@ CreateCheckPoint(int flags)
 	 */
 	PriorRedoPtr = ControlFile->checkPointCopy.redo;
 
+	/*
+	 * If non-volatile WAL buffer is used, discardedUpTo should be updated and
+	 * persisted in the control file.  So the new value should be calculated
+	 * here.
+	 *
+	 * TODO Do not copy and paste codes...
+	 */
+	if (NvwalAvail)
+	{
+		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
+		KeepLogSeg(recptr, &_logSegNo);
+		_logSegNo--;
+
+		newDiscardedUpTo = _logSegNo * wal_segment_size;
+	}
+
 	/*
 	 * Update the control file.
 	 */
@@ -9172,6 +9728,16 @@ CreateCheckPoint(int flags)
 		ControlFile->state = DB_SHUTDOWNED;
 	ControlFile->checkPoint = ProcLastRecPtr;
 	ControlFile->checkPointCopy = checkPoint;
+	if (NvwalAvail)
+	{
+		/*
+		 * A new value should not fall behind the old one.
+		 */
+		if (ControlFile->discardedUpTo < newDiscardedUpTo)
+			ControlFile->discardedUpTo = newDiscardedUpTo;
+		else
+			newDiscardedUpTo = ControlFile->discardedUpTo;
+	}
 	ControlFile->time = (pg_time_t) time(NULL);
 	/* crash recovery should always recover to the end of WAL */
 	ControlFile->minRecoveryPoint = InvalidXLogRecPtr;
@@ -9189,6 +9755,44 @@ CreateCheckPoint(int flags)
 	UpdateControlFile();
 	LWLockRelease(ControlFileLock);
 
+	/*
+	 * If we use non-volatile XLOG buffer, update XLogCtl->Logwrt{Rqst,Result}
+	 * so that the XLOG records older than newDiscardedUpTo are treated as
+	 * "already written and flushed."
+	 */
+	if (NvwalAvail)
+	{
+		Assert(newDiscardedUpTo > 0);
+
+		/* Update process-local variables */
+		LogwrtResult.Write = LogwrtResult.Flush = newDiscardedUpTo;
+
+		/*
+		 * Update shared-memory variables. We need both light-weight lock and
+		 * spin lock to update them.
+		 */
+		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+		SpinLockAcquire(&XLogCtl->info_lck);
+
+		/*
+		 * Note that there can be a corner case that process-local
+		 * LogwrtResult falls behind shared XLogCtl->LogwrtResult if the whole
+		 * non-volatile XLOG buffer is filled and some pages are written out
+		 * to segment files between UpdateControlFile and LWLockAcquire above.
+		 *
+		 * TODO For now, we ignore that case because it can hardly occur.
+		 */
+		XLogCtl->LogwrtResult = LogwrtResult;
+
+		if (XLogCtl->LogwrtRqst.Write < newDiscardedUpTo)
+			XLogCtl->LogwrtRqst.Write = newDiscardedUpTo;
+		if (XLogCtl->LogwrtRqst.Flush < newDiscardedUpTo)
+			XLogCtl->LogwrtRqst.Flush = newDiscardedUpTo;
+
+		SpinLockRelease(&XLogCtl->info_lck);
+		LWLockRelease(WALWriteLock);
+	}
+
 	/* Update shared-memory copy of checkpoint XID/epoch */
 	SpinLockAcquire(&XLogCtl->info_lck);
 	XLogCtl->ckptFullXid = checkPoint.nextFullXid;
@@ -9212,22 +9816,48 @@ CreateCheckPoint(int flags)
 	if (PriorRedoPtr != InvalidXLogRecPtr)
 		UpdateCheckPointDistanceEstimate(RedoRecPtr - PriorRedoPtr);
 
-	/*
-	 * Delete old log files, those no longer needed for last checkpoint to
-	 * prevent the disk holding the xlog from growing full.
-	 */
-	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
-	KeepLogSeg(recptr, &_logSegNo);
-	InvalidateObsoleteReplicationSlots(_logSegNo);
-	_logSegNo--;
-	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
+	if (NvwalAvail)
+	{
+		/*
+		 * We already set _logSegNo to the value equivalent to discardedUpTo.
+		 * We first increment it to call InvalidateObsoleteReplicationSlots.
+		 */
+		_logSegNo++;
+		InvalidateObsoleteReplicationSlots(_logSegNo);
+
+		/*
+		 * Then we decrement _logSegNo again to remove WAL segment files
+		 * having spilled out of non-volatile WAL buffer.
+		 *
+		 * Note that you should set wal_recycle to off to remove segment files.
+		 */
+		_logSegNo--;
+		RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
+	}
+	else
+	{
+		/*
+		 * Delete old log files, those no longer needed for last checkpoint to
+		 * prevent the disk holding the xlog from growing full.
+		 */
+		XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
+		KeepLogSeg(recptr, &_logSegNo);
+		InvalidateObsoleteReplicationSlots(_logSegNo);
+		_logSegNo--;
+		RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
+	}
 
 	/*
 	 * Make more log segments if needed.  (Do this after recycling old log
 	 * segments, since that may supply some of the needed files.)
 	 */
 	if (!shutdown)
-		PreallocXlogFiles(recptr);
+	{
+		if (NvwalAvail)
+			PreallocNonVolatileXlogBuffer();
+		else
+			PreallocXlogFiles(recptr);
+	}
 
 	/*
 	 * Truncate pg_subtrans if possible.  We can throw away all data before
@@ -11971,6 +12601,170 @@ CancelBackup(void)
 	}
 }
 
+/*
+ * IsNvwalAvail -- tells callers outside xlog.c whether NVWAL is used.
+ */
+bool
+IsNvwalAvail(void)
+{
+	return NvwalAvail;
+}
+
+/*
+ * Compute how many trailing bytes of [target, target + count) can be loaded
+ * from NVWAL.  On a non-zero result, *nvwalptr is set to the load-from LSN.
+ */
+Size
+GetLoadableSizeFromNvwal(XLogRecPtr target, Size count, XLogRecPtr *nvwalptr)
+{
+	XLogRecPtr	endptr = target + count;
+	XLogRecPtr	nvwalStart;
+
+	Assert(IsNvwalAvail());
+	Assert(nvwalptr != NULL);
+
+	/* Take a snapshot of discardedUpTo under the control file lock */
+	LWLockAcquire(ControlFileLock, LW_SHARED);
+	nvwalStart = ControlFile->discardedUpTo;
+	LWLockRelease(ControlFileLock);
+
+	/* Everything requested lies below discardedUpTo: nothing to load */
+	if (endptr <= nvwalStart)
+		return 0;
+
+	/*
+	 * Load from target when it is at or beyond discardedUpTo; otherwise
+	 * only the tail starting at discardedUpTo is loadable from NVWAL.
+	 */
+	if (target >= nvwalStart)
+		*nvwalptr = target;
+	else
+		*nvwalptr = nvwalStart;
+
+	return (Size) (endptr - *nvwalptr);
+}
+
+/*
+ * Like WALRead @ xlogreader.c, but copies count bytes at startptr from the
+ * non-volatile WAL buffer into buf.  Returns false if data was discarded.
+ */
+bool
+CopyXLogRecordsFromNVWAL(char *buf, Size count, XLogRecPtr startptr)
+{
+	char	   *p;
+	XLogRecPtr	recptr;
+	Size		nbytes;
+
+	Assert(NvwalAvail);
+
+	p = buf;
+	recptr = startptr;
+	nbytes = count;
+
+	/*
+	 * Hold shared WALBufMappingLock to let others not rotate WAL buffer
+	 * while copying WAL records from it.  We do not need exclusive lock
+	 * because we will not rotate the buffer in this function.
+	 */
+	LWLockAcquire(WALBufMappingLock, LW_SHARED);
+
+	while (nbytes > 0)
+	{
+		char	   *q;
+		Size		off;
+		Size		max_copy;
+		Size		copybytes;
+		XLogRecPtr	discardedUpTo;
+
+		LWLockAcquire(ControlFileLock, LW_SHARED);
+		discardedUpTo = ControlFile->discardedUpTo;
+		LWLockRelease(ControlFileLock);
+
+		/* Give up if the next byte to copy is already below discardedUpTo */
+		if (recptr < discardedUpTo)
+		{
+			LWLockRelease(WALBufMappingLock);
+
+			/* TODO error handling? */
+			return false;
+		}
+
+		/*
+		 * Get the target address on non-volatile WAL buffer and the size we
+		 * can copy from it at once.  Because the buffer can rotate, we
+		 * might have to copy what we want divided into two or more pieces.
+		 */
+		off = recptr % NvwalSize;
+		q = XLogCtl->pages + off;
+		max_copy = NvwalSize - off;
+		copybytes = Min(nbytes, max_copy);
+
+		memcpy(p, q, copybytes);
+
+		/* Advance source LSN, remaining size and destination past the copy */
+		recptr += copybytes;
+		nbytes -= copybytes;
+		p += copybytes;
+	}
+
+	LWLockRelease(WALBufMappingLock);
+	return true;
+}
+
+/*
+ * Tell whether WAL from this source was streamed, no matter whether it is
+ * then read back via segment files or via NVWAL.
+ */
+static bool
+IsXLogSourceFromStream(XLogSource source)
+{
+	bool		via_file = (source == XLOG_FROM_STREAM);
+	bool		via_nvwal = (source == XLOG_FROM_STREAM_NVWAL);
+
+	/* Both streaming flavours count. */
+	return via_file || via_nvwal;
+}
+
+/*
+ * Tell whether WAL from this source is read out of the non-volatile WAL
+ * buffer, either directly or after having been streamed.
+ */
+static bool
+IsXLogSourceFromNvwal(XLogSource source)
+{
+	if (source == XLOG_FROM_NVWAL)
+		return true;
+
+	/* The streaming flavour that goes via NVWAL counts as well. */
+	return (source == XLOG_FROM_STREAM_NVWAL);
+}
+
+/*
+ * Decide, from the current readSource, whether XLogPageRead has to retrieve
+ * more data before the chunk ending at targetChunkEndPtr can be read.
+ */
+static bool
+NeedsForMoreXLog(XLogRecPtr targetChunkEndPtr)
+{
+	/* Segment-file sources need more only while readFile is not open. */
+	if (readSource == XLOG_FROM_ARCHIVE || readSource == XLOG_FROM_PG_WAL)
+		return (readFile < 0);
+
+	/* The NVWAL-backed sources must not show up unless NVWAL is used. */
+	if (IsXLogSourceFromNvwal(readSource))
+		Assert(NvwalAvail);
+	if (readSource == XLOG_FROM_NVWAL)
+		return false;
+
+	/* Streamed WAL: more is needed while flushedUpto is short of the end. */
+	if (IsXLogSourceFromStream(readSource))
+		return (flushedUpto < targetChunkEndPtr);
+
+	/* XLOG_FROM_ANY */
+	Assert(readFile < 0);
+	return true;
+}
+
 /*
  * Read the XLOG page containing RecPtr into readBuf (if not read already).
  * Returns number of bytes read, if the page is read successfully, or -1
@@ -12012,7 +12806,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
 	 * See if we need to switch to a new segment because the requested record
 	 * is not in the currently open one.
 	 */
-	if (readFile >= 0 &&
+	if ((readFile >= 0 || IsXLogSourceFromNvwal(readSource)) &&
 		!XLByteInSeg(targetPagePtr, readSegNo, wal_segment_size))
 	{
 		/*
@@ -12029,7 +12823,8 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
 			}
 		}
 
-		close(readFile);
+		if (readFile >= 0)
+			close(readFile);
 		readFile = -1;
 		readSource = XLOG_FROM_ANY;
 	}
@@ -12038,9 +12833,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
 
 retry:
 	/* See if we need to retrieve more data */
-	if (readFile < 0 ||
-		(readSource == XLOG_FROM_STREAM &&
-		 flushedUpto < targetPagePtr + reqLen))
+	if (NeedsForMoreXLog(targetPagePtr + reqLen))
 	{
 		if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
 										 private->randAccess,
@@ -12061,7 +12854,7 @@ retry:
 	 * At this point, we have the right segment open and if we're streaming we
 	 * know the requested record is in it.
 	 */
-	Assert(readFile != -1);
+	Assert(readFile != -1 || IsXLogSourceFromNvwal(readSource));
 
 	/*
 	 * If the current segment is being streamed from master, calculate how
@@ -12069,7 +12862,7 @@ retry:
 	 * requested record has been received, but this is for the benefit of
 	 * future calls, to allow quick exit at the top of this function.
 	 */
-	if (readSource == XLOG_FROM_STREAM)
+	if (IsXLogSourceFromStream(readSource))
 	{
 		if (((targetPagePtr) / XLOG_BLCKSZ) != (flushedUpto / XLOG_BLCKSZ))
 			readLen = XLOG_BLCKSZ;
@@ -12080,41 +12873,59 @@ retry:
 	else
 		readLen = XLOG_BLCKSZ;
 
-	/* Read the requested page */
 	readOff = targetPageOff;
 
-	pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
-	r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff);
-	if (r != XLOG_BLCKSZ)
+	if (IsXLogSourceFromNvwal(readSource))
 	{
-		char		fname[MAXFNAMELEN];
-		int			save_errno = errno;
+		Size		offset = (Size) (targetPagePtr % NvwalSize);
+		char	   *readpos = XLogCtl->pages + offset;
+
+		Assert(offset % XLOG_BLCKSZ == 0);
 
+		/* Load the requested page from non-volatile WAL buffer */
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+		memcpy(readBuf, readpos, readLen);
 		pgstat_report_wait_end();
-		XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size);
-		if (r < 0)
+
+		/* There are not any other clues of TLI... */
+		xlogreader->seg.ws_tli = ((XLogPageHeader) readBuf)->xlp_tli;
+	}
+	else
+	{
+		/* Read the requested page from file */
+		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
+		r = pg_pread(readFile, readBuf, XLOG_BLCKSZ, (off_t) readOff);
+		if (r != XLOG_BLCKSZ)
 		{
-			errno = save_errno;
-			ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
-					(errcode_for_file_access(),
-					 errmsg("could not read from log segment %s, offset %u: %m",
-							fname, readOff)));
+			char		fname[MAXFNAMELEN];
+			int			save_errno = errno;
+
+			pgstat_report_wait_end();
+			XLogFileName(fname, curFileTLI, readSegNo, wal_segment_size);
+			if (r < 0)
+			{
+				errno = save_errno;
+				ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
+						(errcode_for_file_access(),
+						 errmsg("could not read from log segment %s, offset %u: %m",
+								fname, readOff)));
+			}
+			else
+				ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
+								fname, readOff, r, (Size) XLOG_BLCKSZ)));
+			goto next_record_is_invalid;
 		}
-		else
-			ereport(emode_for_corrupt_record(emode, targetPagePtr + reqLen),
-					(errcode(ERRCODE_DATA_CORRUPTED),
-					 errmsg("could not read from log segment %s, offset %u: read %d of %zu",
-							fname, readOff, r, (Size) XLOG_BLCKSZ)));
-		goto next_record_is_invalid;
+		pgstat_report_wait_end();
+
+		xlogreader->seg.ws_tli = curFileTLI;
 	}
-	pgstat_report_wait_end();
 
 	Assert(targetSegNo == readSegNo);
 	Assert(targetPageOff == readOff);
 	Assert(reqLen <= readLen);
 
-	xlogreader->seg.ws_tli = curFileTLI;
-
 	/*
 	 * Check the page header immediately, so that we can retry immediately if
 	 * it's not valid. This may seem unnecessary, because XLogReadRecord()
@@ -12148,6 +12959,17 @@ retry:
 		goto next_record_is_invalid;
 	}
 
+	/*
+	 * Update curFileTLI on each verified page if non-volatile WAL buffer is
+	 * used, because there is no TimeLineID information in NVWAL's filename.
+	 */
+	if (IsXLogSourceFromNvwal(readSource) &&
+		curFileTLI != xlogreader->latestPageTLI)
+	{
+		curFileTLI = xlogreader->latestPageTLI;
+		elog(DEBUG1, "curFileTLI: %u", curFileTLI);
+	}
+
 	return readLen;
 
 next_record_is_invalid:
@@ -12229,7 +13051,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 	if (!InArchiveRecovery)
 		currentSource = XLOG_FROM_PG_WAL;
 	else if (currentSource == XLOG_FROM_ANY ||
-			 (!StandbyMode && currentSource == XLOG_FROM_STREAM))
+			 (!StandbyMode && IsXLogSourceFromStream(currentSource)))
 	{
 		lastSourceFailed = false;
 		currentSource = XLOG_FROM_ARCHIVE;
@@ -12252,6 +13074,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 			{
 				case XLOG_FROM_ARCHIVE:
 				case XLOG_FROM_PG_WAL:
+				case XLOG_FROM_NVWAL:
 
 					/*
 					 * Check to see if the trigger file exists. Note that we
@@ -12265,6 +13088,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 						return false;
 					}
 
+					/* Try NVWAL if available */
+					if (NvwalAvail && currentSource != XLOG_FROM_NVWAL)
+					{
+						currentSource = XLOG_FROM_NVWAL;
+						break;
+					}
+
 					/*
 					 * Not in standby mode, and we've now tried the archive
 					 * and pg_wal.
@@ -12276,11 +13106,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 * Move to XLOG_FROM_STREAM state, and set to start a
 					 * walreceiver if necessary.
 					 */
-					currentSource = XLOG_FROM_STREAM;
+					if (currentSource == XLOG_FROM_NVWAL)
+						currentSource = XLOG_FROM_STREAM_NVWAL;
+					else
+						currentSource = XLOG_FROM_STREAM;
 					startWalReceiver = true;
 					break;
 
 				case XLOG_FROM_STREAM:
+				case XLOG_FROM_STREAM_NVWAL:
 
 					/*
 					 * Failure while streaming. Most likely, we got here
@@ -12386,6 +13220,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 		{
 			case XLOG_FROM_ARCHIVE:
 			case XLOG_FROM_PG_WAL:
+			case XLOG_FROM_NVWAL:
 
 				/*
 				 * WAL receiver must not be running when reading WAL from
@@ -12403,6 +13238,59 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 				if (randAccess)
 					curFileTLI = 0;
 
+				/* Try to load from NVWAL */
+				if (currentSource == XLOG_FROM_NVWAL)
+				{
+					XLogRecPtr		discardedUpTo;
+
+					Assert(NvwalAvail);
+
+					/*
+					 * Check if the target page exists on NVWAL.  Note that
+					 * RecPtr points to the end of the target chunk.
+					 *
+					 * TODO need ControlFileLock?
+					 */
+					discardedUpTo = ControlFile->discardedUpTo;
+					if (discardedUpTo != InvalidXLogRecPtr &&
+						discardedUpTo < RecPtr &&
+						RecPtr <= discardedUpTo + NvwalSize)
+					{
+						/* Report recovery progress in PS display */
+						set_ps_display("recovering NVWAL");
+						elog(DEBUG1, "recovering NVWAL");
+
+						/* Track source of data and receipt time */
+						readSource = XLOG_FROM_NVWAL;
+						XLogReceiptSource = XLOG_FROM_NVWAL;
+						XLogReceiptTime = GetCurrentTimestamp();
+
+						/*
+						 * Construct expectedTLEs.  This is necessary to
+						 * recover only from NVWAL because its filename does
+						 * not have any TLI information.
+						 */
+						if (!expectedTLEs)
+						{
+							TimeLineHistoryEntry	   *entry;
+
+							entry = palloc(sizeof(TimeLineHistoryEntry));
+							entry->tli = recoveryTargetTLI;
+							entry->begin = entry->end = InvalidXLogRecPtr;
+
+							expectedTLEs = list_make1(entry);
+							elog(DEBUG1, "expectedTLEs: [%u]",
+								 (uint32) recoveryTargetTLI);
+						}
+
+						return true;
+					}
+
+					/* Target page does not exist on NVWAL */
+					lastSourceFailed = true;
+					break;
+				}
+
 				/*
 				 * Try to restore the file from archive, or read an existing
 				 * file from pg_wal.
@@ -12420,6 +13308,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 				break;
 
 			case XLOG_FROM_STREAM:
+			case XLOG_FROM_STREAM_NVWAL:
 				{
 					bool		havedata;
 
@@ -12544,21 +13433,34 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 						 * info is set correctly and XLogReceiptTime isn't
 						 * changed.
 						 */
-						if (readFile < 0)
+						if (currentSource == XLOG_FROM_STREAM_NVWAL)
 						{
 							if (!expectedTLEs)
 								expectedTLEs = readTimeLineHistory(receiveTLI);
-							readFile = XLogFileRead(readSegNo, PANIC,
-													receiveTLI,
-													XLOG_FROM_STREAM, false);
-							Assert(readFile >= 0);
+
+							/* TODO is it ok to return, not to break switch? */
+							readSource = XLOG_FROM_STREAM_NVWAL;
+							XLogReceiptSource = XLOG_FROM_STREAM_NVWAL;
+							return true;
 						}
 						else
 						{
-							/* just make sure source info is correct... */
-							readSource = XLOG_FROM_STREAM;
-							XLogReceiptSource = XLOG_FROM_STREAM;
-							return true;
+							if (readFile < 0)
+							{
+								if (!expectedTLEs)
+									expectedTLEs = readTimeLineHistory(receiveTLI);
+								readFile = XLogFileRead(readSegNo, PANIC,
+														receiveTLI,
+														XLOG_FROM_STREAM, false);
+								Assert(readFile >= 0);
+							}
+							else
+							{
+								/* just make sure source info is correct... */
+								readSource = XLOG_FROM_STREAM;
+								XLogReceiptSource = XLOG_FROM_STREAM;
+								return true;
+							}
 						}
 						break;
 					}
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index cb76be4f46..77f629fda2 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1066,11 +1066,24 @@ WALRead(XLogReaderState *state,
 	char	   *p;
 	XLogRecPtr	recptr;
 	Size		nbytes;
+#ifndef FRONTEND
+	XLogRecPtr	recptr_nvwal = 0;
+	Size		nbytes_nvwal = 0;
+#endif
 
 	p = buf;
 	recptr = startptr;
 	nbytes = count;
 
+#ifndef FRONTEND
+	/* Try to load records directly from NVWAL if used */
+	if (IsNvwalAvail())
+	{
+		nbytes_nvwal = GetLoadableSizeFromNvwal(startptr, count, &recptr_nvwal);
+		nbytes = count - nbytes_nvwal;
+	}
+#endif
+
 	while (nbytes > 0)
 	{
 		uint32		startoff;
@@ -1138,6 +1151,17 @@ WALRead(XLogReaderState *state,
 		p += readbytes;
 	}
 
+#ifndef FRONTEND
+	if (IsNvwalAvail())
+	{
+		if (!CopyXLogRecordsFromNVWAL(p, nbytes_nvwal, recptr_nvwal))
+		{
+			/* TODO graceful error handling */
+			elog(PANIC, "some records on NVWAL had been discarded");
+		}
+	}
+#endif
+
 	return true;
 }
 
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index e73639df74..4c594e915f 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -272,6 +272,9 @@ main(int argc, char *argv[])
 		   ControlFile->checkPointCopy.oldestCommitTsXid);
 	printf(_("Latest checkpoint's newestCommitTsXid:%u\n"),
 		   ControlFile->checkPointCopy.newestCommitTsXid);
+	printf(_("discarded Up To:                      %X/%X\n"),
+		   (uint32) (ControlFile->discardedUpTo >> 32),
+		   (uint32) ControlFile->discardedUpTo);
 	printf(_("Time of latest checkpoint:            %s\n"),
 		   ckpttime_str);
 	printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0a05e79524..75433a6dc0 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -351,6 +351,14 @@ extern void XLogRequestWalReceiverReply(void);
 extern void assign_max_wal_size(int newval, void *extra);
 extern void assign_checkpoint_completion_target(double newval, void *extra);
 
+/* Non-volatile WAL buffer (NVWAL) routines; return types match xlog.c. */
+extern bool IsNvwalAvail(void);
+extern Size GetLoadableSizeFromNvwal(XLogRecPtr target,
+									 Size count,
+									 XLogRecPtr *nvwalptr);
+extern bool CopyXLogRecordsFromNVWAL(char *buf, Size count,
+									 XLogRecPtr startptr);
+
 /*
  * Routines to start, stop, and get status of a base backup.
  */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index de5670e538..fe71992a69 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -22,7 +22,7 @@
 
 
 /* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION	1300
+#define PG_CONTROL_VERSION	1301
 
 /* Nonce key length, see below */
 #define MOCK_AUTH_NONCE_LEN		32
@@ -132,6 +132,21 @@ typedef struct ControlFileData
 
 	XLogRecPtr	unloggedLSN;	/* current fake LSN value, for unlogged rels */
 
+	/*
+	 * Used for non-volatile WAL buffer (NVWAL).
+	 *
+	 * discardedUpTo is updated to the oldest LSN in the NVWAL when either a
+	 * checkpoint or a restartpoint is completed successfully, or the whole
+	 * NVWAL is filled with WAL records and a new record is being inserted.
+	 * This field tells that the NVWAL contains WAL records in the range of
+	 * [discardedUpTo, discardedUpTo+S), where S is the size of the NVWAL.
+	 * Note that the WAL records whose LSN are less than discardedUpTo would
+	 * remain in WAL segment files and be needed for recovery.
+	 *
+	 * It is set to zero when NVWAL is not used.
+	 */
+	XLogRecPtr	discardedUpTo;
+
 	/*
 	 * These two values determine the minimum point we must recover up to
 	 * before starting up:
diff --git a/src/test/regress/expected/misc_functions.out b/src/test/regress/expected/misc_functions.out
index d3acb98d04..bbd47e1663 100644
--- a/src/test/regress/expected/misc_functions.out
+++ b/src/test/regress/expected/misc_functions.out
@@ -142,14 +142,17 @@ HINT:  No function matches the given name and argument types. You might need to
 select setting as segsize
 from pg_settings where name = 'wal_segment_size'
 \gset
-select count(*) > 0 as ok from pg_ls_waldir();
+select setting as nvwal_path
+from pg_settings where name = 'nvwal_path'
+\gset
+select count(*) > 0 or :'nvwal_path' <> '' as ok from pg_ls_waldir();
  ok 
 ----
  t
 (1 row)
 
 -- Test ProjectSet as well as FunctionScan
-select count(*) > 0 as ok from (select pg_ls_waldir()) ss;
+select count(*) > 0 or :'nvwal_path' <> '' as ok from (select pg_ls_waldir()) ss;
  ok 
 ----
  t
@@ -161,14 +164,15 @@ select * from pg_ls_waldir() limit 0;
 ------+------+--------------
 (0 rows)
 
-select count(*) > 0 as ok from (select * from pg_ls_waldir() limit 1) ss;
+select count(*) > 0 or :'nvwal_path' <> '' as ok from (select * from pg_ls_waldir() limit 1) ss;
  ok 
 ----
  t
 (1 row)
 
-select (w).size = :segsize as ok
-from (select pg_ls_waldir() w) ss where length((w).name) = 24 limit 1;
+select count(*) > 0 or :'nvwal_path' <> '' as ok from
+  (select * from pg_ls_waldir() w
+   where length((w).name) = 24 and (w).size = :segsize limit 1) ss;
  ok 
 ----
  t
diff --git a/src/test/regress/sql/misc_functions.sql b/src/test/regress/sql/misc_functions.sql
index 094e8f8296..09c326775d 100644
--- a/src/test/regress/sql/misc_functions.sql
+++ b/src/test/regress/sql/misc_functions.sql
@@ -39,15 +39,19 @@ SELECT num_nulls();
 select setting as segsize
 from pg_settings where name = 'wal_segment_size'
 \gset
+select setting as nvwal_path
+from pg_settings where name = 'nvwal_path'
+\gset
 
-select count(*) > 0 as ok from pg_ls_waldir();
+select count(*) > 0 or :'nvwal_path' <> '' as ok from pg_ls_waldir();
 -- Test ProjectSet as well as FunctionScan
-select count(*) > 0 as ok from (select pg_ls_waldir()) ss;
+select count(*) > 0 or :'nvwal_path' <> '' as ok from (select pg_ls_waldir()) ss;
 -- Test not-run-to-completion cases.
 select * from pg_ls_waldir() limit 0;
-select count(*) > 0 as ok from (select * from pg_ls_waldir() limit 1) ss;
-select (w).size = :segsize as ok
-from (select pg_ls_waldir() w) ss where length((w).name) = 24 limit 1;
+select count(*) > 0 or :'nvwal_path' <> '' as ok from (select * from pg_ls_waldir() limit 1) ss;
+select count(*) > 0 or :'nvwal_path' <> '' as ok from
+  (select * from pg_ls_waldir() w
+   where length((w).name) = 24 and (w).size = :segsize limit 1) ss;
 
 select count(*) >= 0 as ok from pg_ls_archive_statusdir();
 
-- 
2.17.1

