Hi, On 2022-01-29 13:47:13 -0800, Andres Freund wrote: > Here's a version of the patch only creating the event once. Needs a small bit > of comment polishing, but otherwise I think it's sane?
Ah, it needs a bit more. I was not cleaning up the event at the exit of ReceiveXlogStream(). For pg_basebackup that perhaps wouldn't matter, but pg_receivewal loops... We don't have a good spot for cleaning up right now: ReceiveXlogStream() has plenty of returns. The attached version changes those to a "goto done;", but it is not pretty. Still, it's probably the best way for the back branches? I think the receivelog.c interface could do with a bit of cleanup... The control flow is quite complicated, with repeated checks all over etc :(. And the whole thing of giving the appearance of being instantiable multiple times, but then using global variables for state, is ... Attached a revised version. Greetings, Andres Freund
>From 0faf35162c7a1dd1720cfe83e3a7166700148f16 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Sun, 16 Jan 2022 01:58:24 -0800
Subject: [PATCH v3 1/3] Avoid slow shutdown of pg_basebackup, windows edition.

See also 7834d20b57a.

Reviewed-By: Magnus Hagander <mag...@hagander.net>
Discussion: https://postgr.es/m/20220129204422.ljyxclfy5ubws...@alap3.anarazel.de
---
 src/bin/pg_basebackup/pg_basebackup.c |  24 ++++-
 src/bin/pg_basebackup/pg_receivewal.c |   4 +
 src/bin/pg_basebackup/receivelog.c    | 146 ++++++++++++++++++++++----
 src/bin/pg_basebackup/receivelog.h    |   6 ++
 4 files changed, 159 insertions(+), 21 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index c40925c1f04..9f80cdc4fef 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -169,6 +169,8 @@ static const char *progress_filename;
 /* Pipe to communicate with background wal receiver process */
 #ifndef WIN32
 static int	bgpipe[2] = {-1, -1};
+#else
+static HANDLE bgevent = NULL;
 #endif
 
 /* Handle to child process */
@@ -506,7 +508,14 @@ reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 	/*
 	 * At this point we have an end pointer, so compare it to the current
 	 * position to figure out if it's time to stop.
+	 *
+	 * On Windows we need to reset the event used to wake up the streaming
+	 * thread, otherwise CopyStreamPoll() will start to immediately return.
 	 */
+#ifdef WIN32
+	ResetEvent(bgevent);
+#endif
+
 	if (segendpos >= xlogendptr)
 		return true;
 
@@ -541,7 +550,7 @@ LogStreamerMain(logstreamer_param *param)
 #ifndef WIN32
 	stream.stop_socket = bgpipe[0];
 #else
-	stream.stop_socket = PGINVALID_SOCKET;
+	stream.stop_event = bgevent;
 #endif
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = false;
@@ -627,6 +636,14 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 		pg_log_error("could not create pipe for background process: %m");
 		exit(1);
 	}
+#else
+	bgevent = CreateEvent(NULL, TRUE, FALSE, NULL);
+	if (bgevent == NULL)
+	{
+		pg_log_error("could not create event for background thread: %lu",
+					 GetLastError());
+		exit(1);
+	}
 #endif
 
 	/* Get a second connection */
@@ -2216,7 +2233,9 @@ BaseBackup(void)
 		/*
 		 * On Windows, since we are in the same process, we can just store the
 		 * value directly in the variable, and then set the flag that says
-		 * it's there.
+		 * it's there. To interrupt the thread while it's waiting for network
+		 * IO, we set an event (which the thread waits on in addition to the
+		 * socket).
 		 */
 		if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2)
 		{
@@ -2226,6 +2245,7 @@ BaseBackup(void)
 		}
 		xlogendptr = ((uint64) hi) << 32 | lo;
 		InterlockedIncrement(&has_xlogendptr);
+		SetEvent(bgevent);
 
 		/* First wait for the thread to exit */
 		if (WaitForSingleObjectEx((HANDLE) bgchild_handle, INFINITE, FALSE) !=
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index ccb215c398c..d27bd85b7ce 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -618,7 +618,11 @@ StreamLog(void)
 					stream.timeline);
 
 	stream.stream_stop = stop_streaming;
+#ifndef WIN32
 	stream.stop_socket = PGINVALID_SOCKET;
+#else
+	stream.stop_event = NULL;
+#endif
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = synchronous;
 	stream.do_sync = do_sync;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d39e4b11a1a..b3cb552c3f7 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -37,8 +37,8 @@ static bool still_sending = true;	/* feedback still needs to be sent? */
 
 static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
 								  XLogRecPtr *stoppos);
-static int	CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
-static int	CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+static int	CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream);
+static int	CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream,
 							  char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
 								int len, XLogRecPtr blockpos, TimestampTz *last_status);
@@ -414,6 +414,48 @@ CheckServerVersionForStreaming(PGconn *conn)
 	return true;
 }
 
+/*
+ * Prepare for ReceiveXlogStream() doing its work.
+ *
+ * Right now we just need to prepare an event for CopyStreamPoll() on Windows.
+ */
+static void
+XLogStreamPrepare(PGconn *conn, StreamCtl *stream)
+{
+#ifdef WIN32
+	Assert(stream->net_event == NULL);
+	stream->net_event = WSACreateEvent();
+	if (stream->net_event == WSA_INVALID_EVENT)
+	{
+		pg_log_error("failed to create event for socket: error code %d",
+					 WSAGetLastError());
+		exit(1);
+	}
+
+	if (WSAEventSelect(PQsocket(conn), stream->net_event, FD_READ | FD_CLOSE) != 0)
+	{
+		pg_log_error("failed to set up event for socket: error code %d",
+					 WSAGetLastError());
+		exit(1);
+	}
+#endif
+}
+
+/*
+ * Clean up after ReceiveXlogStream(), undoing XLogStreamPrepare()'s work.
+ */
+static void
+XLogStreamDone(PGconn *conn, StreamCtl *stream)
+{
+#ifdef WIN32
+	Assert(stream->net_event != NULL);
+	/* reset event association for libpq socket, clean up event */
+	WSAEventSelect(PQsocket(conn), NULL, 0);
+	WSACloseEvent(stream->net_event);
+	stream->net_event = NULL;
+#endif
+}
+
 /*
  * Receive a log stream starting at the specified position.
  *
@@ -462,13 +504,16 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 	char		slotcmd[128];
 	PGresult   *res;
 	XLogRecPtr	stoppos;
+	bool		ret = false;
+
+	XLogStreamPrepare(conn, stream);
 
 	/*
 	 * The caller should've checked the server version already, but doesn't do
 	 * any harm to check it here too.
 	 */
 	if (!CheckServerVersionForStreaming(conn))
-		return false;
+		goto done;
 
 	/*
 	 * Decide whether we want to report the flush position. If we report the
@@ -506,14 +551,14 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 		if (!RunIdentifySystem(conn, &sysidentifier, &servertli, NULL, NULL))
 		{
 			pg_free(sysidentifier);
-			return false;
+			goto done;
 		}
 
 		if (strcmp(stream->sysidentifier, sysidentifier) != 0)
 		{
 			pg_log_error("system identifier does not match between base backup and streaming connection");
 			pg_free(sysidentifier);
-			return false;
+			goto done;
 		}
 		pg_free(sysidentifier);
 
@@ -521,7 +566,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 		{
 			pg_log_error("starting timeline %u is not present in the server",
 						 stream->timeline);
-			return false;
+			goto done;
 		}
 	}
 
@@ -549,7 +594,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 				pg_log_error("could not send replication command \"%s\": %s",
 							 "TIMELINE_HISTORY", PQresultErrorMessage(res));
 				PQclear(res);
-				return false;
+				goto done;
 			}
 
 			/*
@@ -575,7 +620,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 		 * callback tells us to stop here.
 		 */
 		if (stream->stream_stop(stream->startpos, stream->timeline, false))
-			return true;
+		{
+			ret = true;
+			goto done;
+		}
 
 		/* Initiate the replication stream at specified location */
 		snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
@@ -588,7 +636,7 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 			pg_log_error("could not send replication command \"%s\": %s",
 						 "START_REPLICATION", PQresultErrorMessage(res));
 			PQclear(res);
-			return false;
+			goto done;
 		}
 		PQclear(res);
 
@@ -672,7 +720,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 			 * complain.
 			 */
 			if (stream->stream_stop(stoppos, stream->timeline, false))
-				return true;
+			{
+				ret = true;
+				goto done;
+			}
 			else
 			{
 				pg_log_error("replication stream was terminated before stop point");
@@ -690,11 +741,15 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 	}
 
 error:
+	Assert(ret == false);
 	if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
 		pg_log_error("could not close file \"%s\": %s",
 					 current_walfile_name, stream->walmethod->getlasterror());
 	walfile = NULL;
-	return false;
+
+done:
+	XLogStreamDone(conn, stream);
+	return ret;
 }
 
 /*
@@ -813,7 +868,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
 												 last_status);
 
-		r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
+		r = CopyStreamReceive(conn, sleeptime, stream, &copybuf);
 		while (r != 0)
 		{
 			if (r == -1)
@@ -858,7 +913,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 			 * Process the received data, and any subsequent data we can read
 			 * without blocking.
 			 */
-			r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
+			r = CopyStreamReceive(conn, 0, stream, &copybuf);
 		}
 	}
 
@@ -877,8 +932,9 @@ error:
  * or interrupted by signal or stop_socket input, and -1 on an error.
  */
 static int
-CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
+CopyStreamPoll(PGconn *conn, long timeout_ms, StreamCtl *stream)
 {
+#ifndef WIN32
 	int			ret;
 	fd_set		input_mask;
 	int			connsocket;
@@ -896,10 +952,10 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 	FD_ZERO(&input_mask);
 	FD_SET(connsocket, &input_mask);
 	maxfd = connsocket;
-	if (stop_socket != PGINVALID_SOCKET)
+	if (stream->stop_socket != PGINVALID_SOCKET)
 	{
-		FD_SET(stop_socket, &input_mask);
-		maxfd = Max(maxfd, stop_socket);
+		FD_SET(stream->stop_socket, &input_mask);
+		maxfd = Max(maxfd, stream->stop_socket);
 	}
 
 	if (timeout_ms < 0)
@@ -924,6 +980,58 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
 		return 1;				/* Got input on connection socket */
 
 	return 0;					/* Got timeout or input on stop_socket */
+#else
+	int			ret;
+	DWORD		rc;
+	int			nevents = 0;
+	HANDLE		events[2];
+
+	events[0] = stream->net_event;
+	nevents++;
+
+	if (stream->stop_event != NULL)
+	{
+		events[1] = stream->stop_event;
+		nevents++;
+	}
+
+	/* map timeout_ms to WaitForMultipleObjects expectations */
+	if (timeout_ms < 0)
+		timeout_ms = INFINITE;
+
+	rc = WaitForMultipleObjects(nevents, events, FALSE, timeout_ms);
+
+	if (rc == WAIT_FAILED)
+	{
+		pg_log_error("WaitForMultipleObjects() failed: error code %lu",
+					 GetLastError());
+		exit(1);
+	}
+	else if (rc == WAIT_TIMEOUT)
+	{
+		/* timeout exceeded */
+		ret = 0;
+	}
+	else if (rc == WAIT_OBJECT_0)
+	{
+		/* Input on socket; reset the manual-reset event, else we'd spin */
+		WSAResetEvent(stream->net_event);
+		ret = 1;
+	}
+	else if (rc == (WAIT_OBJECT_0 + 1))
+	{
+		Assert(stream->stop_event != NULL);
+		/* Got stop event */
+		ret = 0;
+	}
+	else
+	{
+		pg_log_error("unexpected return from WaitForMultipleObjects(): %lu", rc);
+		exit(1);
+	}
+
+	return ret;
+#endif							/* WIN32 */
 }
 
 /*
@@ -939,7 +1047,7 @@ CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
  * -1 on error. -2 if the server ended the COPY.
  */
 static int
-CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
+CopyStreamReceive(PGconn *conn, long timeout, StreamCtl *stream,
 				  char **buffer)
 {
 	char	   *copybuf = NULL;
@@ -960,7 +1068,7 @@ CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
 		 * the specified timeout, so that we can ping the server.  Also stop
 		 * waiting if input appears on stop_socket.
 		 */
-		ret = CopyStreamPoll(conn, timeout, stop_socket);
+		ret = CopyStreamPoll(conn, timeout, stream);
 		if (ret <= 0)
 			return ret;
 
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 050d4bc69fd..c0b1911a6c1 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -40,8 +40,14 @@ typedef struct StreamCtl
 
 	stream_stop_callback stream_stop;	/* Stop streaming when returns true */
 
+#ifndef WIN32
 	pgsocket	stop_socket;	/* if valid, watch for input on this socket
 								 * and check stream_stop() when there is any */
+#else
+	HANDLE		stop_event;		/* on Windows, check an event instead */
+
+	HANDLE		net_event;		/* event to wait for network IO */
+#endif
 
 	WalWriteMethod *walmethod;	/* How to write the WAL */
 	char	   *partial_suffix; /* Suffix appended to partially received files */
-- 
2.34.0