>From e3a4da834a79770c63c26c9859dc179911a37540 Mon Sep 17 00:00:00 2001
From: Takashi Menjo <takashi.menjou.vg@hco.ntt.co.jp>
Date: Wed, 24 Jun 2020 15:07:58 +0900
Subject: [PATCH v3 3/5] walreceiver supports non-volatile WAL buffer

Now walreceiver stores received records directly to non-volatile
WAL buffer if applicable.
---
 src/backend/access/transam/xlog.c     | 31 +++++++++++++++-
 src/backend/replication/walreceiver.c | 53 ++++++++++++++++++++++++++-
 src/include/access/xlog.h             |  4 ++
 3 files changed, 85 insertions(+), 3 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 45e05b9498..2a022be36a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -925,6 +925,8 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 static int	XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 						 XLogSource source, bool notfoundOk);
 static int	XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
+static bool CopyXLogRecordsOnNVWAL(char *buf, Size count, XLogRecPtr startptr,
+								   bool store);
 static int	XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
 						 int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
 static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
@@ -12650,6 +12652,21 @@ GetLoadableSizeFromNvwal(XLogRecPtr target, Size count, XLogRecPtr *nvwalptr)
  */
 bool
 CopyXLogRecordsFromNVWAL(char *buf, Size count, XLogRecPtr startptr)
+{
+	return CopyXLogRecordsOnNVWAL(buf, count, startptr, false);
+}
+
+/*
+ * Called by walreceiver.
+ */
+bool
+CopyXLogRecordsToNVWAL(char *buf, Size count, XLogRecPtr startptr)
+{
+	return CopyXLogRecordsOnNVWAL(buf, count, startptr, true);
+}
+
+static bool
+CopyXLogRecordsOnNVWAL(char *buf, Size count, XLogRecPtr startptr, bool store)
 {
 	char	   *p;
 	XLogRecPtr	recptr;
@@ -12699,7 +12716,13 @@ CopyXLogRecordsFromNVWAL(char *buf, Size count, XLogRecPtr startptr)
 		max_copy = NvwalSize - off;
 		copybytes = Min(nbytes, max_copy);
 
-		memcpy(p, q, copybytes);
+		if (store)
+		{
+			memcpy(q, p, copybytes);
+			nv_flush(q, copybytes);
+		}
+		else
+			memcpy(p, q, copybytes);
 
 		/* Update state for copy */
 		recptr += copybytes;
@@ -12711,6 +12734,12 @@ CopyXLogRecordsFromNVWAL(char *buf, Size count, XLogRecPtr startptr)
 	return true;
 }
 
+void
+SyncNVWAL(void)
+{
+	nv_drain();
+}
+
 static bool
 IsXLogSourceFromStream(XLogSource source)
 {
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index d1ad75da87..20922ed230 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -130,6 +130,7 @@ static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *start
 static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
+static void XLogWalRcvStore(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(bool dying);
 static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
@@ -856,7 +857,10 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
 
 				buf += hdrlen;
 				len -= hdrlen;
-				XLogWalRcvWrite(buf, len, dataStart);
+				if (IsNvwalAvail())
+					XLogWalRcvStore(buf, len, dataStart);
+				else
+					XLogWalRcvWrite(buf, len, dataStart);
 				break;
 			}
 		case 'k':				/* Keepalive */
@@ -991,6 +995,42 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 	pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
 }
 
+/*
+ * Like XLogWalRcvWrite, but store to non-volatile WAL buffer.
+ */
+static void
+XLogWalRcvStore(char *buf, Size nbytes, XLogRecPtr recptr)
+{
+	Assert(IsNvwalAvail());
+
+	CopyXLogRecordsToNVWAL(buf, nbytes, recptr);
+
+	/*
+	 * Also write out to file if we have to archive segments.
+	 *
+	 * We could do this segment by segment but we reuse existing method to
+	 * do it record by record because the former gives us more complexity
+	 * (locking WalBufMappingLock, getting the address of the segment on
+	 * non-volatile WAL buffer, etc).
+	 */
+	if (XLogArchiveMode == ARCHIVE_MODE_ALWAYS)
+		XLogWalRcvWrite(buf, nbytes, recptr);
+	else
+	{
+		/*
+		 * Update status as like XLogWalRcvWrite does.
+		 */
+
+		/* Update process-local status */
+		XLByteToSeg(recptr + nbytes, recvSegNo, wal_segment_size);
+		recvFileTLI = ThisTimeLineID;
+		LogstreamResult.Write = recptr + nbytes;
+
+		/* Update shared-memory status */
+		pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
+	}
+}
+
 /*
  * Flush the log to disk.
  *
@@ -1004,7 +1044,16 @@ XLogWalRcvFlush(bool dying)
 	{
 		WalRcvData *walrcv = WalRcv;
 
-		issue_xlog_fsync(recvFile, recvSegNo);
+		/*
+		 * We should call both SyncNVWAL and issue_xlog_fsync if we use NVWAL
+		 * and WAL archive.  So we have the following two if-statements, not
+		 * one if-else-statement.
+		 */
+		if (IsNvwalAvail())
+			SyncNVWAL();
+
+		if (recvFile >= 0)
+			issue_xlog_fsync(recvFile, recvSegNo);
 
 		LogstreamResult.Flush = LogstreamResult.Write;
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 75433a6dc0..e6ca151271 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -358,6 +358,10 @@ extern XLogRecPtr GetLoadableSizeFromNvwal(XLogRecPtr target,
 extern bool CopyXLogRecordsFromNVWAL(char *buf,
 									 Size count,
 									 XLogRecPtr startptr);
+extern bool CopyXLogRecordsToNVWAL(char *buf,
+								   Size count,
+								   XLogRecPtr startptr);
+extern void SyncNVWAL(void);
 
 /*
  * Routines to start, stop, and get status of a base backup.
-- 
2.17.1

