On Tue, Oct 12, 2021 at 3:10 AM Maksim Milyutin <milyuti...@gmail.com> wrote:
> Good work. I have tested your patch on Linux and FreeBSD on three basic
> cases: killing the client, cable breakdown (simulated via firewall
> manipulations), and silently closing the client connection before a
> previously started asynchronous query had completed. And all works fine.

Thanks for the testing and review!

> +    WaitEvent events[3];
>
> 3 looks like a magic constant. We might want to define a constant for
> all event groups in FeBeWaitSet.

Yeah.  In fact, we really just need one event.  Fixed.

> +    ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, NULL);
> +    ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos,
> WL_SOCKET_CLOSED, NULL);
> +    rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
> +    ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
> MyLatch);
>
> AFAICS, the side effect on
> (FeBeWaitSet->events[FeBeWaitSetSocketPos]).events of setting
> WL_SOCKET_CLOSED inside pq_check_connection() doesn't have a negative
> impact later, does it? That is, every WaitEventSetWait() call site has
> to set up the socket events on its own from scratch.

Correct: every site that waits for FeBeWaitSet explicitly modifies the
socket event to say what it's waiting for (and that is often a no-op
internally), so we don't have to worry about restoring the previous
state.  I've added a comment about that.  We should work harder to
restore the latch than my previous patch did, though.  Now I'm using a
PG_FINALLY() block.
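
Condensed, the pq_check_connection() body in the attached 0002 patch now
boils down to this (the full hunk, with the longer comments, is below):

	WaitEvent	event;
	int			rc;

	/* Temporarily ignore the latch, so we can poll for just one event. */
	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, NULL);
	PG_TRY();
	{
		/* Clobbering the socket event is OK: every wait site resets it. */
		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED,
						NULL);
		rc = WaitEventSetWait(FeBeWaitSet, 0, &event, 1, 0);
	}
	PG_FINALLY();
	{
		/* Put the latch back even if WaitEventSetWait() errored out. */
		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
						MyLatch);
	}
	PG_END_TRY();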

I'm hoping to push this soon, after another round of testing, if
there's no more feedback.

There is one more OS that could be added, but I'll leave it out of the
initial commit, pending further investigation.  Since I recently had
to set up a Windows dev VM to test some other portability stuff, I
had a chance to test the FD_CLOSE hypothesis.  You just have to do
this to enable it:

@@ -2069,6 +2069,7 @@ WaitEventSetCanReportClosed(void)
 {
 #if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \
        defined(WAIT_USE_EPOLL) || \
+       defined(WAIT_USE_WIN32) || \
        defined(WAIT_USE_KQUEUE)
        return true;
 #else

It seems to work!  I'm not sure why it works, or whether we can count
on it, though.  These two sentences from the documentation[1] seem to
contradict each other:

"FD_CLOSE being posted after all data is read from a socket. An
application should check for remaining data upon receipt of FD_CLOSE
to avoid any possibility of losing data."

My test says that the first sentence is wrong, but the second doesn't
exactly say that FD_CLOSE has reliable POLLRDHUP-like semantics either,
and I haven't found a statement that does, yet.  Perhaps we can convince
ourselves of that in follow-up work.

For the record, I tested two scenarios.  The client was a Unix system,
the server a Windows 10 VM.

1.  Connecting with psql and running "SELECT pg_sleep(60)" and then
killing the psql process.  I'm not surprised that this one worked; it
would work if we tested for WL_SOCKET_READABLE too, but we already
decided that's not good enough.
2.  Connecting from a C program that does PQsendQuery(conn, "SELECT
pg_sleep(60)") and then immediately PQfinish(conn), to test whether
the FD_CLOSE event is reported even though there is an unconsumed 'X'
in the socket.  I wouldn't want to ship the feature on an OS where
this case doesn't get reported, or is reported only some of the time,
because it'd be unreliable and unlike the behaviour on other OSes.
But it worked for me.
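
In case it's useful, the scenario 2 client was only a few lines of
libpq.  Here's a minimal sketch of it (the connection string and the
error handling are mine, not copied from the actual test program):

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	/* Hypothetical connection string pointing at the Windows 10 VM. */
	PGconn	   *conn = PQconnectdb("host=windows-vm dbname=postgres");

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		return 1;
	}

	/* Queue a long-running query, without waiting for the result... */
	if (!PQsendQuery(conn, "SELECT pg_sleep(60)"))
		fprintf(stderr, "PQsendQuery failed: %s", PQerrorMessage(conn));

	/*
	 * ... and hang up immediately.  PQfinish() sends the 'X' (Terminate)
	 * message and closes the socket while the server is still sleeping,
	 * which produces the unconsumed-data case described above.
	 */
	PQfinish(conn);
	return 0;
}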

[1] 
https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-wsaeventselect
From 842d96842487881b05ed3d806caa14e84c8be4f9 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 30 Apr 2021 10:38:40 +1200
Subject: [PATCH v3 1/2] Add WL_SOCKET_CLOSED for socket shutdown events.

Provide a way for WaitEventSet to report that the remote peer has shut
down its socket, independently of whether there is any buffered data
remaining to be read.  This works only on systems where the kernel
exposes that information, namely:

* WAIT_USE_POLL builds, if the POLLRDHUP extension is available
* WAIT_USE_EPOLL builds, using EPOLLRDHUP
* WAIT_USE_KQUEUE builds, using EV_EOF

Reviewed-by: Zhihong Yu <z...@yugabyte.com>
Reviewed-by: Maksim Milyutin <milyuti...@gmail.com>
Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
---
 src/backend/storage/ipc/latch.c | 79 +++++++++++++++++++++++++++++----
 src/include/storage/latch.h     |  6 ++-
 2 files changed, 74 insertions(+), 11 deletions(-)

diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 1d893cf863..54e928c564 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -841,6 +841,7 @@ FreeWaitEventSet(WaitEventSet *set)
  * - WL_SOCKET_CONNECTED: Wait for socket connection to be established,
  *	 can be combined with other WL_SOCKET_* events (on non-Windows
  *	 platforms, this is the same as WL_SOCKET_WRITEABLE)
+ * - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
  * - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
  *
  * Returns the offset in WaitEventSet->events (starting from 0), which can be
@@ -1043,12 +1044,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
 	else
 	{
 		Assert(event->fd != PGINVALID_SOCKET);
-		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		Assert(event->events & (WL_SOCKET_READABLE |
+								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_CLOSED));
 
 		if (event->events & WL_SOCKET_READABLE)
 			epoll_ev.events |= EPOLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			epoll_ev.events |= EPOLLOUT;
+		if (event->events & WL_SOCKET_CLOSED)
+			epoll_ev.events |= EPOLLRDHUP;
 	}
 
 	/*
@@ -1087,12 +1092,18 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 	}
 	else
 	{
-		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		Assert(event->events & (WL_SOCKET_READABLE |
+								WL_SOCKET_WRITEABLE |
+								WL_SOCKET_CLOSED));
 		pollfd->events = 0;
 		if (event->events & WL_SOCKET_READABLE)
 			pollfd->events |= POLLIN;
 		if (event->events & WL_SOCKET_WRITEABLE)
 			pollfd->events |= POLLOUT;
+#ifdef POLLRDHUP
+		if (event->events & WL_SOCKET_CLOSED)
+			pollfd->events |= POLLRDHUP;
+#endif
 	}
 
 	Assert(event->fd != PGINVALID_SOCKET);
@@ -1165,7 +1176,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 	Assert(event->events != WL_LATCH_SET || set->latch != NULL);
 	Assert(event->events == WL_LATCH_SET ||
 		   event->events == WL_POSTMASTER_DEATH ||
-		   (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));
+		   (event->events & (WL_SOCKET_READABLE |
+							 WL_SOCKET_WRITEABLE |
+							 WL_SOCKET_CLOSED)));
 
 	if (event->events == WL_POSTMASTER_DEATH)
 	{
@@ -1188,9 +1201,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 		 * old event mask to the new event mask, since kevent treats readable
 		 * and writable as separate events.
 		 */
-		if (old_events & WL_SOCKET_READABLE)
+		if (old_events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
 			old_filt_read = true;
-		if (event->events & WL_SOCKET_READABLE)
+		if (event->events & (WL_SOCKET_READABLE | WL_SOCKET_CLOSED))
 			new_filt_read = true;
 		if (old_events & WL_SOCKET_WRITEABLE)
 			old_filt_write = true;
@@ -1210,7 +1223,10 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
 									 event);
 	}
 
-	Assert(count > 0);
+	/* For WL_SOCKET_READABLE -> WL_SOCKET_CLOSED, no change needed. */
+	if (count == 0)
+		return;
+
 	Assert(count <= 2);
 
 	rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);
@@ -1525,7 +1541,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				returned_events++;
 			}
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			Assert(cur_event->fd != PGINVALID_SOCKET);
 
@@ -1543,6 +1561,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_WRITEABLE;
 			}
 
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_epoll_event->events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)))
+			{
+				/* remote peer shut down, or error */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+
 			if (occurred_events->events != 0)
 			{
 				occurred_events->fd = cur_event->fd;
@@ -1668,7 +1693,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 			occurred_events++;
 			returned_events++;
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			Assert(cur_event->fd >= 0);
 
@@ -1679,6 +1706,14 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_READABLE;
 			}
 
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_kqueue_event->filter == EVFILT_READ) &&
+				(cur_kqueue_event->flags & EV_EOF))
+			{
+				/* the remote peer has shut down */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+
 			if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
 				(cur_kqueue_event->filter == EVFILT_WRITE))
 			{
@@ -1789,7 +1824,9 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				returned_events++;
 			}
 		}
-		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+		else if (cur_event->events & (WL_SOCKET_READABLE |
+									  WL_SOCKET_WRITEABLE |
+									  WL_SOCKET_CLOSED))
 		{
 			int			errflags = POLLHUP | POLLERR | POLLNVAL;
 
@@ -1809,6 +1846,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 				occurred_events->events |= WL_SOCKET_WRITEABLE;
 			}
 
+#ifdef POLLRDHUP
+			if ((cur_event->events & WL_SOCKET_CLOSED) &&
+				(cur_pollfd->revents & (POLLRDHUP | errflags)))
+			{
+				/* remote peer closed, or error */
+				occurred_events->events |= WL_SOCKET_CLOSED;
+			}
+#endif
+
 			if (occurred_events->events != 0)
 			{
 				occurred_events->fd = cur_event->fd;
@@ -2015,6 +2061,21 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 }
 #endif
 
+/*
+ * Return whether the current build options can report WL_SOCKET_CLOSED.
+ */
+bool
+WaitEventSetCanReportClosed(void)
+{
+#if (defined(WAIT_USE_POLL) && defined(POLLRDHUP)) || \
+	defined(WAIT_USE_EPOLL) || \
+	defined(WAIT_USE_KQUEUE)
+	return true;
+#else
+	return false;
+#endif
+}
+
 /*
  * Get the number of wait events registered in a given WaitEventSet.
  */
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index 44f9368c64..d78ff0bede 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -134,10 +134,11 @@ typedef struct Latch
 /* avoid having to deal with case on platforms not requiring it */
 #define WL_SOCKET_CONNECTED  WL_SOCKET_WRITEABLE
 #endif
-
+#define WL_SOCKET_CLOSED 	 (1 << 7)
 #define WL_SOCKET_MASK		(WL_SOCKET_READABLE | \
 							 WL_SOCKET_WRITEABLE | \
-							 WL_SOCKET_CONNECTED)
+							 WL_SOCKET_CONNECTED | \
+							 WL_SOCKET_CLOSED)
 
 typedef struct WaitEvent
 {
@@ -180,5 +181,6 @@ extern int	WaitLatchOrSocket(Latch *latch, int wakeEvents,
 							  pgsocket sock, long timeout, uint32 wait_event_info);
 extern void InitializeLatchWaitSet(void);
 extern int	GetNumRegisteredWaitEvents(WaitEventSet *set);
+extern bool	WaitEventSetCanReportClosed(void);
 
 #endif							/* LATCH_H */
-- 
2.33.1

From 2d191c0d3bbc240e125633d51f9f1e003630b866 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Fri, 30 Apr 2021 10:48:32 +1200
Subject: [PATCH v3 2/2] Use WL_SOCKET_CLOSED for
 client_connection_check_interval.

Previously we used poll() directly to check for a POLLRDHUP event.
Instead, use WaitEventSet to poll the socket for WL_SOCKET_CLOSED, which
knows how to detect equivalent events on many more operating systems.

Reviewed-by: Zhihong Yu <z...@yugabyte.com>
Reviewed-by: Maksim Milyutin <milyuti...@gmail.com>
Discussion: https://postgr.es/m/77def86b27e41f0efcba411460e929ae%40postgrespro.ru
---
 doc/src/sgml/config.sgml     |  6 ++---
 src/backend/libpq/pqcomm.c   | 44 ++++++++++++++++++++++++------------
 src/backend/utils/misc/guc.c |  7 ++----
 3 files changed, 34 insertions(+), 23 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 4ac617615c..ab52d4bd2c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1012,9 +1012,9 @@ include_dir 'conf.d'
         the kernel reports that the connection is closed.
        </para>
        <para>
-        This option is currently available only on systems that support the
-        non-standard <symbol>POLLRDHUP</symbol> extension to the
-        <symbol>poll</symbol> system call, including Linux.
+        This option relies on kernel events exposed by Linux, macOS, illumos
+        and the BSD family of operating systems, and is not currently available
+        on other systems.
        </para>
        <para>
         If the value is specified without units, it is taken as milliseconds.
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 4c37df09cf..c235a682c2 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -1959,22 +1959,36 @@ pq_settcpusertimeout(int timeout, Port *port)
 bool
 pq_check_connection(void)
 {
-#if defined(POLLRDHUP)
-	/*
-	 * POLLRDHUP is a Linux extension to poll(2) to detect sockets closed by
-	 * the other end.  We don't have a portable way to do that without
-	 * actually trying to read or write data on other systems.  We don't want
-	 * to read because that would be confused by pipelined queries and COPY
-	 * data. Perhaps in future we'll try to write a heartbeat message instead.
-	 */
-	struct pollfd pollfd;
+	WaitEvent	event;
 	int			rc;
 
-	pollfd.fd = MyProcPort->sock;
-	pollfd.events = POLLOUT | POLLIN | POLLRDHUP;
-	pollfd.revents = 0;
+	/*
+	 * Temporarily ignore the latch, so that we can poll for just the one
+	 * event we care about.
+	 */
+	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET, NULL);
 
-	rc = poll(&pollfd, 1, 0);
+	PG_TRY();
+	{
+		/*
+		 * It's OK to clobber the socket event to report only the event we're
+		 * interested in without restoring the previous state afterwards,
+		 * because every FeBeWaitSet wait site does the same.
+		 */
+		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED,
+						NULL);
+		rc = WaitEventSetWait(FeBeWaitSet, 0, &event, 1, 0);
+	}
+	PG_FINALLY();
+	{
+		/*
+		 * Restore the latch, so we can't leave FeBeWaitSet in a broken state
+		 * that ignores latches.
+		 */
+		ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
+						MyLatch);
+	}
+	PG_END_TRY();
 
 	if (rc < 0)
 	{
@@ -1983,9 +1997,9 @@ pq_check_connection(void)
 				 errmsg("could not poll socket: %m")));
 		return false;
 	}
-	else if (rc == 1 && (pollfd.revents & (POLLHUP | POLLRDHUP)))
+
+	if (rc == 1 && (event.events & WL_SOCKET_CLOSED))
 		return false;
-#endif
 
 	return true;
 }
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index ee6a838b3a..ca45b36754 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -12169,14 +12169,11 @@ check_huge_page_size(int *newval, void **extra, GucSource source)
 static bool
 check_client_connection_check_interval(int *newval, void **extra, GucSource source)
 {
-#ifndef POLLRDHUP
-	/* Linux only, for now.  See pq_check_connection(). */
-	if (*newval != 0)
+	if (!WaitEventSetCanReportClosed() && *newval != 0)
 	{
-		GUC_check_errdetail("client_connection_check_interval must be set to 0 on platforms that lack POLLRDHUP.");
+		GUC_check_errdetail("client_connection_check_interval must be set to 0 on this platform.");
 		return false;
 	}
-#endif
 	return true;
 }
 
-- 
2.33.1
