From 2d976f6b8ea424b3903fa71027b0abc19ce6e7c6 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Fri, 17 Aug 2018 16:00:39 +1200
Subject: [PATCH 2/3] Add an fsync request pipe for Windows.

On Windows, a pipe is the most natural replacement for a Unix domin
socket, but unfortunately pipes don't support multiplexing via
WSAEventSelect(), as used by our WaitEventSet machninery.  So use
"overlapped" IO, and add the ability to wait for IO completion to
WaitEventSet.  A new wait event flag WL_WIN32_HANDLE is provided
on Windows only, and used to wait for asynchronous read and write
operations over the checkpointer pipe.

XXX Could use serious review from a real Windows programmer.

Author: Thomas Munro
---
 src/backend/postmaster/checkpointer.c | 167 ++++++++++++++++++++++++--
 src/backend/postmaster/postmaster.c   |  80 ++++++++++--
 src/backend/storage/file/fd.c         |   4 +
 src/backend/storage/ipc/latch.c       |  12 ++
 src/include/postmaster/postmaster.h   |   4 +
 src/include/storage/fd.h              |   2 +
 src/include/storage/latch.h           |   1 +
 7 files changed, 250 insertions(+), 20 deletions(-)

diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 645a5a59e0c..4aa889f49ef 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -188,6 +188,11 @@ static void ReqCheckpointHandler(SIGNAL_ARGS);
 static void chkpt_sigusr1_handler(SIGNAL_ARGS);
 static void ReqShutdownHandler(SIGNAL_ARGS);
 
+#ifdef WIN32
+/* State used to track in-progress asynchronous fsync pipe reads. */
+static OVERLAPPED absorb_overlapped;
+static HANDLE *absorb_read_in_progress;
+#endif
 
 /*
  * Main entry point for checkpointer process
@@ -200,6 +205,7 @@ CheckpointerMain(void)
 {
 	sigjmp_buf	local_sigjmp_buf;
 	MemoryContext checkpointer_context;
+	WaitEventSet *wes;
 
 	CheckpointerShmem->checkpointer_pid = MyProcPid;
 
@@ -340,6 +346,21 @@ CheckpointerMain(void)
 	 */
 	ProcGlobal->checkpointerLatch = &MyProc->procLatch;
 
+	/* Create reusable WaitEventSet. */
+	wes = CreateWaitEventSet(TopMemoryContext, 3);
+	AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
+					  NULL);
+	AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+#ifndef WIN32
+	AddWaitEventToSet(wes, WL_SOCKET_READABLE, fsync_fds[FSYNC_FD_PROCESS],
+					  NULL, NULL);
+#else
+	absorb_overlapped.hEvent = CreateEvent(NULL, TRUE, TRUE,
+										   "fsync pipe read completion");
+	AddWaitEventToSet(wes, WL_WIN32_HANDLE, PGINVALID_SOCKET, NULL,
+					  &absorb_overlapped.hEvent);
+#endif
+
 	/*
 	 * Loop forever
 	 */
@@ -351,6 +372,7 @@ CheckpointerMain(void)
 		int			elapsed_secs;
 		int			cur_timeout;
 		int			rc;
+		WaitEvent	event;
 
 		/* Clear any already-pending wakeups */
 		ResetLatch(MyLatch);
@@ -551,17 +573,14 @@ CheckpointerMain(void)
 			cur_timeout = Min(cur_timeout, XLogArchiveTimeout - elapsed_secs);
 		}
 
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE,
-							   fsync_fds[FSYNC_FD_PROCESS],
-							   cur_timeout * 1000L /* convert to ms */ ,
-							   WAIT_EVENT_CHECKPOINTER_MAIN);
+		rc = WaitEventSetWait(wes, cur_timeout * 1000, &event, 1, 0);
+		Assert(rc > 0);
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
 		 * necessity for manual cleanup of all postmaster children.
 		 */
-		if (rc & WL_POSTMASTER_DEATH)
+		if (event.events == WL_POSTMASTER_DEATH)
 			exit(1);
 	}
 }
@@ -1126,7 +1145,18 @@ ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno,
 	request.rnode = rnode;
 	request.forknum = forknum;
 	request.segno = segno;
+#ifndef WIN32
 	request.contains_fd = file != -1;
+#else
+	/*
+	 * For now we don't try to send duplicate handles to the checkpointer on
+	 * Windows.  That would be possible, but it's not clear whether it would
+	 * actually serve any useful purpose in that kernel without inside
+	 * knowledge of how it tracks errors.  The file will simply be reopened by
+	 * name when required by the checkpointer.
+	 */
+	request.contains_fd = false;
+#endif
 
 	/*
 	 * Tell the checkpointer the sequence number of the most recent open, so
@@ -1215,13 +1245,18 @@ AbsorbAllFsyncRequests(void)
 static bool
 AbsorbFsyncRequest(bool stop_at_current_cycle)
 {
-	CheckpointerRequest req;
-	int fd;
+	static CheckpointerRequest req;
+	int fd = -1;
+#ifndef WIN32
 	int ret;
+#else
+	DWORD bytes_read;
+#endif
 
 	ReleaseLruFiles();
 
 	START_CRIT_SECTION();
+#ifndef WIN32
 	ret = pg_uds_recv_with_fd(fsync_fds[FSYNC_FD_PROCESS], &req, sizeof(req), &fd);
 	if (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))
 	{
@@ -1230,6 +1265,51 @@ AbsorbFsyncRequest(bool stop_at_current_cycle)
 	}
 	else if (ret < 0)
 		elog(ERROR, "recvmsg failed: %m");
+#else
+	if (!absorb_read_in_progress)
+	{
+		if (!ReadFile(fsyncPipe[FSYNC_FD_PROCESS], &req, sizeof(req), &bytes_read,
+					  &absorb_overlapped))
+		{
+			if (GetLastError() != ERROR_IO_PENDING)
+			{
+				_dosmaperr(GetLastError());
+				elog(ERROR, "can't begin read from fsync pipe: %m");
+			}
+
+			/*
+			 * An asynchronous read has begun.  We'll tell caller to call us
+			 * back when the event indicates completion.
+			 */
+			absorb_read_in_progress = &absorb_overlapped.hEvent;
+			END_CRIT_SECTION();
+			return false;
+		}
+		/* The read completed synchronously.  'req' is now populated. */
+	}
+	if (absorb_read_in_progress)
+	{
+		/* Completed yet? */
+		if (!GetOverlappedResult(fsyncPipe[FSYNC_FD_PROCESS], &absorb_overlapped, &bytes_read,
+								 false))
+		{
+			if (GetLastError() == ERROR_IO_INCOMPLETE)
+			{
+				/* Nope.  Spurious event?  Tell caller to wait some more. */
+				END_CRIT_SECTION();
+				return false;
+			}
+			_dosmaperr(GetLastError());
+			elog(ERROR, "can't complete from fsync pipe: %m");
+		}
+		/* The asynchronous read completed.  'req' is now populated. */
+		absorb_read_in_progress = NULL;
+	}
+
+	/* Check message size. */
+	if (bytes_read != sizeof(req))
+		elog(ERROR, "unexpected short read on fsync pipe");
+#endif
 
 	if (req.contains_fd != (fd != -1))
 	{
@@ -1305,16 +1385,25 @@ CountBackendWrite(void)
 	pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_writes, 1);
 }
 
+/*
+ * Send a message to the checkpointer's fsync socket (Unix) or pipe (Windows).
+ * This is essentially a blocking call (there is no CHECK_FOR_INTERRUPTS, and
+ * even if there were it'd be surpressed since callers hold a lock), except
+ * that we don't ignore postmaster death so we need an event loop.
+ *
+ * The code is rather different on Windows, because there we have to do the
+ * write and then wait for it to complete, while on Unix we have to wait until
+ * we can do the write.
+ */
 static void
 SendFsyncRequest(CheckpointerRequest *request, int fd)
 {
+#ifndef WIN32
 	ssize_t ret;
 	int		rc;
 
 	while (true)
 	{
-		CHECK_FOR_INTERRUPTS();
-
 		ret = pg_uds_send_with_fd(fsync_fds[FSYNC_FD_SUBMIT], request, sizeof(*request),
 								  request->contains_fd ? fd : -1);
 
@@ -1341,7 +1430,7 @@ SendFsyncRequest(CheckpointerRequest *request, int fd)
 			 * only for that OS.
 			 */
 
-			/* blocked on write - wait for socket to become readable */
+			/* Blocked on write - wait for socket to become readable */
 			rc = WaitLatchOrSocket(NULL,
 								   WL_SOCKET_WRITEABLE | WL_POSTMASTER_DEATH,
 								   fsync_fds[FSYNC_FD_SUBMIT], -1, 0);
@@ -1351,4 +1440,60 @@ SendFsyncRequest(CheckpointerRequest *request, int fd)
 		else
 			ereport(FATAL, (errmsg("could not send fsync request: %m")));
 	}
+
+#else /* WIN32 */
+	{
+		OVERLAPPED overlapped = {0};
+		DWORD nwritten;
+		int rc;
+
+		overlapped.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
+
+		if (!WriteFile(fsyncPipe[FSYNC_FD_SUBMIT], request, sizeof(*request), &nwritten,
+					   &overlapped))
+		{
+			WaitEventSet *wes;
+			WaitEvent event;
+
+			/* Handle unexpected errors. */
+			if (GetLastError() != ERROR_IO_PENDING)
+			{
+				_dosmaperr(GetLastError());
+				CloseHandle(overlapped.hEvent);
+				ereport(FATAL, (errmsg("could not send fsync request: %m")));
+			}
+
+			/* Wait for asynchronous IO to complete. */
+			wes = CreateWaitEventSet(TopMemoryContext, 3);
+			AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL,
+							  NULL);
+			AddWaitEventToSet(wes, WL_WIN32_HANDLE, PGINVALID_SOCKET, NULL,
+							  &overlapped.hEvent);
+			for (;;)
+			{
+				rc = WaitEventSetWait(wes, -1, &event, 1, 0);
+				Assert(rc > 0);
+				if (event.events == WL_POSTMASTER_DEATH)
+					exit(1);
+				if (event.events == WL_WIN32_HANDLE)
+				{
+					if (!GetOverlappedResult(fsyncPipe[FSYNC_FD_SUBMIT], &overlapped,
+											 &nwritten, FALSE))
+					{
+						_dosmaperr(GetLastError());
+						CloseHandle(overlapped.hEvent);
+						ereport(FATAL, (errmsg("could not get result of sending fsync request: %m")));
+					}
+					if (nwritten > 0)
+						break;
+				}
+			}
+			FreeWaitEventSet(wes);
+		}
+
+		CloseHandle(overlapped.hEvent);
+		if (nwritten != sizeof(*request))
+			elog(FATAL, "unexpected short write to fsync request pipe");
+	}
+#endif
 }
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 5631a09fcb8..8ec71d13fa7 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -525,6 +525,7 @@ typedef struct
 	HANDLE		PostmasterHandle;
 	HANDLE		initial_signal_pipe;
 	HANDLE		syslogPipe[2];
+	HANDLE		fsyncPipe[2];
 #else
 	int			postmaster_alive_fds[2];
 	int			syslogPipe[2];
@@ -571,7 +572,11 @@ int			postmaster_alive_fds[2] = {-1, -1};
 HANDLE		PostmasterHandle;
 #endif
 
+#ifndef WIN32
 int			fsync_fds[2] = {-1, -1};
+#else
+HANDLE		fsyncPipe[2] = {0, 0};
+#endif
 
 /*
  * Postmaster main entry point
@@ -6004,7 +6009,8 @@ extern pg_time_t first_syslogger_file_time;
 #define write_inheritable_socket(dest, src, childpid) ((*(dest) = (src)), true)
 #define read_inheritable_socket(dest, src) (*(dest) = *(src))
 #else
-static bool write_duplicated_handle(HANDLE *dest, HANDLE src, HANDLE child);
+static bool write_duplicated_handle(HANDLE *dest, HANDLE src, HANDLE child,
+									bool close_source);
 static bool write_inheritable_socket(InheritableSocket *dest, SOCKET src,
 						 pid_t childPid);
 static void read_inheritable_socket(SOCKET *dest, InheritableSocket *src);
@@ -6068,7 +6074,15 @@ save_backend_variables(BackendParameters *param, Port *port,
 	param->PostmasterHandle = PostmasterHandle;
 	if (!write_duplicated_handle(&param->initial_signal_pipe,
 								 pgwin32_create_signal_listener(childPid),
-								 childProcess))
+								 childProcess, true))
+		return false;
+	if (!write_duplicated_handle(&param->fsyncPipe[0],
+								 fsyncPipe[0],
+								 childProcess, false))
+		return false;
+	if (!write_duplicated_handle(&param->fsyncPipe[1],
+								 fsyncPipe[1],
+								 childProcess, false))
 		return false;
 #else
 	memcpy(&param->postmaster_alive_fds, &postmaster_alive_fds,
@@ -6094,7 +6108,8 @@ save_backend_variables(BackendParameters *param, Port *port,
  * process instance of the handle to the parameter file.
  */
 static bool
-write_duplicated_handle(HANDLE *dest, HANDLE src, HANDLE childProcess)
+write_duplicated_handle(HANDLE *dest, HANDLE src, HANDLE childProcess,
+						bool close_source)
 {
 	HANDLE		hChild = INVALID_HANDLE_VALUE;
 
@@ -6104,7 +6119,8 @@ write_duplicated_handle(HANDLE *dest, HANDLE src, HANDLE childProcess)
 						 &hChild,
 						 0,
 						 TRUE,
-						 DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS))
+						 (close_source ? DUPLICATE_CLOSE_SOURCE : 0) |
+						 DUPLICATE_SAME_ACCESS))
 	{
 		ereport(LOG,
 				(errmsg_internal("could not duplicate handle to be written to backend parameter file: error code %lu",
@@ -6300,6 +6316,8 @@ restore_backend_variables(BackendParameters *param, Port *port)
 #ifdef WIN32
 	PostmasterHandle = param->PostmasterHandle;
 	pgwin32_initial_signal_pipe = param->initial_signal_pipe;
+	fsyncPipe[0] = param->fsyncPipe[0];
+	fsyncPipe[1] = param->fsyncPipe[1];
 #else
 	memcpy(&postmaster_alive_fds, &param->postmaster_alive_fds,
 		   sizeof(postmaster_alive_fds));
@@ -6486,11 +6504,12 @@ static void
 InitFsyncFdSocketPair(void)
 {
 	Assert(MyProcPid == PostmasterPid);
+
+#ifndef WIN32
 	if (socketpair(AF_UNIX, SOCK_STREAM, 0, fsync_fds) < 0)
 		ereport(FATAL,
 				(errcode_for_file_access(),
 				 errmsg_internal("could not create fsync sockets: %m")));
-
 	/*
 	 * Set O_NONBLOCK on both fds.
 	 */
@@ -6515,9 +6534,52 @@ InitFsyncFdSocketPair(void)
 				(errcode_for_socket_access(),
 				 errmsg_internal("could not set fsync submit socket to close-on-exec mode: %m")));
 #endif
+#else
+	{
+		UCHAR		pipename[MAX_PATH];
+		SECURITY_ATTRIBUTES sa;
 
-	/*
-	 * FIXME: do DuplicateHandle dance for windows - can that work
-	 * trivially?
-	 */
+		memset(&sa, 0, sizeof(sa));
+
+		/*
+		 * We'll create a named pipe, because anonymous pipes don't allow
+		 * overlapped (= async) IO or message-orient communication.  We'll
+		 * open both ends of it here, and then duplicate them into all child
+		 * processes in save_backend_variables().  First, open the server end.
+		 */
+		snprintf(pipename, sizeof(pipename), "\\\\.\\Pipe\\fsync_pipe.%08x",
+				 GetCurrentProcessId());
+		fsyncPipe[FSYNC_FD_PROCESS] = CreateNamedPipeA(pipename,
+													   PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
+													   PIPE_TYPE_MESSAGE | PIPE_WAIT,
+													   1,
+													   4096,
+													   4096,
+													   -1,
+													   &sa);
+		if (!fsyncPipe[FSYNC_FD_PROCESS])
+		{
+			_dosmaperr(GetLastError());
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg_internal("could not create server end of fsync pipe: %m")));
+		}
+
+		/* Now open the client end. */
+		fsyncPipe[FSYNC_FD_SUBMIT] = CreateFileA(pipename,
+												 GENERIC_WRITE,
+												 0,
+												 &sa,
+												 OPEN_EXISTING,
+												 FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
+												 NULL);
+		if (!fsyncPipe[FSYNC_FD_SUBMIT])
+		{
+			_dosmaperr(GetLastError());
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg_internal("could not create client end of fsync pipe: %m")));
+		}
+	}
+#endif
 }
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 3dbfd0e4c06..d5c8328b5d6 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -3671,6 +3671,8 @@ MakePGDirectory(const char *directoryName)
 	return mkdir(directoryName, pg_dir_create_mode);
 }
 
+#ifndef WIN32
+
 /*
  * Send data over a unix domain socket, optionally (when fd != -1) including a
  * file descriptor.
@@ -3773,3 +3775,5 @@ pg_uds_recv_with_fd(int sock, void *buf, ssize_t bufsize, int *fd)
 
 	return size;
 }
+
+#endif
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index f6dda9cc9ac..081d399eefc 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -878,6 +878,12 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
 	{
 		*handle = PostmasterHandle;
 	}
+#ifdef WIN32
+	else if (event->events == WL_WIN32_HANDLE)
+	{
+		*handle = *(HANDLE *)event->user_data;
+	}
+#endif
 	else
 	{
 		int			flags = FD_CLOSE;	/* always check for errors/EOF */
@@ -1453,6 +1459,12 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 			returned_events++;
 		}
 	}
+	else if (cur_event->events & WL_WIN32_HANDLE)
+	{
+		occurred_events->events |= WL_WIN32_HANDLE;
+		occurred_events++;
+		returned_events++;
+	}
 
 	return returned_events;
 }
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index e2ba64e8984..821fd2d1ad2 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -47,7 +47,11 @@ extern int	postmaster_alive_fds[2];
 #define FSYNC_FD_SUBMIT			0
 #define FSYNC_FD_PROCESS		1
 
+#ifndef WIN32
 extern int	fsync_fds[2];
+#else
+extern HANDLE fsyncPipe[2];
+#endif
 
 extern PGDLLIMPORT const char *progname;
 
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 2808b06613a..d952acf714e 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -148,8 +148,10 @@ extern void SyncDataDirectory(void);
 #define PG_TEMP_FILES_DIR "pgsql_tmp"
 #define PG_TEMP_FILE_PREFIX "pgsql_tmp"
 
+#ifndef WIN32
 /* XXX; This should probably go elsewhere */
 ssize_t pg_uds_send_with_fd(int sock, void *buf, ssize_t buflen, int fd);
 ssize_t pg_uds_recv_with_fd(int sock, void *buf, ssize_t bufsize, int *fd);
+#endif
 
 #endif							/* FD_H */
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index fd8735b7f5f..a74eedfe4e9 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -128,6 +128,7 @@ typedef struct Latch
 #define WL_POSTMASTER_DEATH  (1 << 4)
 #ifdef WIN32
 #define WL_SOCKET_CONNECTED  (1 << 5)
+#define WL_WIN32_HANDLE		 (1 << 6)
 #else
 /* avoid having to deal with case on platforms not requiring it */
 #define WL_SOCKET_CONNECTED  WL_SOCKET_WRITEABLE
-- 
2.17.1 (Apple Git-112)

