On Tue, Jan 25, 2022 at 3:50 PM Tom Lane <t...@sss.pgh.pa.us> wrote:
> Thomas Munro <thomas.mu...@gmail.com> writes:
> > I vote for reverting in release branches only. I'll propose a better
> > WES patch set for master that hopefully also covers async append etc
> > (which I was already planning to do before we knew about this Windows
> > problem). More soon.
>
> WFM, but we'll have to remember to revert this in v15 if we don't
> have a solid fix by then.

Phew, after a couple of days of very slow compile/test cycles on Windows
exploring a couple of different ideas, I finally have something new.
First let me recap the three main ideas in this thread:

1. It sounds like no one really loves the WSAPoll() kludge, even though
it apparently works for simple cases. It's not totally clear that it
really works in enough cases, for one thing. It doesn't allow for a
socket to be in two WESes at the same time, and I'm not sure I want to
bank on Winsock's WSAPoll() being guaranteed to report POLLHUP when half
closed (as mentioned, no other OS does AFAIK).

2. The long-lived-WaitEventSets-everywhere concept was initially
appealing to me and solves the walreceiver problem (when combined with a
sticky seen_fd_close flag), and I've managed to get that working
correctly across libpq reconnects. As mentioned, I also have some toy
patches along those lines for the equivalent but more complex problem in
postgres_fdw, because I've been studying how to make parallel append
generate a tidy stream of epoll_wait()/kevent() calls, instead of a
quadratic explosion of setup/teardown spam. I'll write some more about
those patches and hopefully propose them soon anyway, but on reflection
I don't really want that Unix efficiency problem to be tangled up with
this Windows correctness problem. Relying on long-lived WaitEventSets as
the fix would require a programming rule that I don't want to burden us
with forever: you'd *never* be able to use a socket in more than one
WaitEventSet, and WaitLatchOrSocket() would have to be removed.

3. The real solution to this problem is to recognise that we just have
the event objects in the wrong place. WaitEventSets shouldn't own them:
they need to be 1:1 with sockets, or Winsock will eat events. Likewise
for the flag you need for edge->level conversion, or *we'll* eat events.
Having now tried that, it's starting to feel like the best way forward,
even though my initial prototype (see attached) is maybe a tad
cumbersome with bookkeeping. I believe it means that all existing coding
patterns *should* now be safe (not yet confirmed by testing), and we're
free to put sockets in multiple WESes even at the same time if the need
arises.

The basic question is: how should a socket user find the associated
event handle and flags? Some answers:

1. "pgsocket" could become a pointer to a heap-allocated wrapper object
containing { socket, event, flags } on Windows, or something like that,
but that seems a bit invasive and tangled up with public APIs like
libpq, which put me off trying that. I'm willing to explore it if people
object to my other idea.

2. "pgsocket" could stay unchanged, but we could have a parallel array
with extra socket state, indexed by file descriptor. We could use new
socket()/close() libpq events so that libpq's sockets could be
registered in this scheme without libpq itself having to know anything
about that. That worked pretty nicely when I developed it on my FreeBSD
box, but on Windows I soon learned that SOCKET is really yet another
name for HANDLE, so it's not a dense number space anchored at 0 like
Unix file descriptors. The array could be prohibitively big.

3. I tried the same as #2 but with a hash table, and ran into another
small problem when putting it all together: we probably don't want to
longjmp out of libpq callbacks on allocation failure. So, I modified
simplehash to add a no-OOM behaviour. That's the POC patch set I'm
attaching for show-and-tell. Some notes and TODOs in the commit messages
and comments.

Thoughts?
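
To make answer #3 a bit more concrete, here is a minimal standalone sketch of the lookup idea: per-socket state kept in a small chained hash table keyed by an opaque handle value, where registration reports failure by returning NULL rather than by a nonlocal exit. This is not the code from the attached patches. All names (SockState, SockTab, socktab_add and so on) are hypothetical, the bucket count is fixed for brevity, and plain calloc/free stand in for whatever allocator would really be used.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

#define SOCKTAB_NBUCKETS 64		/* fixed power of two, enough for a sketch */

/* Hypothetical per-socket state, standing in for { event, flags }. */
typedef struct SockState
{
	uintptr_t	sock;			/* key: an opaque handle value, not a small int */
	int			flags;			/* most recent event mask */
	bool		seen_close;		/* sticky close indicator */
	struct SockState *next;		/* bucket chain */
} SockState;

typedef struct SockTab
{
	SockState  *buckets[SOCKTAB_NBUCKETS];
} SockTab;

/* Map a sparse handle value to a bucket using the top 6 bits of a product. */
static size_t
socktab_bucket(uintptr_t sock)
{
	uint64_t	h = (uint64_t) sock * UINT64_C(0x9E3779B97F4A7C15);

	return (size_t) (h >> 58);
}

/* Return the state for a registered socket, or NULL if it is unknown. */
static SockState *
socktab_get(SockTab *tab, uintptr_t sock)
{
	for (SockState *s = tab->buckets[socktab_bucket(sock)]; s; s = s->next)
		if (s->sock == sock)
			return s;
	return NULL;
}

/* Register a socket.  Returns NULL on out-of-memory or double registration. */
static SockState *
socktab_add(SockTab *tab, uintptr_t sock)
{
	SockState  *s;
	size_t		b = socktab_bucket(sock);

	if (socktab_get(tab, sock) != NULL)
		return NULL;
	s = calloc(1, sizeof(*s));
	if (s == NULL)
		return NULL;			/* caller decides how to report this */
	s->sock = sock;
	s->next = tab->buckets[b];
	tab->buckets[b] = s;
	return s;
}

/* Unregister a socket.  Returns false if it was not registered. */
static bool
socktab_drop(SockTab *tab, uintptr_t sock)
{
	for (SockState **link = &tab->buckets[socktab_bucket(sock)];
		 *link != NULL;
		 link = &(*link)->next)
	{
		if ((*link)->sock == sock)
		{
			SockState  *dead = *link;

			*link = dead->next;
			free(dead);
			return true;
		}
	}
	return false;
}
```

Because each entry is allocated separately, a pointer returned by socktab_add() stays valid until the socket is dropped, so a wait set could hold on to it. A zero-initialized SockTab is an empty table.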

From bdd90aeb65d82ecae8fe58b441d25a1e1b129bf3 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 29 Jan 2022 02:15:10 +1300
Subject: [PATCH 1/3] Add low level socket events for libpq.

Provide a way to get a callback when a socket is created or closed.

XXX TODO handle callback failure
XXX TODO investigate overheads/other implications of having a callback
installed
---
 src/interfaces/libpq/fe-connect.c   | 24 ++++++++++++++++++++++++
 src/interfaces/libpq/libpq-events.c | 11 +++++++++++
 src/interfaces/libpq/libpq-events.h | 10 +++++++++-
 3 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index a6a1db3356..ddc3f38cf1 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -452,7 +452,19 @@ pqDropConnection(PGconn *conn, bool flushInput)
 
 	/* Close the socket itself */
 	if (conn->sock != PGINVALID_SOCKET)
+	{
+		/* Report that the socket is closing. */
+		for (int i = 0; i < conn->nEvents; i++)
+		{
+			PGEventSocket evt;
+
+			evt.conn = conn;
+			evt.socket = conn->sock;
+			(void) conn->events[i].proc(PGEVT_SOCKETCLOSE, &evt,
+										conn->events[i].passThrough);
+		}
 		closesocket(conn->sock);
+	}
 	conn->sock = PGINVALID_SOCKET;
 
 	/* Optionally discard any unread data */
@@ -2576,6 +2588,18 @@ keep_going: /* We will come back to here until there is
 						goto error_return;
 					}
 
+					/* Report that a new socket has been created. */
+					for (int i = 0; i < conn->nEvents; i++)
+					{
+						PGEventSocket evt;
+
+						evt.conn = conn;
+						evt.socket = conn->sock;
+						(void) conn->events[i].proc(PGEVT_SOCKET, &evt,
+													conn->events[i].passThrough);
+						/* XXX check call result and fail */
+					}
+
 					/*
 					 * Once we've identified a target address, all errors
 					 * except the preceding socket()-failure case should be
diff --git a/src/interfaces/libpq/libpq-events.c b/src/interfaces/libpq/libpq-events.c
index 7754c37748..2c549ec457 100644
--- a/src/interfaces/libpq/libpq-events.c
+++ b/src/interfaces/libpq/libpq-events.c
@@ -87,6 +87,17 @@ PQregisterEventProc(PGconn *conn, PGEventProc proc,
 		return false;
 	}
 
+	/* If we already have a socket, report a socket event immediately. */
+	if (conn->sock != PGINVALID_SOCKET)
+	{
+		PGEventSocket evt;
+
+		evt.conn = conn;
+		evt.socket = conn->sock;
+		(void) proc(PGEVT_SOCKET, &evt, passThrough);
+		/* XXX check return and fail (callback out of memory) */
+	}
+
 	return true;
 }
 
diff --git a/src/interfaces/libpq/libpq-events.h b/src/interfaces/libpq/libpq-events.h
index d93b7c4d4e..da64dc866b 100644
--- a/src/interfaces/libpq/libpq-events.h
+++ b/src/interfaces/libpq/libpq-events.h
@@ -31,7 +31,9 @@ typedef enum
 	PGEVT_CONNDESTROY,
 	PGEVT_RESULTCREATE,
 	PGEVT_RESULTCOPY,
-	PGEVT_RESULTDESTROY
+	PGEVT_RESULTDESTROY,
+	PGEVT_SOCKET,
+	PGEVT_SOCKETCLOSE
 } PGEventId;
 
 typedef struct
@@ -66,6 +68,12 @@ typedef struct
 	PGresult   *result;
 } PGEventResultDestroy;
 
+typedef struct
+{
+	PGconn	   *conn;
+	pgsocket	socket;
+} PGEventSocket;
+
 typedef int (*PGEventProc) (PGEventId evtId, void *evtInfo, void *passThrough);
 
 /* Registers an event proc with the given PGconn. */
-- 
2.33.1
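
For readers who want the shape of the socket-event idea without the libpq context, the following standalone model may help. It is only a sketch under stated assumptions: Conn, EventProc, conn_register() and friends are hypothetical names, not libpq's API, and the handling of a failing callback is just one possible way to resolve the XXX TODOs above, not what the patch does. It shows the three places a callback fires: when a socket is created, just before it is closed, and immediately at registration time if a socket already exists.

```c
#include <stdbool.h>
#include <stddef.h>

#define MAX_PROCS 4
#define INVALID_SOCK (-1)

typedef enum
{
	EVT_SOCKET,					/* a socket has been created */
	EVT_SOCKETCLOSE				/* the socket is about to be closed */
} EventId;

/* A callback returns nonzero on success, zero on failure. */
typedef int (*EventProc) (EventId id, int sock, void *pass);

typedef struct Conn
{
	int			sock;
	int			nprocs;
	struct
	{
		EventProc	proc;
		void	   *pass;
	}			procs[MAX_PROCS];
} Conn;

/* Deliver one event to every registered callback; false if any failed. */
static bool
conn_fire(Conn *conn, EventId id)
{
	bool		ok = true;

	for (int i = 0; i < conn->nprocs; i++)
		if (!conn->procs[i].proc(id, conn->sock, conn->procs[i].pass))
			ok = false;
	return ok;
}

/* Register a callback, reporting an already-existing socket right away. */
static bool
conn_register(Conn *conn, EventProc proc, void *pass)
{
	if (conn->nprocs >= MAX_PROCS)
		return false;
	if (conn->sock != INVALID_SOCK && !proc(EVT_SOCKET, conn->sock, pass))
		return false;			/* callback could not track the socket */
	conn->procs[conn->nprocs].proc = proc;
	conn->procs[conn->nprocs].pass = pass;
	conn->nprocs++;
	return true;
}

/* Model of "a new socket has been created". */
static bool
conn_open(Conn *conn, int sock)
{
	conn->sock = sock;
	return conn_fire(conn, EVT_SOCKET);
}

/* Model of closing: callbacks run while the socket is still valid. */
static void
conn_close(Conn *conn)
{
	if (conn->sock != INVALID_SOCK)
	{
		(void) conn_fire(conn, EVT_SOCKETCLOSE);
		conn->sock = INVALID_SOCK;
	}
}

/* Example callback: keeps a count of live sockets in *pass. */
static int
count_live_sockets(EventId id, int sock, void *pass)
{
	int		   *live = pass;

	(void) sock;
	*live += (id == EVT_SOCKET) ? 1 : -1;
	return 1;
}
```

A caller would typically register once per connection and let the callback keep its own table in sync, which is the role pgfdw_eventproc and libpqrcv_eventproc play in patch 3.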
From 9e03fb83dcd95b7b793728b21726a1494943d47e Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.munro@gmail.com> Date: Tue, 1 Feb 2022 11:21:57 +1300 Subject: [PATCH 2/3] Allow no-throw allocators in simplehash.h. Previously, elog-style error reporting was assumed. Allow SH_NO_OOM to be defined, to indicate that you'd like creation and insertion to return NULL to indicate failure instead, for use in no-throw programming contexts. A custom allocation function should return NULL on failure, and the built-in MemoryContext support will use MCXT_NO_OOM when this option is used. Also allow SH_RAW_FREE to be defined, and SH_RAW_ALLOCATOR_NOZERO to state that the SH_RAW_ALLOCATOR doesn't zero memory. These configuration points allow standard malloc and free to be used, which was mostly interesting to allow testing of the SH_NO_OOM with different allocator options. XXX Proof-of-concept --- src/include/lib/simplehash.h | 74 +++++++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/src/include/lib/simplehash.h b/src/include/lib/simplehash.h index 8192927010..758669e1ea 100644 --- a/src/include/lib/simplehash.h +++ b/src/include/lib/simplehash.h @@ -41,7 +41,11 @@ * - SH_SCOPE - in which scope (e.g. extern, static inline) do function * declarations reside * - SH_RAW_ALLOCATOR - if defined, memory contexts are not used; instead, - * use this to allocate bytes. The allocator must zero the returned space. + * use this to allocate bytes. The allocator must zero the returned space, + * or SH_RAW_ALLOCATOR_NOZERO must also be defined. 
+ * - SH_RAW_FREE - if defined, use this to free memory, instead of pfree() + * - SH_NO_OOM - if defined, the allocator can fail by returning NULL, + * otherwise allocator failure is assumed to exit nonlocally (ERROR) * - SH_USE_NONDEFAULT_ALLOCATOR - if defined no element allocator functions * are defined, so you can supply your own * The following parameters are only relevant when SH_DEFINE is defined: @@ -204,14 +208,14 @@ SH_SCOPE void SH_DESTROY(SH_TYPE * tb); SH_SCOPE void SH_RESET(SH_TYPE * tb); /* void <prefix>_grow(<prefix>_hash *tb, uint64 newsize) */ -SH_SCOPE void SH_GROW(SH_TYPE * tb, uint64 newsize); +SH_SCOPE bool SH_GROW(SH_TYPE * tb, uint64 newsize); /* <element> *<prefix>_insert(<prefix>_hash *tb, <key> key, bool *found) */ SH_SCOPE SH_ELEMENT_TYPE *SH_INSERT(SH_TYPE * tb, SH_KEY_TYPE key, bool *found); /* * <element> *<prefix>_insert_hash(<prefix>_hash *tb, <key> key, uint32 hash, - * bool *found) + * bool *found) */ SH_SCOPE SH_ELEMENT_TYPE *SH_INSERT_HASH(SH_TYPE * tb, SH_KEY_TYPE key, uint32 hash, bool *found); @@ -282,6 +286,12 @@ SH_SCOPE void SH_STAT(SH_TYPE * tb); #define SH_COMPARE_KEYS(tb, ahash, akey, b) (SH_EQUAL(tb, b->SH_KEY, akey)) #endif +#ifdef SH_NO_OOM +#define SH_MCXT_FLAGS (MCXT_ALLOC_ZERO | MCXT_ALLOC_NO_OOM) +#else +#define SH_MCXT_FLAGS MCXT_ALLOC_ZERO +#endif + /* * Wrap the following definitions in include guards, to avoid multiple * definition errors if this header is included more than once. 
The rest of @@ -400,10 +410,18 @@ static inline void * SH_ALLOCATE(SH_TYPE * type, Size size) { #ifdef SH_RAW_ALLOCATOR - return SH_RAW_ALLOCATOR(size); + void *result = SH_RAW_ALLOCATOR(size); +#ifdef SH_RAW_ALLOCATOR_NOZERO +#ifdef SH_NO_OOM + if (!result) + return NULL; +#endif + memset(result, 0, size); +#endif + return result; #else return MemoryContextAllocExtended(type->ctx, size, - MCXT_ALLOC_HUGE | MCXT_ALLOC_ZERO); + SH_MCXT_FLAGS | MCXT_ALLOC_HUGE); #endif } @@ -411,7 +429,11 @@ SH_ALLOCATE(SH_TYPE * type, Size size) static inline void SH_FREE(SH_TYPE * type, void *pointer) { +#ifdef SH_RAW_FREE + SH_RAW_FREE(pointer); +#else pfree(pointer); +#endif } #endif @@ -439,7 +461,16 @@ SH_CREATE(MemoryContext ctx, uint32 nelements, void *private_data) #ifdef SH_RAW_ALLOCATOR tb = SH_RAW_ALLOCATOR(sizeof(SH_TYPE)); #else - tb = MemoryContextAllocZero(ctx, sizeof(SH_TYPE)); + tb = MemoryContextAllocExtended(ctx, sizeof(SH_TYPE), SH_MCXT_FLAGS); +#endif +#ifdef SH_NO_OOM + if (!tb) + return NULL; +#endif +#ifdef SH_RAW_ALLOCATOR_NOZERO + memset(tb, 0, sizeof(SH_TYPE)); +#endif +#ifndef SH_RAW_ALLOCATOR tb->ctx = ctx; #endif tb->private_data = private_data; @@ -451,6 +482,14 @@ SH_CREATE(MemoryContext ctx, uint32 nelements, void *private_data) tb->data = SH_ALLOCATE(tb, sizeof(SH_ELEMENT_TYPE) * tb->size); +#ifdef SH_NO_OOM + if (!tb->data) + { + SH_FREE(tb, tb); + return NULL; + } +#endif + return tb; } @@ -476,8 +515,10 @@ SH_RESET(SH_TYPE * tb) * Usually this will automatically be called by insertions/deletions, when * necessary. But resizing to the exact input size can be advantageous * performance-wise, when known at some point. + * + * Return true on success. Can only return false if SH_NO_OOM is defined. 
*/ -SH_SCOPE void +SH_SCOPE bool SH_GROW(SH_TYPE * tb, uint64 newsize) { uint64 oldsize = tb->size; @@ -494,9 +535,18 @@ SH_GROW(SH_TYPE * tb, uint64 newsize) /* compute parameters for new table */ SH_COMPUTE_PARAMETERS(tb, newsize); - tb->data = SH_ALLOCATE(tb, sizeof(SH_ELEMENT_TYPE) * tb->size); + newdata = SH_ALLOCATE(tb, sizeof(SH_ELEMENT_TYPE) * newsize); + +#ifdef SH_NO_OOM + if (!newdata) + return false; +#endif +#ifdef SH_RAW_ALLOCATOR_NOZERO + memset(newdata, 0, sizeof(SH_ELEMENT_TYPE) * newsize); +#endif - newdata = tb->data; + tb->data = newdata; + tb->size = newsize; /* * Copy entries from the old data to newdata. We theoretically could use @@ -581,6 +631,8 @@ SH_GROW(SH_TYPE * tb, uint64 newsize) } SH_FREE(tb, olddata); + + return true; } /* @@ -615,7 +667,8 @@ restart: * When optimizing, it can be very useful to print these out. */ /* SH_STAT(tb); */ - SH_GROW(tb, tb->size * 2); + if (!SH_GROW(tb, tb->size * 2)) + return NULL; /* SH_STAT(tb); */ } @@ -1147,6 +1200,7 @@ SH_STAT(SH_TYPE * tb) #undef SH_GROW_MAX_MOVE #undef SH_GROW_MIN_FILLFACTOR #undef SH_MAX_SIZE +#undef SH_MCXT_FLAGS /* types */ #undef SH_TYPE -- 2.33.1
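
The no-throw contract described in this commit message (creation and insertion return NULL, growing returns false) can be illustrated outside simplehash with a toy open-addressing set. This is a hedged sketch, not simplehash itself: IntSet, intset_grow() and the other names are invented for the example, keys are nonzero 32-bit integers, the allocator is assumed to be malloc-compatible so that free() can release its memory, and the initial size must be a power of two of at least 2. The point it demonstrates is ordering: the new array is allocated and filled before anything in the table is changed, so a failed grow leaves the old contents fully usable.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef void *(*AllocFn) (size_t size);

typedef struct IntSet
{
	uint32_t   *slots;			/* 0 marks an empty slot, so keys must be nonzero */
	size_t		size;			/* number of slots, always a power of two */
	size_t		members;
	AllocFn		alloc;			/* may return NULL and need not zero memory */
} IntSet;

/* An allocator that always fails, handy for exercising the error paths. */
static void *
alloc_never(size_t size)
{
	(void) size;
	return NULL;
}

/* Linear probing: index of the slot holding key, or of the first empty slot. */
static size_t
intset_probe(const uint32_t *slots, size_t size, uint32_t key)
{
	size_t		i = (size_t) (key * 2654435761u) & (size - 1);

	while (slots[i] != 0 && slots[i] != key)
		i = (i + 1) & (size - 1);
	return i;
}

/* Resize to newsize slots.  On failure the set is left exactly as it was. */
static bool
intset_grow(IntSet *set, size_t newsize)
{
	uint32_t   *newslots = set->alloc(newsize * sizeof(uint32_t));

	if (newslots == NULL)
		return false;
	memset(newslots, 0, newsize * sizeof(uint32_t));
	for (size_t i = 0; i < set->size; i++)
	{
		uint32_t	key = set->slots[i];

		if (key != 0)
			newslots[intset_probe(newslots, newsize, key)] = key;
	}
	free(set->slots);
	set->slots = newslots;
	set->size = newsize;
	return true;
}

static bool
intset_init(IntSet *set, size_t size, AllocFn alloc)
{
	set->slots = NULL;
	set->size = 0;
	set->members = 0;
	set->alloc = alloc;
	return intset_grow(set, size);
}

static bool
intset_contains(const IntSet *set, uint32_t key)
{
	return set->slots[intset_probe(set->slots, set->size, key)] == key;
}

/* Returns the slot for key, or NULL if the table had to grow and could not. */
static uint32_t *
intset_insert(IntSet *set, uint32_t key, bool *found)
{
	size_t		i;

	/* Keep the fill factor at or below 3/4 so probing always terminates. */
	if ((set->members + 1) * 4 > set->size * 3 &&
		!intset_grow(set, set->size * 2))
		return NULL;
	i = intset_probe(set->slots, set->size, key);
	*found = (set->slots[i] == key);
	if (!*found)
	{
		set->slots[i] = key;
		set->members++;
	}
	return &set->slots[i];
}
```

Swapping set->alloc to alloc_never after a few insertions is an easy way to check that a failed insertion returns NULL while earlier members remain findable.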
From 2222901c1947bb4c72f26b5706f5f3248cec96b8 Mon Sep 17 00:00:00 2001 From: Thomas Munro <thomas.munro@gmail.com> Date: Sun, 30 Jan 2022 13:01:13 +1300 Subject: [PATCH 3/3] Switch to per-socket Windows event handles. Previously, each WaitEventSet would create a new operating system 'event' object for each socket. Create a dedicated event and some state flags for each socket, independent of WaitEventSets. This fixes three problems: 1. When a socket is used in a new WaitEventSet (for example, one after another when using temporary WaitEventSets), queued but not yet reported FD_CLOSE events could be lost by the operating system. This is now avoided with a single long-lived event for the socket. 2. We need to report Winsock's FD_CLOSE event as WL_SOCKET_READABLE with level-triggered semantics, but it's only reported once by Winsock on graceful socket shutdown, so could be lost if a caller waits twice in a row without reading the socket. This is avoided by adding a flag to remember that we've already received FD_CLOSE. 3. It should also be possible to use a socket in two WaitEventSet objects at the same time (though not from two threads without more work), since the network event mask will be adjusted on each wait if required when you switch between sets containing the same socket. Though this isn't currently needed by existing PostgreSQL code AFAIK, it's a nice restriction to lift, to match Unix. Manage the new extra per-socket state in a hash table indexed by socket, and require clients of WaitEventSet to register and deregister sockets with a new API. On Unix this is a no-op, other than correctness checks in assertion builds. For libwalreceiver and postgres_fdw, a new libpq-events callback automatically manages socket registration. XXX Proof of concept XXX There is some more temporary event-switching going on inside src/backend/port/win32/socket.c, but not in code paths that were affected by the recent graceful shutdown brokenness. 
They may need to be integrated with this, potentially widening the set of sockets that need to be registered with socket_table.c to 'all of them'? Discussion: https://postgr.es/m/CA%2BhUKG%2BOeoETZQ%3DQw5Ub5h3tmwQhBmDA%3DnuNO3KG%3DzWfUypFAw%40mail.gmail.com --- contrib/postgres_fdw/connection.c | 25 +++ src/backend/libpq/pqcomm.c | 3 + src/backend/port/Makefile | 3 +- src/backend/port/socket_table.c | 160 ++++++++++++++++++ src/backend/postmaster/pgstat.c | 38 +++-- src/backend/postmaster/syslogger.c | 3 + .../libpqwalreceiver/libpqwalreceiver.c | 33 ++++ src/backend/storage/ipc/latch.c | 73 ++++---- src/include/port.h | 18 ++ src/include/storage/latch.h | 2 +- 10 files changed, 302 insertions(+), 56 deletions(-) create mode 100644 src/backend/port/socket_table.c diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 29fcb6a76e..783b9a3a74 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -17,6 +17,7 @@ #include "catalog/pg_user_mapping.h" #include "commands/defrem.h" #include "funcapi.h" +#include "libpq-events.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "pgstat.h" @@ -109,6 +110,7 @@ static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel); static bool UserMappingPasswordRequired(UserMapping *user); static bool disconnect_cached_connections(Oid serverid); +static int pgfdw_eventproc(PGEventId evtId, void *evtInfo, void *passThrough); /* * Get a PGconn which can be used to execute queries on the remote PostgreSQL @@ -469,6 +471,10 @@ connect_pg_server(ForeignServer *server, UserMapping *user) server->servername), errdetail_internal("%s", pchomp(PQerrorMessage(conn))))); + /* Make sure that all sockets are registered for use by latch.c. 
*/ + if (!PQregisterEventProc(conn, pgfdw_eventproc, "postgres_fdw", NULL)) + elog(ERROR, "postgres_fdw could not register for libpq events"); + /* * Check that non-superuser has used password to establish connection; * otherwise, he's piggybacking on the postgres server's user @@ -1709,3 +1715,22 @@ disconnect_cached_connections(Oid serverid) return result; } + +/* + * A callback for libpq to allow its sockets to be used in WaitEventSet on Windows. + */ +static int +pgfdw_eventproc(PGEventId evtId, void *evtInfo, void *passThrough) +{ + if (evtId == PGEVT_SOCKET) + { + PGEventSocket *evt = (PGEventSocket *) evtInfo; + return SocketTableAdd(evt->socket, true) ? 1 : 0; + } + else if (evtId == PGEVT_SOCKETCLOSE) + { + PGEventSocket *evt = (PGEventSocket *) evtInfo; + SocketTableDrop(evt->socket); + } + return 1; +} diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index f05723dc92..bb35618af4 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -204,6 +204,9 @@ pq_init(void) (errmsg("could not set socket to nonblocking mode: %m"))); #endif + /* Make sure the socket can be used in a WaitEventSet on Windows. 
*/ + SocketTableAdd(MyProcPort->sock, false); + FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3); socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock, NULL, NULL); diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile index 2d00b4f05a..03df581e06 100644 --- a/src/backend/port/Makefile +++ b/src/backend/port/Makefile @@ -25,7 +25,8 @@ OBJS = \ $(TAS) \ atomics.o \ pg_sema.o \ - pg_shmem.o + pg_shmem.o\ + socket_table.o ifeq ($(PORTNAME), win32) SUBDIRS += win32 diff --git a/src/backend/port/socket_table.c b/src/backend/port/socket_table.c new file mode 100644 index 0000000000..c2bbc2e0cb --- /dev/null +++ b/src/backend/port/socket_table.c @@ -0,0 +1,160 @@ +/*------------------------------------------------------------------------- + * + * socket_table.c + * Routines for dealing with extra per-socket state on Windows. + * + * Windows sockets can only safely be associate with one 'event', or they risk + * losing FD_CLOSE events. FD_CLOSE is also edge-triggered and not + * resettable, so we need space for state to make it level-triggered. + * + * These functions are no-ops on Unix systems, except in assertion builds + * where we do the minimum book keeping required to report failure to register + * sockets appropriately. 
+ * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/port/socket_table.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "common/hashfn.h" + +typedef struct SocketTableEntry +{ + pgsocket sock; + int status; + ExtraSocketState *extra; +} SocketTableEntry; + +#define SH_PREFIX socktab +#define SH_ELEMENT_TYPE SocketTableEntry +#define SH_KEY_TYPE pgsocket +#define SH_KEY sock +#define SH_HASH_KEY(tb, key) hash_bytes((uint8 *) &(key), sizeof(key)) +#define SH_EQUAL(tb, a, b) ((a) == (b)) +#define SH_SCOPE static inline +#define SH_NO_OOM +#define SH_DECLARE +#define SH_DEFINE +#include "lib/simplehash.h" + +#if defined(WIN32) || defined(USE_ASSERT_CHECKING) +static socktab_hash *socket_table; +#endif + +/* + * Must be called exactly once for each socket before including it in code + * that requires ExtraSocketState, such as WaitEventSet. The socket must be + * not already registered, and must be dropped before closing it. Return NULL + * on out-of-memory, if no_oom is true, otherwise raises errors. + */ +ExtraSocketState * +SocketTableAdd(pgsocket sock, bool no_oom) +{ +#if defined(WIN32) || defined(USE_ASSERT_CHECKING) + SocketTableEntry *ste; + ExtraSocketState *ess; + bool found; + + /* + * Create a new SocketTableEntry object. We can't use a pointer into the + * hash table because it would move around, and we want WaitEventSet to be + * able to hold pointers. + */ + if (!(ess = malloc(sizeof(*ess)))) + goto out_of_memory; + memset(ess, 0, sizeof(*ess)); + +#ifdef WIN32 + /* Set up Windows kernel event for this socket. */ + ess->event_handle = WSACreateEvent(); + if (ess->event_handle == WSA_INVALID_EVENT) + goto out_of_memory; +#endif + + /* Create socket_table on demand. */ + /* XXX dedicated memory context for debugging? 
*/ + if (!socket_table && + (!(socket_table = socktab_create(TopMemoryContext, 128, NULL)))) + goto out_of_memory; + + /* Try to insert. */ + ste = socktab_insert(socket_table, sock, &found); + if (!ste) + goto out_of_memory; + if (found) + { +#ifdef WIN32 + WSACloseEvent(ess->event_handle); +#endif + free(ess); + elog(ERROR, "cannot register socket, already registered"); + } + + /* Success. */ + ste->extra = ess; + return ess; + + out_of_memory: + if (ess) + { +#ifdef WIN32 + if (ess->event_handle != WSA_INVALID_EVENT) + WSACloseEvent(ess->event_handle); +#endif + free(ess); + } + if (!no_oom) + elog(ERROR, "out of memory"); + return NULL; +#else + return NULL; +#endif +} + +/* + * Unregister a socket that was registered. This must be done before closing + * the socket. + */ +void +SocketTableDrop(pgsocket sock) +{ +#if defined(WIN32) || defined(USE_ASSERT_CHECKING) + SocketTableEntry *ste = NULL; + ExtraSocketState *ess; + + if (socket_table) + ste = socktab_lookup(socket_table, sock); + if (!ste) + elog(ERROR, "cannot unregister socket that is not registered"); + ess = ste->extra; + +#ifdef WIN32 + WSACloseEvent(ess->event_handle); +#endif + + free(ess); + socktab_delete_item(socket_table, ste); +#endif +} + +ExtraSocketState * +SocketTableGet(pgsocket sock) +{ +#if defined(WIN32) || defined(USE_ASSERT_CHECKING) + SocketTableEntry *ste = NULL; + + if (socket_table) + ste = socktab_lookup(socket_table, sock); + if (!ste) + elog(ERROR, "socket is not registered"); + return ste->extra; +#else + return NULL; +#endif +} diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 0646f53098..dd95353880 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -382,6 +382,7 @@ static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len); static void 
pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len); +static void pgstat_release_socket(void); /* ------------------------------------------------------------ * Public functions called from postmaster follow @@ -483,8 +484,7 @@ pgstat_init(void) ereport(LOG, (errcode_for_socket_access(), errmsg("could not bind socket for statistics collector: %m"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; + pgstat_release_socket(); continue; } @@ -494,8 +494,7 @@ pgstat_init(void) ereport(LOG, (errcode_for_socket_access(), errmsg("could not get address of socket for statistics collector: %m"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; + pgstat_release_socket(); continue; } @@ -510,8 +509,7 @@ pgstat_init(void) ereport(LOG, (errcode_for_socket_access(), errmsg("could not connect socket for statistics collector: %m"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; + pgstat_release_socket(); continue; } @@ -531,8 +529,7 @@ retry1: ereport(LOG, (errcode_for_socket_access(), errmsg("could not send test message on socket for statistics collector: %m"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; + pgstat_release_socket(); continue; } @@ -557,8 +554,7 @@ retry1: ereport(LOG, (errcode_for_socket_access(), errmsg("select() failed in statistics collector: %m"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; + pgstat_release_socket(); continue; } if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset)) @@ -572,8 +568,7 @@ retry1: ereport(LOG, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("test message did not get through on socket for statistics collector"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; + pgstat_release_socket(); continue; } @@ -587,8 +582,7 @@ retry2: ereport(LOG, (errcode_for_socket_access(), errmsg("could not receive test message on socket for statistics collector: %m"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; + pgstat_release_socket(); 
continue; } @@ -597,8 +591,7 @@ retry2: ereport(LOG, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("incorrect test message transmission on socket for statistics collector"))); - closesocket(pgStatSock); - pgStatSock = PGINVALID_SOCKET; + pgstat_release_socket(); continue; } @@ -669,7 +662,7 @@ startup_failed: pg_freeaddrinfo_all(hints.ai_family, addrs); if (pgStatSock != PGINVALID_SOCKET) - closesocket(pgStatSock); + pgstat_release_socket(); pgStatSock = PGINVALID_SOCKET; /* @@ -3527,6 +3520,9 @@ PgstatCollectorMain(int argc, char *argv[]) pgStatRunningInCollector = true; pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true); + /* Make sure pgStatSock can be used in a WaitEventSet on Windows. */ + SocketTableAdd(pgStatSock, false); + /* Prepare to wait for our latch or data in our socket. */ wes = CreateWaitEventSet(CurrentMemoryContext, 3); AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); @@ -6418,3 +6414,11 @@ pgstat_count_slru_truncate(int slru_idx) { slru_entry(slru_idx)->m_truncate += 1; } + +static void +pgstat_release_socket(void) +{ + SocketTableDrop(pgStatSock); + closesocket(pgStatSock); + pgStatSock = PGINVALID_SOCKET; +} diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c index 25e2131e31..aa551b2719 100644 --- a/src/backend/postmaster/syslogger.c +++ b/src/backend/postmaster/syslogger.c @@ -301,6 +301,9 @@ SysLoggerMain(int argc, char *argv[]) */ whereToSendOutput = DestNone; + /* Make sure the pipe can be used in a WaitEventSet on Windows. */ + SocketTableAdd((pgsocket) syslogPipe[0], false); + /* * Set up a reusable WaitEventSet object we'll use to wait for our latch, * and (except on Windows) our socket. 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 0d89db4e6a..8508f93aa6 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -23,6 +23,7 @@ #include "catalog/pg_type.h" #include "common/connect.h" #include "funcapi.h" +#include "libpq-events.h" #include "libpq-fe.h" #include "mb/pg_wchar.h" #include "miscadmin.h" @@ -83,6 +84,10 @@ static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const Oid *retTypes); static void libpqrcv_disconnect(WalReceiverConn *conn); +static int libpqrcv_eventproc(PGEventId evtId, + void *evtInfo, + void *passThrough); + static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_connect, libpqrcv_check_conninfo, @@ -182,6 +187,15 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, return NULL; } + if (PQregisterEventProc(conn->streamConn, libpqrcv_eventproc, + "libwalrcv", NULL) == 0) + { + PQfinish(conn->streamConn); + conn->streamConn = NULL; + *err = pstrdup("could not register for libpq events"); + return NULL; + } + /* * Poll connection until we have OK or FAILED status. * @@ -1165,3 +1179,22 @@ stringlist_to_identifierstr(PGconn *conn, List *strings) return res.data; } + +/* + * A callback for libpq to allow its sockets to be used in WaitEventSet on Windows. + */ +static int +libpqrcv_eventproc(PGEventId evtId, void *evtInfo, void *passThrough) +{ + if (evtId == PGEVT_SOCKET) + { + PGEventSocket *evt = (PGEventSocket *) evtInfo; + return SocketTableAdd(evt->socket, true) ? 
1 : 0; + } + else if (evtId == PGEVT_SOCKETCLOSE) + { + PGEventSocket *evt = (PGEventSocket *) evtInfo; + SocketTableDrop(evt->socket); + } + return 1; +} diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 61c876beff..5a6fb36fa2 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -60,6 +60,7 @@ #include "storage/shmem.h" #include "utils/memutils.h" + /* * Select the fd readiness primitive to use. Normally the "most modern" * primitive supported by the OS will be used, but for testing it can be @@ -803,28 +804,6 @@ FreeWaitEventSet(WaitEventSet *set) #elif defined(WAIT_USE_KQUEUE) close(set->kqueue_fd); ReleaseExternalFD(); -#elif defined(WAIT_USE_WIN32) - WaitEvent *cur_event; - - for (cur_event = set->events; - cur_event < (set->events + set->nevents); - cur_event++) - { - if (cur_event->events & WL_LATCH_SET) - { - /* uses the latch's HANDLE */ - } - else if (cur_event->events & WL_POSTMASTER_DEATH) - { - /* uses PostmasterHandle */ - } - else - { - /* Clean up the event object we created for the socket */ - WSAEventSelect(cur_event->fd, NULL, 0); - WSACloseEvent(set->handles[cur_event->pos + 1]); - } - } #endif pfree(set); @@ -897,9 +876,17 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, event->fd = fd; event->events = events; event->user_data = user_data; + + if (events & WL_SOCKET_MASK) + { #ifdef WIN32 - event->reset = false; + /* Point to our Windows event tracking state for the socket. */ + event->extra_socket_state = SocketTableGet(fd); +#else + /* On Unix, check it's registered to avoid portability bugs. 
*/ + (void) SocketTableGet(fd); #endif + } if (events == WL_LATCH_SET) { @@ -1265,6 +1252,7 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) } else { + ExtraSocketState *ess; int flags = FD_CLOSE; /* always check for errors/EOF */ if (event->events & WL_SOCKET_READABLE) @@ -1274,18 +1262,19 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) if (event->events & WL_SOCKET_CONNECTED) flags |= FD_CONNECT; - if (*handle == WSA_INVALID_EVENT) - { - *handle = WSACreateEvent(); - if (*handle == WSA_INVALID_EVENT) - elog(ERROR, "failed to create event for socket: error code %d", - WSAGetLastError()); - } - if (WSAEventSelect(event->fd, *handle, flags) != 0) + /* Find the event associated with this socket. */ + ess = event->extra_socket_state; + *handle = ess->event_handle; + + /* + * Adjust the flags for this event, if they don't already match what we + * want to wait for. + */ + if (ess->flags != flags && + WSAEventSelect(event->fd, *handle, flags) != 0) elog(ERROR, "failed to set up event for socket: error code %d", WSAGetLastError()); - - Assert(event->fd != PGINVALID_SOCKET); + ess->flags = flags; } } #endif @@ -1845,10 +1834,19 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, cur_event < (set->events + set->nevents); cur_event++) { - if (cur_event->reset) - { + /* Reset the event mask if necessary. */ + if (cur_event->events & WL_SOCKET_MASK) WaitEventAdjustWin32(set, cur_event); - cur_event->reset = false; + + /* If we've already seen FD_CLOSE, keep reporting readiness. */ + if ((cur_event->events & WL_SOCKET_MASK) && + cur_event->extra_socket_state->seen_fd_close) + { + occurred_events->pos = cur_event->pos; + occurred_events->user_data = cur_event->user_data; + occurred_events->events = cur_event->events; + occurred_events->fd = cur_event->fd; + return 1; } /* @@ -1984,7 +1982,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, * for the behavior of socket events. 
*------ */ - cur_event->reset = true; + cur_event->extra_socket_state->flags = 0; } if ((cur_event->events & WL_SOCKET_WRITEABLE) && (resEvents.lNetworkEvents & FD_WRITE)) @@ -2002,6 +2000,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, { /* EOF/error, so signal all caller-requested socket flags */ occurred_events->events |= (cur_event->events & WL_SOCKET_MASK); + cur_event->extra_socket_state->seen_fd_close = true; } if (occurred_events->events != 0) diff --git a/src/include/port.h b/src/include/port.h index 3d103a2b31..e780852bd1 100644 --- a/src/include/port.h +++ b/src/include/port.h @@ -543,4 +543,22 @@ extern char *wait_result_to_str(int exit_status); extern bool wait_result_is_signal(int exit_status, int signum); extern bool wait_result_is_any_signal(int exit_status, bool include_command_not_found); +/* backend/port/socket_table.c */ +#if !defined(FRONTEND) +struct ExtraSocketState +{ +#ifdef WIN32 + HANDLE event_handle; /* one event for the life of the socket */ + int flags; /* most recent WSAEventSelect() flags */ + bool seen_fd_close; /* has FD_CLOSE been received? */ +#else + int dummy; /* none of this is needed for Unix */ +#endif +}; +typedef struct ExtraSocketState ExtraSocketState; +extern ExtraSocketState *SocketTableAdd(pgsocket sock, bool no_oom); +extern ExtraSocketState *SocketTableGet(pgsocket sock); +extern void SocketTableDrop(pgsocket sock); +#endif + #endif /* PG_PORT_H */ diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h index 3aa7b33834..576d339829 100644 --- a/src/include/storage/latch.h +++ b/src/include/storage/latch.h @@ -146,7 +146,7 @@ typedef struct WaitEvent pgsocket fd; /* socket fd associated with event */ void *user_data; /* pointer provided in AddWaitEventToSet */ #ifdef WIN32 - bool reset; /* Is reset of the event required? */ + ExtraSocketState *extra_socket_state; #endif } WaitEvent; -- 2.33.1
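
As a companion to points 1 and 2 of the patch 3 commit message, here is a small portable model of the edge-to-level conversion. It does not call Winsock. FakeSocket is a hypothetical stand-in for the operating system side, where a pending event is consumed once it has been reported, and SockExtra mirrors the role of the flags and seen_fd_close fields in ExtraSocketState. The names and the polling-style wait_readable() are assumptions made for the sketch.

```c
#include <stdbool.h>

#define EV_READ		0x01
#define EV_CLOSE	0x20

/* Stand-in for the OS: events are edge-triggered and reported only once. */
typedef struct FakeSocket
{
	int			pending;		/* events queued but not yet reported */
	int			mask;			/* events the caller asked for */
	int			select_calls;	/* how often the mask was (re)installed */
} FakeSocket;

/* Per-socket state that outlives any single wait. */
typedef struct SockExtra
{
	int			flags;			/* most recently installed mask */
	bool		seen_close;		/* sticky: set once close is reported */
} SockExtra;

static void
fake_select(FakeSocket *s, int mask)
{
	s->mask = mask;
	s->select_calls++;
}

/* Report and consume pending events, like an edge-triggered source. */
static int
fake_enum_events(FakeSocket *s)
{
	int			ev = s->pending & s->mask;

	s->pending &= ~ev;
	return ev;
}

/* One non-blocking "wait": true if the socket should be reported readable. */
static bool
wait_readable(FakeSocket *s, SockExtra *extra)
{
	int			want = EV_READ | EV_CLOSE;
	int			ev;

	/* Only touch the event mask when it differs from what is installed. */
	if (extra->flags != want)
	{
		fake_select(s, want);
		extra->flags = want;
	}

	/* Level-triggered behaviour: once closed, always readable. */
	if (extra->seen_close)
		return true;

	ev = fake_enum_events(s);
	if (ev & EV_CLOSE)
		extra->seen_close = true;
	return ev != 0;
}
```

Without the sticky flag, a second wait after the close event had been consumed would see nothing and block, which is the lost-event hazard the commit message describes for callers that wait twice in a row without reading.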