From 6839ee902bcc4e725e2144b4aa2bcb125efd05eb Mon Sep 17 00:00:00 2001
From: Takashi Menjo <takashi.menjou.vg@hco.ntt.co.jp>
Date: Tue, 4 Aug 2020 13:03:02 +0900
Subject: [PATCH v4 3/3] Walreceiver WAL IO using PMDK

Author: Yoshimi Ichiyanagi <ichiyanagi.yoshimi@lab.ntt.co.jp>
---
 src/backend/replication/walreceiver.c | 62 ++++++++++++++++-----------
 1 file changed, 38 insertions(+), 24 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index d5a9b568a6..b7fbd841ae 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -68,6 +68,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/ipc.h"
+#include "storage/pmem.h"
 #include "storage/pmsignal.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
@@ -103,6 +104,8 @@ WalReceiverFunctionsType *WalReceiverFunctions = NULL;
 static int	recvFile = -1;
 static TimeLineID recvFileTLI = 0;
 static XLogSegNo recvSegNo = 0;
+static uint32 recvOff = 0;
+void	   *mappedFileAddr = NULL;
 
 /*
  * Flags set by interrupt handlers of walreceiver for later service in the
@@ -610,13 +613,13 @@ WalReceiverMain(void)
 		 * End of WAL reached on the requested timeline. Close the last
 		 * segment, and await for new orders from the startup process.
 		 */
-		if (recvFile >= 0)
+		if (recvFile >= 0 || mappedFileAddr != NULL)
 		{
 			char		xlogfname[MAXFNAMELEN];
 
 			XLogWalRcvFlush(false);
 			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
-			if (close(recvFile) != 0)
+			if (do_XLogFileClose(recvFile, mappedFileAddr) != 0)
 				ereport(PANIC,
 						(errcode_for_file_access(),
 						 errmsg("could not close log segment %s: %m",
@@ -632,6 +635,7 @@ WalReceiverMain(void)
 				XLogArchiveNotify(xlogfname);
 		}
 		recvFile = -1;
+		mappedFileAddr = NULL;
 
 		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
 		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
@@ -902,7 +906,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 	{
 		int			segbytes;
 
-		if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
+		if ((recvFile < 0 && mappedFileAddr == NULL) ||
+			!XLByteInSeg(recptr, recvSegNo, wal_segment_size))
 		{
 			bool		use_existent;
 
@@ -910,7 +915,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 			 * fsync() and close current file before we switch to next one. We
 			 * would otherwise have to reopen this file to fsync it later
 			 */
-			if (recvFile >= 0)
+			if (recvFile >= 0 || mappedFileAddr != NULL)
 			{
 				char		xlogfname[MAXFNAMELEN];
 
@@ -923,7 +928,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 				 * process soon, so we don't advise the OS to release cache
 				 * pages associated with the file like XLogFileClose() does.
 				 */
-				if (close(recvFile) != 0)
+				if (do_XLogFileClose(recvFile, mappedFileAddr) != 0)
 					ereport(PANIC,
 							(errcode_for_file_access(),
 							 errmsg("could not close log segment %s: %m",
@@ -939,11 +944,12 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 					XLogArchiveNotify(xlogfname);
 			}
 			recvFile = -1;
+			mappedFileAddr = NULL;
 
 			/* Create/use new log file */
 			XLByteToSeg(recptr, recvSegNo, wal_segment_size);
 			use_existent = true;
-			recvFile = XLogFileInit(recvSegNo, &use_existent, true);
+			recvFile = XLogFileInit(recvSegNo, &use_existent, true, &mappedFileAddr);
 			recvFileTLI = ThisTimeLineID;
 		}
 
@@ -955,27 +961,35 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
 		else
 			segbytes = nbytes;
 
-		/* OK to write the logs */
-		errno = 0;
-
-		byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
-		if (byteswritten <= 0)
+		if (mappedFileAddr)
+		{
+			PmemFileWrite((char *) mappedFileAddr + startoff, buf, segbytes);
+			byteswritten = segbytes;
+		}
+		else
 		{
-			char		xlogfname[MAXFNAMELEN];
-			int			save_errno;
+			/* OK to write the logs */
+			errno = 0;
+
+			byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
+			if (byteswritten <= 0)
+			{
+				char		xlogfname[MAXFNAMELEN];
+				int			save_errno;
 
-			/* if write didn't set errno, assume no disk space */
-			if (errno == 0)
-				errno = ENOSPC;
+				/* if write didn't set errno, assume no disk space */
+				if (errno == 0)
+					errno = ENOSPC;
 
-			save_errno = errno;
-			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
-			errno = save_errno;
-			ereport(PANIC,
-					(errcode_for_file_access(),
-					 errmsg("could not write to log segment %s "
-							"at offset %u, length %lu: %m",
-							xlogfname, startoff, (unsigned long) segbytes)));
+				save_errno = errno;
+				XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
+				errno = save_errno;
+				ereport(PANIC,
+						(errcode_for_file_access(),
+						 errmsg("could not write to log segment %s "
+								"at offset %u, length %lu: %m",
+								xlogfname, startoff, (unsigned long) segbytes)));
+			}
 		}
 
 		/* Update state for write */
-- 
2.25.1

