Hi,

On 2022-01-29 13:47:13 -0800, Andres Freund wrote:
> Here's a version of the patch only creating the event once. Needs a small bit
> of comment polishing, but otherwise I think it's sane?

Ah, it needs a bit more. I was not cleaning up the event at the exit of
ReceiveXlogStream(). For pg_basebackup that perhaps wouldn't matter, but
pg_receivewal loops...

We don't have a good spot for cleaning up right now. ReceiveXlogStream() has
plenty of returns. The attached patch changes those to "goto done;", but pretty
it is not. It's probably still the best way for the back branches, though?

I think the receivelog.c interface probably could do with a bit of
cleanup... The control flow is quite complicated, with repeated checks all
over, etc. :(  And the whole thing of giving the appearance of being
instantiable multiple times, but then using global variables for state, is
...

Attached a revised version.

Greetings,

Andres Freund
>From 0faf35162c7a1dd1720cfe83e3a7166700148f16 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Sun, 16 Jan 2022 01:58:24 -0800
Subject: [PATCH v3 1/3] Avoid slow shutdown of pg_basebackup, windows edition.

See also 7834d20b57a.

Reviewed-By: Magnus Hagander <mag...@hagander.net>
Discussion: https://postgr.es/m/20220129204422.ljyxclfy5ubws...@alap3.anarazel.de
---
 src/bin/pg_basebackup/pg_basebackup.c |  24 ++++-
 src/bin/pg_basebackup/pg_receivewal.c |   4 +
 src/bin/pg_basebackup/receivelog.c    | 146 ++++++++++++++++++++++----
 src/bin/pg_basebackup/receivelog.h    |   6 ++
 4 files changed, 159 insertions(+), 21 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index c40925c1f04..9f80cdc4fef 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -169,6 +169,8 @@ static const char *progress_filename;
 /* Pipe to communicate with background wal receiver process */
 #ifndef WIN32
 static int	bgpipe[2] = {-1, -1};
+#else
+HANDLE	   *bgevent = NULL;
 #endif
 
 /* Handle to child process */
@@ -506,7 +508,14 @@ reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 	/*
 	 * At this point we have an end pointer, so compare it to the current
 	 * position to figure out if it's time to stop.
+	 *
+	 * On windows we need to reset the event used to wake up the streaming
+	 * thread, otherwise CopyStreamPoll() will start to immediately return.
 	 */
+#ifdef WIN32
+	ResetEvent(bgevent);
+#endif
+
 	if (segendpos >= xlogendptr)
 		return true;
 
@@ -541,7 +550,7 @@ LogStreamerMain(logstreamer_param *param)
 #ifndef WIN32
 	stream.stop_socket = bgpipe[0];
 #else
-	stream.stop_socket = PGINVALID_SOCKET;
+	stream.stop_event = bgevent;
 #endif
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = false;
@@ -627,6 +636,14 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 		pg_log_error("could not create pipe for background process: %m");
 		exit(1);
 	}
+#else
+	bgevent = CreateEvent(NULL, TRUE, FALSE, NULL);
+	if (bgevent == NULL)
+	{
+		pg_log_error("could not create event for background thread: %lu",
+					 GetLastError());
+		exit(1);
+	}
 #endif
 
 	/* Get a second connection */
@@ -2216,7 +2233,9 @@ BaseBackup(void)
 		/*
 		 * On Windows, since we are in the same process, we can just store the
 		 * value directly in the variable, and then set the flag that says
-		 * it's there.
+		 * it's there. To interrupt the thread while it's waiting for network
+		 * IO, we set an event (which the thread waits on in addition to the
+		 * socket).
 		 */
 		if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2)
 		{
@@ -2226,6 +2245,7 @@ BaseBackup(void)
 		}
 		xlogendptr = ((uint64) hi) << 32 | lo;
 		InterlockedIncrement(&has_xlogendptr);
+		SetEvent(bgevent);
 
 		/* First wait for the thread to exit */
 		if (WaitForSingleObjectEx((HANDLE) bgchild_handle, INFINITE, FALSE) !=
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index ccb215c398c..d27bd85b7ce 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -618,7 +618,11 @@ StreamLog(void)
 					stream.timeline);
 
 	stream.stream_stop = stop_streaming;
+#ifndef WIN32
 	stream.stop_socket = PGINVALID_SOCKET;
+#else
+	stream.stop_event = NULL;
+#endif
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = synchronous;
 	stream.do_sync = do_sync;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d39e4b11a1a..b3cb552c3f7 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -37,8 +37,8 @@ static bool still_sending = true;	/* feedback still needs to be sent? */
 
 static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
 								  XLogRecPtr *stoppos);
-static int	CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
-static int	CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+static int	CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream);
+static int	CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream,
 							  char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
 								int len, XLogRecPtr blockpos, TimestampTz *last_status);
@@ -414,6 +414,48 @@ CheckServerVersionForStreaming(PGconn *conn)
 	return true;
 }
 
+/*
+ * Prepare for ReceiveXlogStream() doing its work.
+ *
+ * Right now we just need to prepare an event for CopyStreamPoll() on windows.
+ */
+static void
+XLogStreamPrepare(PGconn *conn, StreamCtl *stream)
+{
+#ifdef WIN32
+	Assert(stream->net_event == NULL);
+	stream->net_event = WSACreateEvent();
+	if (stream->net_event == WSA_INVALID_EVENT)
+	{
+		pg_log_error("failed to create event for socket: error code %d",
+					 WSAGetLastError());
+		exit(1);
+	}
+
+	if (WSAEventSelect(PQsocket(conn), stream->net_event, FD_READ | FD_CLOSE) != 0)
+	{
+		pg_log_error("failed to set up event for socket: error code %d",
+					 WSAGetLastError());
+		exit(1);
+	}
+#endif
+}
+
+/*
+ * Clean up after ReceiveXlogStream(), undoing XLogStreamPrepare()'s work.
+ */
+static void
+XLogStreamDone(PGconn *conn, StreamCtl *stream)
+{
+#ifdef WIN32
+	Assert(stream->net_event != NULL);
+	/* reset event association for libpq socket, clean up event */
+	WSAEventSelect(PQsocket(conn), NULL, 0);
+	WSACloseEvent(stream->net_event);
+	stream->net_event = NULL;
+#endif
+}
+
 /*
  * Receive a log stream starting at the specified position.
  *
@@ -462,13 +504,16 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 	char		slotcmd[128];
 	PGresult   *res;
 	XLogRecPtr	stoppos;
+	bool		ret = false;
+
+	XLogStreamPrepare(conn, stream);
 
 	/*
 	 * The caller should've checked the server version already, but doesn't do
 	 * any harm to check it here too.
 	 */
 	if (!CheckServerVersionForStreaming(conn))
-		return false;
+		goto done;
 
 	/*
 	 * Decide whether we want to report the flush position. If we report the
@@ -506,14 +551,14 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 		if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
 		{
 			pg_free(sysidentifier);
-			return false;
+			goto done;
 		}
 
 		if (strcmp(stream->sysidentifier, sysidentifier) != 0)
 		{
 			pg_log_error("system identifier does not match between base backup and streaming connection");
 			pg_free(sysidentifier);
-			return false;
+			goto done;
 		}
 		pg_free(sysidentifier);
 
@@ -521,7 +566,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 		{
 			pg_log_error("starting timeline %u is not present in the server",
 						 stream->timeline);
-			return false;
+			goto done;
 		}
 	}
 
@@ -549,7 +594,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 				pg_log_error("could not send replication command \"%s\": %s",
 							 "TIMELINE_HISTORY", PQresultErrorMessage(res));
 				PQclear(res);
-				return false;
+				goto done;
 			}
 
 			/*
@@ -575,7 +620,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 		 * callback tells us to stop here.
 		 */
 		if (stream->stream_stop(stream->startpos, stream->timeline, false))
-			return true;
+		{
+			ret = true;
+			goto done;
+		}
 
 		/* Initiate the replication stream at specified location */
 		snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
@@ -588,7 +636,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 			pg_log_error("could not send replication command \"%s\": %s",
 						 "START_REPLICATION", PQresultErrorMessage(res));
 			PQclear(res);
-			return false;
+			goto done;
 		}
 		PQclear(res);
 
@@ -672,7 +720,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 			 * complain.
 			 */
 			if (stream->stream_stop(stoppos, stream->timeline, false))
-				return true;
+			{
+				ret = true;
+				goto done;
+			}
 			else
 			{
 				pg_log_error("replication stream was terminated before stop point");
@@ -690,11 +741,15 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 	}
 
 error:
+	Assert(ret == false);
 	if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
 		pg_log_error("could not close file \"%s\": %s",
 					 current_walfile_name, stream->walmethod->getlasterror());
 	walfile = NULL;
-	return false;
+
+done:
+	XLogStreamDone(conn, stream);
+	return ret;
 }
 
 /*
@@ -813,7 +868,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
 												 last_status);
 
-		r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
+		r = CopyStreamReceive(conn, sleeptime, stream, &copybuf);
 		while (r != 0)
 		{
 			if (r == -1)
@@ -858,7 +913,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 			 * Process the received data, and any subsequent data we can read
 			 * without blocking.
 			 */
-			r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
+			r = CopyStreamReceive(conn, 0, stream, &copybuf);
 		}
 	}
 
@@ -877,8 +932,9 @@ error:
  * or interrupted by signal or stop_socket input, and -1 on an error.
  */
 static int
-CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
+CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream)
 {
+#ifndef WIN32
 	int			ret;
 	fd_set		input_mask;
 	int			connsocket;
@@ -896,10 +952,10 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 	FD_ZERO(&input_mask);
 	FD_SET(connsocket, &input_mask);
 	maxfd = connsocket;
-	if (stop_socket != PGINVALID_SOCKET)
+	if (stream->stop_socket != PGINVALID_SOCKET)
 	{
-		FD_SET(stop_socket, &input_mask);
-		maxfd = Max(maxfd, stop_socket);
+		FD_SET(stream->stop_socket, &input_mask);
+		maxfd = Max(maxfd, stream->stop_socket);
 	}
 
 	if (timeout_ms < 0)
@@ -924,6 +980,58 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 		return 1;				/* Got input on connection socket */
 
 	return 0;					/* Got timeout or input on stop_socket */
+#else
+	int			ret;
+	int			rc;
+	int			nevents = 0;
+	HANDLE		events[2];
+
+
+	events[0] = stream->net_event;
+	nevents++;
+
+	if (stream->stop_event != NULL)
+	{
+		events[1] = stream->stop_event;
+		nevents++;
+	}
+
+	/* map timeout_ms to WaitForMultipleObjects expectations */
+	if (timeout_ms < 0)
+		timeout_ms = INFINITE;
+
+	rc = WaitForMultipleObjects(nevents, events, FALSE, timeout_ms);
+
+	if (rc == WAIT_FAILED)
+	{
+		pg_log_error("WaitForMultipleObjects() failed: error code %lu",
+					 GetLastError());
+		exit(1);
+	}
+	else if (rc == WAIT_TIMEOUT)
+	{
+		/* timeout exceeded */
+		ret = 0;
+	}
+	else if (rc == WAIT_OBJECT_0)
+	{
+		/* Got input on connection socket */
+		ret = 1;
+	}
+	else if (rc == (WAIT_OBJECT_0 + 1))
+	{
+		Assert(stream->stop_event != NULL);
+		/* Got event on stop socket  */
+		ret = 0;
+	}
+	else
+	{
+		pg_log_error("unexpected return from WaitForMultipleObjects(): %d", rc);
+		exit(1);
+	}
+
+	return ret;
+#endif							/* WIN32 */
 }
 
 /*
@@ -939,7 +1047,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
  * -1 on error. -2 if the server ended the COPY.
  */
 static int
-CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream,
 				  char **buffer)
 {
 	char	   *copybuf = NULL;
@@ -960,7 +1068,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
 		 * the specified timeout, so that we can ping the server.  Also stop
 		 * waiting if input appears on stop_socket.
 		 */
-		ret = CopyStreamPoll(conn, timeout, stop_socket);
+		ret = CopyStreamPoll(conn, timeout, stream);
 		if (ret <= 0)
 			return ret;
 
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 050d4bc69fd..c0b1911a6c1 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -40,8 +40,14 @@ typedef struct StreamCtl
 
 	stream_stop_callback stream_stop;	/* Stop streaming when returns true */
 
+#ifndef WIN32
 	pgsocket	stop_socket;	/* if valid, watch for input on this socket
 								 * and check stream_stop() when there is any */
+#else
+	HANDLE	   *stop_event;		/* on windows, check an event instead */
+
+	HANDLE	   *net_event;		/* event to wait for network IO */
+#endif
 
 	WalWriteMethod *walmethod;	/* How to write the WAL */
 	char	   *partial_suffix; /* Suffix appended to partially received files */
-- 
2.34.0

Reply via email to