On Tue, Jan 25, 2022 at 3:50 PM Tom Lane <t...@sss.pgh.pa.us> wrote:
> Thomas Munro <thomas.mu...@gmail.com> writes:
> > I vote for reverting in release branches only.  I'll propose a better
> > WES patch set for master that hopefully also covers async append etc
> > (which I was already planning to do before we knew about this Windows
> > problem).  More soon.
>
> WFM, but we'll have to remember to revert this in v15 if we don't
> have a solid fix by then.

Phew, after a couple of days of very slow compile/test cycles on
Windows exploring a couple of different ideas, I finally have
something new.  First let me recap the three main ideas in this
thread:

1.  It sounds like no one really loves the WSAPoll() kludge, even
though it apparently works for simple cases.  It's not totally clear
that it really works in enough cases, for one thing.  It doesn't allow
for a socket to be in two WESes at the same time, and I'm not sure I
want to bank on Winsock's WSAPoll() being guaranteed to report POLLHUP
when half closed (as mentioned, no other OS does AFAIK).

2.  The long-lived-WaitEventSets-everywhere concept was initially
appealing to me and solves the walreceiver problem (when combined
with a sticky seen_fd_close flag), and I've managed to get that
working correctly across libpq reconnects.  As mentioned, I also have
some toy patches along those lines for the equivalent but more complex
problem in postgres_fdw, because I've been studying how to make
parallel append generate a tidy stream of epoll_wait()/kevent() calls,
instead of a quadratic explosion of setup/teardown spam.  I'll write
some more about those patches and hopefully propose them soon anyway,
but on reflection I don't really want that Unix efficiency problem to
be tangled up with this Windows correctness problem.  That'd require a
programming rule that I don't want to burden us with forever: you'd
*never* be able to use a socket in more than one WaitEventSet, and
WaitLatchOrSocket() would have to be removed.

3.  The real solution to this problem is to recognise that we just
have the event objects in the wrong place.  WaitEventSets shouldn't
own them: they need to be 1:1 with sockets, or Winsock will eat
events.  Likewise for the flag you need for edge->level conversion, or
*we'll* eat events.  Having now tried that, it's starting to feel like
the best way forward, even though my initial prototype (see attached)
is maybe a tad cumbersome with bookkeeping.  I believe it means that
all existing coding patterns *should* now be safe (not yet confirmed
by testing), and we're free to put sockets in multiple WESes even at
the same time if the need arises.
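
To make the edge->level point in idea 3 concrete, here is a minimal,
portable sketch.  It is not code from the attached patches: FakeSocket,
SocketState, poll_edge() and wait_readable() are hypothetical names, and
the "kernel" is simulated by a flag that reports the close notification
exactly once.  The only thing it tries to show is why the sticky flag has
to live with the socket rather than with whatever is doing the waiting.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the OS: the close edge is delivered only once. */
typedef struct FakeSocket
{
	bool		close_pending;	/* edge not yet consumed */
} FakeSocket;

/* Hypothetical per-socket state that outlives any single wait. */
typedef struct SocketState
{
	bool		seen_close;		/* sticky: set once, never cleared */
} SocketState;

/* Consume the edge-triggered notification, the way the OS would. */
static bool
poll_edge(FakeSocket *sock)
{
	bool		fired;

	assert(sock != NULL);
	fired = sock->close_pending;
	sock->close_pending = false;
	return fired;
}

/*
 * Level-triggered view of the same socket: once the close edge has been
 * seen, keep reporting readiness on every later wait.
 */
static bool
wait_readable(FakeSocket *sock, SocketState *state)
{
	assert(state != NULL);

	if (state->seen_close)
		return true;
	if (poll_edge(sock))
	{
		state->seen_close = true;
		return true;
	}
	return false;
}
```

Calling wait_readable() twice on a socket whose peer has closed reports
readiness both times, whereas calling poll_edge() twice reports it only
once.  If the flag lived in a temporary waiter object instead, the second
waiter would start with a fresh flag and the event would be gone.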

The basic question is: how should a socket user find the associated
event handle and flags?  Some answers:

1.  "pgsocket" could become a pointer to a heap-allocated wrapper
object containing { socket, event, flags } on Windows, or something
like that, but that seems a bit invasive and tangled up with public
APIs like libpq, which put me off trying that.  I'm willing to explore
it if people object to my other idea.

2.  "pgsocket" could stay unchanged, but we could have a parallel
array with extra socket state, indexed by file descriptor.  We could
use new socket()/close() libpq events so that libpq's sockets could be
registered in this scheme without libpq itself having to know anything
about that.  That worked pretty nicely when I developed it on my
FreeBSD box, but on Windows I soon learned that SOCKET is really yet
another name for HANDLE, so it's not a dense number space anchored at
0 like Unix file descriptors.  The array could be prohibitively big.

3.  I tried the same as #2 but with a hash table, and ran into another
small problem when putting it all together: we probably don't want to
longjmp out of libpq callbacks on allocation failure.  So, I modified
simplehash to add a no-OOM behaviour.  That's the POC patch set I'm
attaching for show-and-tell.  Some notes and TODOs in the commit
messages and comments.
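
For readers who want the shape of answers #2 and #3 without reading the
patches, here is a rough standalone sketch.  It is an illustration only,
not simplehash and not the attached code: SockTable, SockEntry,
sketch_insert() and sketch_lookup() are made-up names, and plain calloc()
stands in for the allocator.  It shows a table keyed by sparse,
HANDLE-like values where running out of memory is reported by returning
NULL instead of jumping out of the caller.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical per-socket record; the key is a sparse, HANDLE-like value. */
typedef struct SockEntry
{
	uintptr_t	sock;
	bool		used;
	int			flags;
} SockEntry;

typedef struct SockTable
{
	SockEntry  *slots;
	size_t		size;			/* zero or a power of two */
	size_t		nused;
} SockTable;

/* Linear probing: return the slot holding 'sock', or the first free slot. */
static SockEntry *
probe(SockEntry *slots, size_t size, uintptr_t sock)
{
	uint64_t	h = (uint64_t) sock * UINT64_C(0x9E3779B97F4A7C15);
	size_t		i = (size_t) (h >> 32) & (size - 1);

	while (slots[i].used && slots[i].sock != sock)
		i = (i + 1) & (size - 1);
	return &slots[i];
}

/* Double the table.  On failure, leave the old table untouched. */
static bool
grow(SockTable *tab)
{
	size_t		newsize = tab->size ? tab->size * 2 : 8;
	SockEntry  *newslots = calloc(newsize, sizeof(SockEntry));

	if (newslots == NULL)
		return false;
	for (size_t i = 0; i < tab->size; i++)
		if (tab->slots[i].used)
			*probe(newslots, newsize, tab->slots[i].sock) = tab->slots[i];
	free(tab->slots);
	tab->slots = newslots;
	tab->size = newsize;
	return true;
}

/* Insert or find.  Returns NULL on out-of-memory; never exits nonlocally. */
static SockEntry *
sketch_insert(SockTable *tab, uintptr_t sock, bool *found)
{
	SockEntry  *entry;

	assert(found != NULL);
	/* Keep the load factor at or below one half so probing terminates. */
	if ((tab->nused + 1) * 2 > tab->size && !grow(tab))
		return NULL;
	entry = probe(tab->slots, tab->size, sock);
	*found = entry->used;
	if (!entry->used)
	{
		entry->used = true;
		entry->sock = sock;
		entry->flags = 0;
		tab->nused++;
	}
	return entry;
}

/* Returns NULL if the socket was never inserted. */
static SockEntry *
sketch_lookup(SockTable *tab, uintptr_t sock)
{
	SockEntry  *entry;

	if (tab->size == 0)
		return NULL;
	entry = probe(tab->slots, tab->size, sock);
	return entry->used ? entry : NULL;
}
```

A zero-initialized SockTable is a valid empty table.  Note that entries
move whenever the table grows, so pointers returned by sketch_insert()
are only good until the next insertion; that is the same reason
socket_table.c in patch 3 allocates ExtraSocketState separately rather
than pointing into the hash table.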

Thoughts?
From bdd90aeb65d82ecae8fe58b441d25a1e1b129bf3 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 29 Jan 2022 02:15:10 +1300
Subject: [PATCH 1/3] Add low level socket events for libpq.

Provide a way to get a callback when a socket is created or closed.

XXX TODO handle callback failure
XXX TODO investigate overheads/other implications of having a callback
installed
---
 src/interfaces/libpq/fe-connect.c   | 24 ++++++++++++++++++++++++
 src/interfaces/libpq/libpq-events.c | 11 +++++++++++
 src/interfaces/libpq/libpq-events.h | 10 +++++++++-
 3 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index a6a1db3356..ddc3f38cf1 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -452,7 +452,19 @@ pqDropConnection(PGconn *conn, bool flushInput)
 
 	/* Close the socket itself */
 	if (conn->sock != PGINVALID_SOCKET)
+	{
+		/* Report that the socket is closing. */
+		for (int i = 0; i < conn->nEvents; i++)
+		{
+			PGEventSocket evt;
+
+			evt.conn = conn;
+			evt.socket = conn->sock;
+			(void) conn->events[i].proc(PGEVT_SOCKETCLOSE, &evt,
+										conn->events[i].passThrough);
+		}
 		closesocket(conn->sock);
+	}
 	conn->sock = PGINVALID_SOCKET;
 
 	/* Optionally discard any unread data */
@@ -2576,6 +2588,18 @@ keep_going:						/* We will come back to here until there is
 						goto error_return;
 					}
 
+					/* Report that a new socket has been created. */
+					for (int i = 0; i < conn->nEvents; i++)
+					{
+						PGEventSocket evt;
+
+						evt.conn = conn;
+						evt.socket = conn->sock;
+						(void) conn->events[i].proc(PGEVT_SOCKET, &evt,
+													conn->events[i].passThrough);
+						/* XXX check call result and fail */
+					}
+
 					/*
 					 * Once we've identified a target address, all errors
 					 * except the preceding socket()-failure case should be
diff --git a/src/interfaces/libpq/libpq-events.c b/src/interfaces/libpq/libpq-events.c
index 7754c37748..2c549ec457 100644
--- a/src/interfaces/libpq/libpq-events.c
+++ b/src/interfaces/libpq/libpq-events.c
@@ -87,6 +87,17 @@ PQregisterEventProc(PGconn *conn, PGEventProc proc,
 		return false;
 	}
 
+	/* If we already have a socket, report a socket event immediately. */
+	if (conn->sock != PGINVALID_SOCKET)
+	{
+		PGEventSocket evt;
+
+		evt.conn = conn;
+		evt.socket = conn->sock;
+		(void) proc(PGEVT_SOCKET, &evt, passThrough);
+		/* XXX check return and fail (callback out of memory) */
+	}
+
 	return true;
 }
 
diff --git a/src/interfaces/libpq/libpq-events.h b/src/interfaces/libpq/libpq-events.h
index d93b7c4d4e..da64dc866b 100644
--- a/src/interfaces/libpq/libpq-events.h
+++ b/src/interfaces/libpq/libpq-events.h
@@ -31,7 +31,9 @@ typedef enum
 	PGEVT_CONNDESTROY,
 	PGEVT_RESULTCREATE,
 	PGEVT_RESULTCOPY,
-	PGEVT_RESULTDESTROY
+	PGEVT_RESULTDESTROY,
+	PGEVT_SOCKET,
+	PGEVT_SOCKETCLOSE
 } PGEventId;
 
 typedef struct
@@ -66,6 +68,12 @@ typedef struct
 	PGresult   *result;
 } PGEventResultDestroy;
 
+typedef struct
+{
+	PGconn	   *conn;
+	pgsocket   socket;
+} PGEventSocket;
+
 typedef int (*PGEventProc) (PGEventId evtId, void *evtInfo, void *passThrough);
 
 /* Registers an event proc with the given PGconn. */
-- 
2.33.1

From 9e03fb83dcd95b7b793728b21726a1494943d47e Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 1 Feb 2022 11:21:57 +1300
Subject: [PATCH 2/3] Allow no-throw allocators in simplehash.h.

Previously, elog-style error reporting was assumed.  Allow SH_NO_OOM to
be defined, to indicate that you'd like creation and insertion to return
NULL to indicate failure instead, for use in no-throw programming
contexts.  A custom allocation function should return NULL on failure,
and the built-in MemoryContext support will use MCXT_ALLOC_NO_OOM when this
option is used.

Also allow SH_RAW_FREE to be defined, and SH_RAW_ALLOCATOR_NOZERO to
state that the SH_RAW_ALLOCATOR doesn't zero memory.  These
configuration points allow standard malloc and free to be used, which
was mostly interesting to allow testing of the SH_NO_OOM with different
allocator options.

XXX Proof-of-concept
---
 src/include/lib/simplehash.h | 74 +++++++++++++++++++++++++++++++-----
 1 file changed, 64 insertions(+), 10 deletions(-)

diff --git a/src/include/lib/simplehash.h b/src/include/lib/simplehash.h
index 8192927010..758669e1ea 100644
--- a/src/include/lib/simplehash.h
+++ b/src/include/lib/simplehash.h
@@ -41,7 +41,11 @@
  *	  - SH_SCOPE - in which scope (e.g. extern, static inline) do function
  *		declarations reside
  *	  - SH_RAW_ALLOCATOR - if defined, memory contexts are not used; instead,
- *	    use this to allocate bytes. The allocator must zero the returned space.
+ *	    use this to allocate bytes. The allocator must zero the returned space,
+ *	    or SH_RAW_ALLOCATOR_NOZERO must also be defined.
+ *	  - SH_RAW_FREE - if defined, use this to free memory, instead of pfree()
+ *	  - SH_NO_OOM - if defined, the allocator can fail by returning NULL,
+ *	    otherwise allocator failure is assumed to exit nonlocally (ERROR)
  *	  - SH_USE_NONDEFAULT_ALLOCATOR - if defined no element allocator functions
  *		are defined, so you can supply your own
  *	  The following parameters are only relevant when SH_DEFINE is defined:
@@ -204,14 +208,14 @@ SH_SCOPE void SH_DESTROY(SH_TYPE * tb);
 SH_SCOPE void SH_RESET(SH_TYPE * tb);
 
 /* void <prefix>_grow(<prefix>_hash *tb, uint64 newsize) */
-SH_SCOPE void SH_GROW(SH_TYPE * tb, uint64 newsize);
+SH_SCOPE bool SH_GROW(SH_TYPE * tb, uint64 newsize);
 
 /* <element> *<prefix>_insert(<prefix>_hash *tb, <key> key, bool *found) */
 SH_SCOPE	SH_ELEMENT_TYPE *SH_INSERT(SH_TYPE * tb, SH_KEY_TYPE key, bool *found);
 
 /*
  * <element> *<prefix>_insert_hash(<prefix>_hash *tb, <key> key, uint32 hash,
- * 								  bool *found)
+ *								  bool *found)
  */
 SH_SCOPE	SH_ELEMENT_TYPE *SH_INSERT_HASH(SH_TYPE * tb, SH_KEY_TYPE key,
 											uint32 hash, bool *found);
@@ -282,6 +286,12 @@ SH_SCOPE void SH_STAT(SH_TYPE * tb);
 #define SH_COMPARE_KEYS(tb, ahash, akey, b) (SH_EQUAL(tb, b->SH_KEY, akey))
 #endif
 
+#ifdef SH_NO_OOM
+#define SH_MCXT_FLAGS (MCXT_ALLOC_ZERO | MCXT_ALLOC_NO_OOM)
+#else
+#define SH_MCXT_FLAGS MCXT_ALLOC_ZERO
+#endif
+
 /*
  * Wrap the following definitions in include guards, to avoid multiple
  * definition errors if this header is included more than once.  The rest of
@@ -400,10 +410,18 @@ static inline void *
 SH_ALLOCATE(SH_TYPE * type, Size size)
 {
 #ifdef SH_RAW_ALLOCATOR
-	return SH_RAW_ALLOCATOR(size);
+	void *result = SH_RAW_ALLOCATOR(size);
+#ifdef SH_RAW_ALLOCATOR_NOZERO
+#ifdef SH_NO_OOM
+	if (!result)
+		return NULL;
+#endif
+	memset(result, 0, size);
+#endif
+	return result;
 #else
 	return MemoryContextAllocExtended(type->ctx, size,
-									  MCXT_ALLOC_HUGE | MCXT_ALLOC_ZERO);
+									  SH_MCXT_FLAGS | MCXT_ALLOC_HUGE);
 #endif
 }
 
@@ -411,7 +429,11 @@ SH_ALLOCATE(SH_TYPE * type, Size size)
 static inline void
 SH_FREE(SH_TYPE * type, void *pointer)
 {
+#ifdef SH_RAW_FREE
+	SH_RAW_FREE(pointer);
+#else
 	pfree(pointer);
+#endif
 }
 
 #endif
@@ -439,7 +461,16 @@ SH_CREATE(MemoryContext ctx, uint32 nelements, void *private_data)
 #ifdef SH_RAW_ALLOCATOR
 	tb = SH_RAW_ALLOCATOR(sizeof(SH_TYPE));
 #else
-	tb = MemoryContextAllocZero(ctx, sizeof(SH_TYPE));
+	tb = MemoryContextAllocExtended(ctx, sizeof(SH_TYPE), SH_MCXT_FLAGS);
+#endif
+#ifdef SH_NO_OOM
+	if (!tb)
+		return NULL;
+#endif
+#ifdef SH_RAW_ALLOCATOR_NOZERO
+	memset(tb, 0, sizeof(SH_TYPE));
+#endif
+#ifndef SH_RAW_ALLOCATOR
 	tb->ctx = ctx;
 #endif
 	tb->private_data = private_data;
@@ -451,6 +482,14 @@ SH_CREATE(MemoryContext ctx, uint32 nelements, void *private_data)
 
 	tb->data = SH_ALLOCATE(tb, sizeof(SH_ELEMENT_TYPE) * tb->size);
 
+#ifdef SH_NO_OOM
+	if (!tb->data)
+	{
+		SH_FREE(tb, tb);
+		return NULL;
+	}
+#endif
+
 	return tb;
 }
 
@@ -476,8 +515,10 @@ SH_RESET(SH_TYPE * tb)
  * Usually this will automatically be called by insertions/deletions, when
  * necessary. But resizing to the exact input size can be advantageous
  * performance-wise, when known at some point.
+ *
+ * Return true on success.  Can only return false if SH_NO_OOM is defined.
  */
-SH_SCOPE void
+SH_SCOPE bool
 SH_GROW(SH_TYPE * tb, uint64 newsize)
 {
 	uint64		oldsize = tb->size;
@@ -494,9 +535,18 @@ SH_GROW(SH_TYPE * tb, uint64 newsize)
 	/* compute parameters for new table */
 	SH_COMPUTE_PARAMETERS(tb, newsize);
 
-	tb->data = SH_ALLOCATE(tb, sizeof(SH_ELEMENT_TYPE) * tb->size);
+	newdata = SH_ALLOCATE(tb, sizeof(SH_ELEMENT_TYPE) * newsize);
+
+#ifdef SH_NO_OOM
+	if (!newdata)
+		return false;
+#endif
+#ifdef SH_RAW_ALLOCATOR_NOZERO
+	memset(newdata, 0, sizeof(SH_ELEMENT_TYPE) * newsize);
+#endif
 
-	newdata = tb->data;
+	tb->data = newdata;
+	tb->size = newsize;
 
 	/*
 	 * Copy entries from the old data to newdata. We theoretically could use
@@ -581,6 +631,8 @@ SH_GROW(SH_TYPE * tb, uint64 newsize)
 	}
 
 	SH_FREE(tb, olddata);
+
+	return true;
 }
 
 /*
@@ -615,7 +667,8 @@ restart:
 		 * When optimizing, it can be very useful to print these out.
 		 */
 		/* SH_STAT(tb); */
-		SH_GROW(tb, tb->size * 2);
+		if (!SH_GROW(tb, tb->size * 2))
+			return NULL;
 		/* SH_STAT(tb); */
 	}
 
@@ -1147,6 +1200,7 @@ SH_STAT(SH_TYPE * tb)
 #undef SH_GROW_MAX_MOVE
 #undef SH_GROW_MIN_FILLFACTOR
 #undef SH_MAX_SIZE
+#undef SH_MCXT_FLAGS
 
 /* types */
 #undef SH_TYPE
-- 
2.33.1

From 2222901c1947bb4c72f26b5706f5f3248cec96b8 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sun, 30 Jan 2022 13:01:13 +1300
Subject: [PATCH 3/3] Switch to per-socket Windows event handles.

Previously, each WaitEventSet would create a new operating system
'event' object for each socket.  Create a dedicated event and some state
flags for each socket, independent of WaitEventSets.  This fixes three
problems:

1.  When a socket is used in a new WaitEventSet (for example, one after
another when using temporary WaitEventSets), queued but not yet reported
FD_CLOSE events could be lost by the operating system.  This is now
avoided with a single long-lived event for the socket.

2.  We need to report Winsock's FD_CLOSE event as WL_SOCKET_READABLE
with level-triggered semantics, but it's only reported once by Winsock
on graceful socket shutdown, so could be lost if a caller waits twice in
a row without reading the socket.  This is avoided by adding a flag to
remember that we've already received FD_CLOSE.

3.  It should also be possible to use a socket in two WaitEventSet
objects at the same time (though not from two threads without more
work), since the network event mask will be adjusted on each wait if
required when you switch between sets containing the same socket.
Though this isn't currently needed by existing PostgreSQL code AFAIK,
it's a nice restriction to lift, to match Unix.

Manage the new extra per-socket state in a hash table indexed by socket,
and require clients of WaitEventSet to register and deregister sockets
with a new API.  On Unix this is a no-op, other than correctness checks
in assertion builds.  For libwalreceiver and postgres_fdw, a new
libpq-events callback automatically manages socket registration.

XXX Proof of concept

XXX There is some more temporary event-switching going on inside
src/backend/port/win32/socket.c, but not in code paths that were
affected by the recent graceful shutdown brokenness.  They may need to
be integrated with this, potentially widening the set of sockets that
need to be registered with socket_table.c to 'all of them'?

Discussion: https://postgr.es/m/CA%2BhUKG%2BOeoETZQ%3DQw5Ub5h3tmwQhBmDA%3DnuNO3KG%3DzWfUypFAw%40mail.gmail.com
---
 contrib/postgres_fdw/connection.c             |  25 +++
 src/backend/libpq/pqcomm.c                    |   3 +
 src/backend/port/Makefile                     |   3 +-
 src/backend/port/socket_table.c               | 160 ++++++++++++++++++
 src/backend/postmaster/pgstat.c               |  38 +++--
 src/backend/postmaster/syslogger.c            |   3 +
 .../libpqwalreceiver/libpqwalreceiver.c       |  33 ++++
 src/backend/storage/ipc/latch.c               |  73 ++++----
 src/include/port.h                            |  18 ++
 src/include/storage/latch.h                   |   2 +-
 10 files changed, 302 insertions(+), 56 deletions(-)
 create mode 100644 src/backend/port/socket_table.c

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 29fcb6a76e..783b9a3a74 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -17,6 +17,7 @@
 #include "catalog/pg_user_mapping.h"
 #include "commands/defrem.h"
 #include "funcapi.h"
+#include "libpq-events.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -109,6 +110,7 @@ static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql,
 								bool toplevel);
 static bool UserMappingPasswordRequired(UserMapping *user);
 static bool disconnect_cached_connections(Oid serverid);
+static int	pgfdw_eventproc(PGEventId evtId, void *evtInfo, void *passThrough);
 
 /*
  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
@@ -469,6 +471,10 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
 							server->servername),
 					 errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
 
+		/* Make sure that all sockets are registered for use by latch.c. */
+		if (!PQregisterEventProc(conn, pgfdw_eventproc, "postgres_fdw", NULL))
+			elog(ERROR, "postgres_fdw could not register for libpq events");
+
 		/*
 		 * Check that non-superuser has used password to establish connection;
 		 * otherwise, he's piggybacking on the postgres server's user
@@ -1709,3 +1715,22 @@ disconnect_cached_connections(Oid serverid)
 
 	return result;
 }
+
+/*
+ * A callback for libpq to allow its sockets to be used in WaitEventSet on Windows.
+ */
+static int
+pgfdw_eventproc(PGEventId evtId, void *evtInfo, void *passThrough)
+{
+	if (evtId == PGEVT_SOCKET)
+	{
+		PGEventSocket *evt = (PGEventSocket *) evtInfo;
+		return SocketTableAdd(evt->socket, true) ? 1 : 0;
+	}
+	else if (evtId == PGEVT_SOCKETCLOSE)
+	{
+		PGEventSocket *evt = (PGEventSocket *) evtInfo;
+		SocketTableDrop(evt->socket);
+	}
+	return 1;
+}
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index f05723dc92..bb35618af4 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -204,6 +204,9 @@ pq_init(void)
 				(errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
+	/* Make sure the socket can be used in a WaitEventSet on Windows. */
+	SocketTableAdd(MyProcPort->sock, false);
+
 	FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
 	socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
 								   MyProcPort->sock, NULL, NULL);
diff --git a/src/backend/port/Makefile b/src/backend/port/Makefile
index 2d00b4f05a..03df581e06 100644
--- a/src/backend/port/Makefile
+++ b/src/backend/port/Makefile
@@ -25,7 +25,8 @@ OBJS = \
 	$(TAS) \
 	atomics.o \
 	pg_sema.o \
-	pg_shmem.o
+	pg_shmem.o \
+	socket_table.o
 
 ifeq ($(PORTNAME), win32)
 SUBDIRS += win32
diff --git a/src/backend/port/socket_table.c b/src/backend/port/socket_table.c
new file mode 100644
index 0000000000..c2bbc2e0cb
--- /dev/null
+++ b/src/backend/port/socket_table.c
@@ -0,0 +1,160 @@
+/*-------------------------------------------------------------------------
+ *
+ * socket_table.c
+ *	  Routines for dealing with extra per-socket state on Windows.
+ *
+ * Windows sockets can only safely be associated with one 'event', or they
+ * risk losing FD_CLOSE events.  FD_CLOSE is also edge-triggered and not
+ * resettable, so we need space for state to make it level-triggered.
+ *
+ * These functions are no-ops on Unix systems, except in assertion builds
+ * where we do the minimum bookkeeping required to report failure to register
+ * sockets appropriately.
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/port/socket_table.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "common/hashfn.h"
+
+typedef struct SocketTableEntry
+{
+	pgsocket	sock;
+	int			status;
+	ExtraSocketState *extra;
+} SocketTableEntry;
+
+#define SH_PREFIX		socktab
+#define SH_ELEMENT_TYPE SocketTableEntry
+#define SH_KEY_TYPE		pgsocket
+#define SH_KEY			sock
+#define SH_HASH_KEY(tb, key)	hash_bytes((uint8 *) &(key), sizeof(key))
+#define SH_EQUAL(tb, a, b)		((a) == (b))
+#define SH_SCOPE static inline
+#define SH_NO_OOM
+#define SH_DECLARE
+#define SH_DEFINE
+#include "lib/simplehash.h"
+
+#if defined(WIN32) || defined(USE_ASSERT_CHECKING)
+static socktab_hash *socket_table;
+#endif
+
+/*
+ * Must be called exactly once for each socket before including it in code
+ * that requires ExtraSocketState, such as WaitEventSet.  The socket must
+ * not already be registered, and must be dropped before closing it.  Returns
+ * NULL on out-of-memory if no_oom is true; otherwise raises an error.
+ */
+ExtraSocketState *
+SocketTableAdd(pgsocket sock, bool no_oom)
+{
+#if defined(WIN32) || defined(USE_ASSERT_CHECKING)
+	SocketTableEntry *ste;
+	ExtraSocketState *ess;
+	bool		found;
+
+	/*
+	 * Create a new ExtraSocketState object.  We can't use a pointer into the
+	 * hash table because entries move around when it grows, and we want
+	 * WaitEventSet to be able to hold pointers.
+	 */
+	if (!(ess = malloc(sizeof(*ess))))
+		goto out_of_memory;
+	memset(ess, 0, sizeof(*ess));
+
+#ifdef WIN32
+	/* Set up Windows kernel event for this socket. */
+	ess->event_handle = WSACreateEvent();
+	if (ess->event_handle == WSA_INVALID_EVENT)
+		goto out_of_memory;
+#endif
+
+	/* Create socket_table on demand. */
+	/* XXX dedicated memory context for debugging? */
+	if (!socket_table &&
+		(!(socket_table = socktab_create(TopMemoryContext, 128, NULL))))
+		goto out_of_memory;
+
+	/* Try to insert. */
+	ste = socktab_insert(socket_table, sock, &found);
+	if (!ste)
+		goto out_of_memory;
+	if (found)
+	{
+#ifdef WIN32
+		WSACloseEvent(ess->event_handle);
+#endif
+		free(ess);
+		elog(ERROR, "cannot register socket, already registered");
+	}
+
+	/* Success. */
+	ste->extra = ess;
+	return ess;
+
+ out_of_memory:
+	if (ess)
+	{
+#ifdef WIN32
+		if (ess->event_handle != WSA_INVALID_EVENT)
+			WSACloseEvent(ess->event_handle);
+#endif
+		free(ess);
+	}
+	if (!no_oom)
+		elog(ERROR, "out of memory");
+	return NULL;
+#else
+	return NULL;
+#endif
+}
+
+/*
+ * Unregister a socket that was registered.  This must be done before closing
+ * the socket.
+ */
+void
+SocketTableDrop(pgsocket sock)
+{
+#if defined(WIN32) || defined(USE_ASSERT_CHECKING)
+	SocketTableEntry *ste = NULL;
+	ExtraSocketState *ess;
+
+	if (socket_table)
+		ste = socktab_lookup(socket_table, sock);
+	if (!ste)
+		elog(ERROR, "cannot unregister socket that is not registered");
+	ess = ste->extra;
+
+#ifdef WIN32
+	WSACloseEvent(ess->event_handle);
+#endif
+
+	free(ess);
+	socktab_delete_item(socket_table, ste);
+#endif
+}
+
+ExtraSocketState *
+SocketTableGet(pgsocket sock)
+{
+#if defined(WIN32) || defined(USE_ASSERT_CHECKING)
+	SocketTableEntry *ste = NULL;
+
+	if (socket_table)
+		ste = socktab_lookup(socket_table, sock);
+	if (!ste)
+		elog(ERROR, "socket is not registered");
+	return ste->extra;
+#else
+	return NULL;
+#endif
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 0646f53098..dd95353880 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -382,6 +382,7 @@ static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
 static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
+static void pgstat_release_socket(void);
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -483,8 +484,7 @@ pgstat_init(void)
 			ereport(LOG,
 					(errcode_for_socket_access(),
 					 errmsg("could not bind socket for statistics collector: %m")));
-			closesocket(pgStatSock);
-			pgStatSock = PGINVALID_SOCKET;
+			pgstat_release_socket();
 			continue;
 		}
 
@@ -494,8 +494,7 @@ pgstat_init(void)
 			ereport(LOG,
 					(errcode_for_socket_access(),
 					 errmsg("could not get address of socket for statistics collector: %m")));
-			closesocket(pgStatSock);
-			pgStatSock = PGINVALID_SOCKET;
+			pgstat_release_socket();
 			continue;
 		}
 
@@ -510,8 +509,7 @@ pgstat_init(void)
 			ereport(LOG,
 					(errcode_for_socket_access(),
 					 errmsg("could not connect socket for statistics collector: %m")));
-			closesocket(pgStatSock);
-			pgStatSock = PGINVALID_SOCKET;
+			pgstat_release_socket();
 			continue;
 		}
 
@@ -531,8 +529,7 @@ retry1:
 			ereport(LOG,
 					(errcode_for_socket_access(),
 					 errmsg("could not send test message on socket for statistics collector: %m")));
-			closesocket(pgStatSock);
-			pgStatSock = PGINVALID_SOCKET;
+			pgstat_release_socket();
 			continue;
 		}
 
@@ -557,8 +554,7 @@ retry1:
 			ereport(LOG,
 					(errcode_for_socket_access(),
 					 errmsg("select() failed in statistics collector: %m")));
-			closesocket(pgStatSock);
-			pgStatSock = PGINVALID_SOCKET;
+			pgstat_release_socket();
 			continue;
 		}
 		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
@@ -572,8 +568,7 @@ retry1:
 			ereport(LOG,
 					(errcode(ERRCODE_CONNECTION_FAILURE),
 					 errmsg("test message did not get through on socket for statistics collector")));
-			closesocket(pgStatSock);
-			pgStatSock = PGINVALID_SOCKET;
+			pgstat_release_socket();
 			continue;
 		}
 
@@ -587,8 +582,7 @@ retry2:
 			ereport(LOG,
 					(errcode_for_socket_access(),
 					 errmsg("could not receive test message on socket for statistics collector: %m")));
-			closesocket(pgStatSock);
-			pgStatSock = PGINVALID_SOCKET;
+			pgstat_release_socket();
 			continue;
 		}
 
@@ -597,8 +591,7 @@ retry2:
 			ereport(LOG,
 					(errcode(ERRCODE_INTERNAL_ERROR),
 					 errmsg("incorrect test message transmission on socket for statistics collector")));
-			closesocket(pgStatSock);
-			pgStatSock = PGINVALID_SOCKET;
+			pgstat_release_socket();
 			continue;
 		}
 
@@ -669,7 +662,7 @@ startup_failed:
 		pg_freeaddrinfo_all(hints.ai_family, addrs);
 
 	if (pgStatSock != PGINVALID_SOCKET)
-		closesocket(pgStatSock);
+		pgstat_release_socket();
 	pgStatSock = PGINVALID_SOCKET;
 
 	/*
@@ -3527,6 +3520,9 @@ PgstatCollectorMain(int argc, char *argv[])
 	pgStatRunningInCollector = true;
 	pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
 
+	/* Make sure pgStatSock can be used in a WaitEventSet on Windows. */
+	SocketTableAdd(pgStatSock, false);
+
 	/* Prepare to wait for our latch or data in our socket. */
 	wes = CreateWaitEventSet(CurrentMemoryContext, 3);
 	AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
@@ -6418,3 +6414,11 @@ pgstat_count_slru_truncate(int slru_idx)
 {
 	slru_entry(slru_idx)->m_truncate += 1;
 }
+
+static void
+pgstat_release_socket(void)
+{
+	SocketTableDrop(pgStatSock);
+	closesocket(pgStatSock);
+	pgStatSock = PGINVALID_SOCKET;
+}
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index 25e2131e31..aa551b2719 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -301,6 +301,9 @@ SysLoggerMain(int argc, char *argv[])
 	 */
 	whereToSendOutput = DestNone;
 
+	/* Make sure the pipe can be used in a WaitEventSet on Windows. */
+	SocketTableAdd((pgsocket) syslogPipe[0], false);
+
 	/*
 	 * Set up a reusable WaitEventSet object we'll use to wait for our latch,
 	 * and (except on Windows) our socket.
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 0d89db4e6a..8508f93aa6 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -23,6 +23,7 @@
 #include "catalog/pg_type.h"
 #include "common/connect.h"
 #include "funcapi.h"
+#include "libpq-events.h"
 #include "libpq-fe.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
@@ -83,6 +84,10 @@ static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const Oid *retTypes);
 static void libpqrcv_disconnect(WalReceiverConn *conn);
 
+static int libpqrcv_eventproc(PGEventId evtId,
+							  void *evtInfo,
+							  void *passThrough);
+
 static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	libpqrcv_connect,
 	libpqrcv_check_conninfo,
@@ -182,6 +187,15 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 		return NULL;
 	}
 
+	if (PQregisterEventProc(conn->streamConn, libpqrcv_eventproc,
+							"libwalrcv", NULL) == 0)
+	{
+		PQfinish(conn->streamConn);
+		conn->streamConn = NULL;
+		*err = pstrdup("could not register for libpq events");
+		return NULL;
+	}
+
 	/*
 	 * Poll connection until we have OK or FAILED status.
 	 *
@@ -1165,3 +1179,22 @@ stringlist_to_identifierstr(PGconn *conn, List *strings)
 
 	return res.data;
 }
+
+/*
+ * A callback for libpq to allow its sockets to be used in WaitEventSet on Windows.
+ */
+static int
+libpqrcv_eventproc(PGEventId evtId, void *evtInfo, void *passThrough)
+{
+	if (evtId == PGEVT_SOCKET)
+	{
+		PGEventSocket *evt = (PGEventSocket *) evtInfo;
+		return SocketTableAdd(evt->socket, true) ? 1 : 0;
+	}
+	else if (evtId == PGEVT_SOCKETCLOSE)
+	{
+		PGEventSocket *evt = (PGEventSocket *) evtInfo;
+		SocketTableDrop(evt->socket);
+	}
+	return 1;
+}
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 61c876beff..5a6fb36fa2 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -60,6 +60,7 @@
 #include "storage/shmem.h"
 #include "utils/memutils.h"
 
+
 /*
  * Select the fd readiness primitive to use. Normally the "most modern"
  * primitive supported by the OS will be used, but for testing it can be
@@ -803,28 +804,6 @@ FreeWaitEventSet(WaitEventSet *set)
 #elif defined(WAIT_USE_KQUEUE)
 	close(set->kqueue_fd);
 	ReleaseExternalFD();
-#elif defined(WAIT_USE_WIN32)
-	WaitEvent  *cur_event;
-
-	for (cur_event = set->events;
-		 cur_event < (set->events + set->nevents);
-		 cur_event++)
-	{
-		if (cur_event->events & WL_LATCH_SET)
-		{
-			/* uses the latch's HANDLE */
-		}
-		else if (cur_event->events & WL_POSTMASTER_DEATH)
-		{
-			/* uses PostmasterHandle */
-		}
-		else
-		{
-			/* Clean up the event object we created for the socket */
-			WSAEventSelect(cur_event->fd, NULL, 0);
-			WSACloseEvent(set->handles[cur_event->pos + 1]);
-		}
-	}
 #endif
 
 	pfree(set);
@@ -897,9 +876,17 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
 	event->fd = fd;
 	event->events = events;
 	event->user_data = user_data;
+
+	if (events & WL_SOCKET_MASK)
+	{
 #ifdef WIN32
-	event->reset = false;
+		/* Point to our Windows event tracking state for the socket. */
+		event->extra_socket_state = SocketTableGet(fd);
+#else
+		/* On Unix, check it's registered to avoid portability bugs. */
+		(void) SocketTableGet(fd);
 #endif
+	}
 
 	if (events == WL_LATCH_SET)
 	{
@@ -1265,6 +1252,7 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
 	}
 	else
 	{
+		ExtraSocketState *ess;
 		int			flags = FD_CLOSE;	/* always check for errors/EOF */
 
 		if (event->events & WL_SOCKET_READABLE)
@@ -1274,18 +1262,19 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
 		if (event->events & WL_SOCKET_CONNECTED)
 			flags |= FD_CONNECT;
 
-		if (*handle == WSA_INVALID_EVENT)
-		{
-			*handle = WSACreateEvent();
-			if (*handle == WSA_INVALID_EVENT)
-				elog(ERROR, "failed to create event for socket: error code %d",
-					 WSAGetLastError());
-		}
-		if (WSAEventSelect(event->fd, *handle, flags) != 0)
+		/* Find the event associated with this socket. */
+		ess = event->extra_socket_state;
+		*handle = ess->event_handle;
+
+		/*
+		 * Adjust the flags for this event, if they don't already match what we
+		 * want to wait for.
+		 */
+		if (ess->flags != flags &&
+			WSAEventSelect(event->fd, *handle, flags) != 0)
 			elog(ERROR, "failed to set up event for socket: error code %d",
 				 WSAGetLastError());
-
-		Assert(event->fd != PGINVALID_SOCKET);
+		ess->flags = flags;
 	}
 }
 #endif
@@ -1845,10 +1834,19 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 		 cur_event < (set->events + set->nevents);
 		 cur_event++)
 	{
-		if (cur_event->reset)
-		{
+		/* Reset the event mask if necessary. */
+		if (cur_event->events & WL_SOCKET_MASK)
 			WaitEventAdjustWin32(set, cur_event);
-			cur_event->reset = false;
+
+		/* If we've already seen FD_CLOSE, keep reporting readiness. */
+		if ((cur_event->events & WL_SOCKET_MASK) &&
+			cur_event->extra_socket_state->seen_fd_close)
+		{
+			occurred_events->pos = cur_event->pos;
+			occurred_events->user_data = cur_event->user_data;
+			occurred_events->events = cur_event->events;
+			occurred_events->fd = cur_event->fd;
+			return 1;
 		}
 
 		/*
@@ -1984,7 +1982,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 			 * for the behavior of socket events.
 			 *------
 			 */
-			cur_event->reset = true;
+			cur_event->extra_socket_state->flags = 0;
 		}
 		if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
 			(resEvents.lNetworkEvents & FD_WRITE))
@@ -2002,6 +2000,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 		{
 			/* EOF/error, so signal all caller-requested socket flags */
 			occurred_events->events |= (cur_event->events & WL_SOCKET_MASK);
+			cur_event->extra_socket_state->seen_fd_close = true;
 		}
 
 		if (occurred_events->events != 0)
diff --git a/src/include/port.h b/src/include/port.h
index 3d103a2b31..e780852bd1 100644
--- a/src/include/port.h
+++ b/src/include/port.h
@@ -543,4 +543,22 @@ extern char *wait_result_to_str(int exit_status);
 extern bool wait_result_is_signal(int exit_status, int signum);
 extern bool wait_result_is_any_signal(int exit_status, bool include_command_not_found);
 
+/* backend/port/socket_table.c */
+#if !defined(FRONTEND)
+struct ExtraSocketState
+{
+#ifdef WIN32
+	HANDLE		event_handle;		/* one event for the life of the socket */
+	int			flags;				/* most recent WSAEventSelect() flags */
+	bool		seen_fd_close;		/* has FD_CLOSE been received? */
+#else
+	int			dummy;				/* none of this is needed for Unix */
+#endif
+};
+typedef struct ExtraSocketState ExtraSocketState;
+extern ExtraSocketState *SocketTableAdd(pgsocket sock, bool no_oom);
+extern ExtraSocketState *SocketTableGet(pgsocket sock);
+extern void SocketTableDrop(pgsocket sock);
+#endif
+
 #endif							/* PG_PORT_H */
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 3aa7b33834..576d339829 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -146,7 +146,7 @@ typedef struct WaitEvent
 	pgsocket	fd;				/* socket fd associated with event */
 	void	   *user_data;		/* pointer provided in AddWaitEventToSet */
 #ifdef WIN32
-	bool		reset;			/* Is reset of the event required? */
+	ExtraSocketState *extra_socket_state;
 #endif
 } WaitEvent;
 
-- 
2.33.1
