On Thu Nov 23, 2023 at 3:19 AM CST, Heikki Linnakangas wrote:
On 22/11/2023 23:29, Tristan Partin wrote:
> Ha, you're right. I had this working yesterday, but convinced myself it
> didn't. I had a do while loop wrapping the blocking call. Here is a v4,
> which seems to pass the tests that were pointed out to be failing
> earlier.

Thanks! This suffers from a classic race condition:

> +                  if (cancel_pressed)
> +                          break;
> +
> +                  sock = PQsocket(n_conn);
> +                  if (sock == -1)
> +                          break;
> +
> +                  rc = pqSocketPoll(sock, for_read, !for_read, (time_t)-1);
> +                  Assert(rc != 0); /* Timeout is impossible. */
> +                  if (rc == -1)
> +                  {
> +                          success = false;
> +                          break;
> +                  }

If a signal arrives between the "if (cancel_pressed)" check and the call to pqSocketPoll(), pqSocketPoll() will "miss" the signal and block indefinitely. There are three solutions to this:

1. Use a timeout, so that you wake up periodically to check for any missed signals. Easy solution, the downsides are that you will not react as quickly if the signal is missed, and you waste some cycles by waking up unnecessarily.

2. The Self-pipe trick: https://cr.yp.to/docs/selfpipe.html. We also use that in src/backend/storage/ipc/latch.c. It's portable, but somewhat complicated.

3. Use pselect() or ppoll(). They were created specifically to address this problem. Not fully portable, I think it's missing on Windows at least.

Maybe use pselect() if it's available, and select() with a timeout if it's not.

First I have ever heard of this race condition, and now I will commit it to durable storage :). I went ahead and did option #3 that you proposed. On Windows, I have a 1 second timeout for the select/poll. That seemed like a good balance of user experience and spinning the CPU. But I am open to other values. I don't have a Windows VM, but maybe I should set one up...

I am not completely in love with the code I have written. Lots of conditional compilation which makes it hard to read. Looking forward to another round of review to see what y'all think.

For what it's worth, I did try #2, but since psql installs its own SIGINT handler, the code became really hairy.

--
Tristan Partin
Neon (https://neon.tech)
From 4fdccf9211ac78bbd7430488d515646f9e9cce9b Mon Sep 17 00:00:00 2001
From: Tristan Partin <tris...@neon.tech>
Date: Mon, 24 Jul 2023 11:12:59 -0500
Subject: [PATCH v5] Allow SIGINT to cancel psql database reconnections

After installing the SIGINT handler in psql, SIGINT can no longer cancel
database reconnections. For instance, if the user starts a reconnection
and then needs to do some form of interaction (i.e., psql is polling),
there is no way to cancel the reconnection process currently.

Use PQconnectStartParams() in order to insert a cancel_pressed check
into the polling loop.
---
 meson.build                |   1 +
 src/bin/psql/command.c     | 222 ++++++++++++++++++++++++++++++++++++-
 src/include/pg_config.h.in |   3 +
 src/tools/msvc/Solution.pm |   1 +
 4 files changed, 226 insertions(+), 1 deletion(-)

diff --git a/meson.build b/meson.build
index ee58ee7a06..2d63485c53 100644
--- a/meson.build
+++ b/meson.build
@@ -2440,6 +2440,7 @@ func_checks = [
   ['posix_fadvise'],
   ['posix_fallocate'],
   ['ppoll'],
+  ['pselect'],
   ['pstat'],
   ['pthread_barrier_wait', {'dependencies': [thread_dep]}],
   ['pthread_is_threaded_np', {'dependencies': [thread_dep]}],
diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c
index 82cc091568..c325fedb51 100644
--- a/src/bin/psql/command.c
+++ b/src/bin/psql/command.c
@@ -11,6 +11,11 @@
 #include <time.h>
 #include <pwd.h>
 #include <utime.h>
+#if HAVE_POLL
+#include <poll.h>
+#else
+#include <sys/select.h>
+#endif
 #ifndef WIN32
 #include <sys/stat.h>			/* for stat() */
 #include <sys/time.h>			/* for setitimer() */
@@ -40,6 +45,7 @@
 #include "large_obj.h"
 #include "libpq-fe.h"
 #include "libpq/pqcomm.h"
+#include "libpq/pqsignal.h"
 #include "mainloop.h"
 #include "portability/instr_time.h"
 #include "pqexpbuffer.h"
@@ -3251,6 +3257,169 @@ copy_previous_query(PQExpBuffer query_buf, PQExpBuffer previous_buf)
 	return false;
 }
 
+/*
+ * Check a file descriptor for read and/or write data, possibly waiting.
+ * If neither forRead nor forWrite are set, immediately return a timeout
+ * condition (without waiting).  Return >0 if condition is met, 0
+ * if a timeout occurred, -1 if an error or interrupt occurred.
+ *
+ * end_time is an absolute time as returned by time(2), not a duration.
+ * Timeout is infinite if end_time is -1.  Timeout is immediate (no blocking)
+ * if end_time is 0 (or indeed, any time before now).
+ *
+ * Where ppoll(2)/pselect(2) is used, SIGINT is blocked while cancel_pressed
+ * is tested and the wait itself runs with an empty signal mask, so a SIGINT
+ * cannot slip in unnoticed between the test and the wait.  If cancel_pressed
+ * is already set, 1 is returned without waiting.
+ */
+static int
+pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time)
+{
+	/* Prefer ppoll(2)/pselect(2); otherwise fall back to poll(2)/select(2). */
+#ifdef HAVE_POLL
+	struct pollfd input_fd;
+#ifdef HAVE_PPOLL
+	int			rc;
+	sigset_t	emptyset;
+	sigset_t	blockset;
+	sigset_t	origset;
+	struct timespec timeout;
+	struct timespec *ptr_timeout;
+#else
+	int			timeout_ms;
+#endif
+
+	if (!forRead && !forWrite)
+		return 0;
+
+	input_fd.fd = sock;
+	input_fd.events = POLLERR;
+	input_fd.revents = 0;
+
+	if (forRead)
+		input_fd.events |= POLLIN;
+	if (forWrite)
+		input_fd.events |= POLLOUT;
+
+	/* Convert the absolute end_time into a relative wait interval */
+#ifdef HAVE_PPOLL
+	sigemptyset(&blockset);
+	sigaddset(&blockset, SIGINT);
+	sigprocmask(SIG_BLOCK, &blockset, &origset);
+
+	if (end_time == ((time_t) -1))
+		ptr_timeout = NULL;
+	else
+	{
+		time_t		now = time(NULL);
+
+		timeout.tv_sec = (end_time > now) ? end_time - now : 0;
+		timeout.tv_nsec = 0;
+		ptr_timeout = &timeout;
+	}
+#else
+	if (end_time == ((time_t) -1))
+		timeout_ms = -1;
+	else
+	{
+		time_t		now = time(NULL);
+
+		timeout_ms = (end_time > now) ? (end_time - now) * 1000 : 0;
+	}
+#endif
+
+#ifdef HAVE_PPOLL
+	sigemptyset(&emptyset);
+	if (cancel_pressed)
+	{
+		sigprocmask(SIG_SETMASK, &origset, NULL);
+		return 1;
+	}
+
+	rc = ppoll(&input_fd, 1, ptr_timeout, &emptyset);
+	sigprocmask(SIG_SETMASK, &origset, NULL);
+	return rc;
+#else
+	return poll(&input_fd, 1, timeout_ms);
+#endif
+#else							/* !HAVE_POLL */
+
+	fd_set		input_mask;
+	fd_set		output_mask;
+	fd_set		except_mask;
+#ifdef HAVE_PSELECT
+	int			rc;
+	sigset_t	emptyset;
+	sigset_t	blockset;
+	sigset_t	origset;
+	struct timespec timeout;
+	struct timespec *ptr_timeout;
+#else
+	struct timeval timeout;
+	struct timeval *ptr_timeout;
+#endif
+
+	if (!forRead && !forWrite)
+		return 0;
+
+	FD_ZERO(&input_mask);
+	FD_ZERO(&output_mask);
+	FD_ZERO(&except_mask);
+	if (forRead)
+		FD_SET(sock, &input_mask);
+
+	if (forWrite)
+		FD_SET(sock, &output_mask);
+	FD_SET(sock, &except_mask);
+
+	/* Convert the absolute end_time into a relative wait interval */
+#ifdef HAVE_PSELECT
+	sigemptyset(&blockset);
+	sigaddset(&blockset, SIGINT);
+	sigprocmask(SIG_BLOCK, &blockset, &origset);
+
+	if (end_time == ((time_t) -1))
+		ptr_timeout = NULL;
+	else
+	{
+		time_t		now = time(NULL);
+
+		timeout.tv_sec = (end_time > now) ? end_time - now : 0;
+		timeout.tv_nsec = 0;
+		ptr_timeout = &timeout;
+	}
+#else
+	if (end_time == ((time_t) -1))
+		ptr_timeout = NULL;
+	else
+	{
+		time_t		now = time(NULL);
+
+		timeout.tv_sec = (end_time > now) ? end_time - now : 0;
+		timeout.tv_usec = 0;
+		ptr_timeout = &timeout;
+	}
+#endif
+
+#ifdef HAVE_PSELECT
+	sigemptyset(&emptyset);
+	if (cancel_pressed)
+	{
+		sigprocmask(SIG_SETMASK, &origset, NULL);
+		return 1;
+	}
+
+	rc = pselect(sock + 1, &input_mask, &output_mask,
+				 &except_mask, ptr_timeout, &emptyset);
+	sigprocmask(SIG_SETMASK, &origset, NULL);
+	return rc;
+#else
+	return select(sock + 1, &input_mask, &output_mask,
+				  &except_mask, ptr_timeout);
+#endif
+#endif							/* HAVE_POLL */
+}
+
 /*
  * Ask the user for a password; 'username' is the username the
  * password is for, if one has been explicitly specified.
@@ -3324,6 +3493,7 @@ do_connect(enum trivalue reuse_previous_specification,
 	bool		keep_password = true;
 	bool		has_connection_string;
 	bool		reuse_previous;
+	bool		for_read;
 
 	has_connection_string = dbname ?
 		recognized_connection_string(dbname) : false;
@@ -3614,11 +3784,61 @@ do_connect(enum trivalue reuse_previous_specification,
 		values[paramnum] = NULL;
 
 		/* Note we do not want libpq to re-expand the dbname parameter */
-		n_conn = PQconnectdbParams(keywords, values, false);
+		n_conn = PQconnectStartParams(keywords, values, false);
 
 		pg_free(keywords);
 		pg_free(values);
 
+		for_read = false;
+		while (true) {
+			int		rc;
+			int		sock;
+			time_t	timeout;
+
+			if (cancel_pressed)
+				break;
+
+			sock = PQsocket(n_conn);
+			if (sock == -1)
+				break;
+
+			/*
+			 * We use ppoll(2)/pselect(2) to account for the race condition in
+			 * which SIGINT is sent after checking cancel_pressed. But on
+			 * platforms that don't have either function, we can just spin the
+			 * CPU a bit polling, so set the timeout to 1 second if we don't
+			 * have the aforementioned functions, otherwise set timeout to
+			 * a negative value indicating we will sit and wait forever.
+			 */
+#if defined(HAVE_PPOLL) || defined(HAVE_PSELECT)
+			timeout = -1;
+#else
+			timeout = 1;
+#endif
+
+			rc = pqSocketPoll(sock, for_read, !for_read, timeout);
+			if (rc == -1)
+			{
+				success = false;
+				break;
+			}
+
+			switch (PQconnectPoll(n_conn)) {
+			case PGRES_POLLING_OK:
+			case PGRES_POLLING_FAILED:
+				goto finish;
+			case PGRES_POLLING_READING:
+				for_read = true;
+				continue;
+			case PGRES_POLLING_WRITING:
+				for_read = false;
+				continue;
+			case PGRES_POLLING_ACTIVE:
+				pg_unreachable();
+			}
+		}
+
+	finish:
 		if (PQstatus(n_conn) == CONNECTION_OK)
 			break;
 
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index d8a2985567..f9fd7d0de7 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -333,6 +333,9 @@
 /* Define to 1 if you have the `ppoll' function. */
 #undef HAVE_PPOLL
 
+/* Define to 1 if you have the `pselect' function. */
+#undef HAVE_PSELECT
+
 /* Define if you have POSIX threads libraries and header files. */
 #undef HAVE_PTHREAD
 
diff --git a/src/tools/msvc/Solution.pm b/src/tools/msvc/Solution.pm
index 98a5b5d872..d035f44f73 100644
--- a/src/tools/msvc/Solution.pm
+++ b/src/tools/msvc/Solution.pm
@@ -308,6 +308,7 @@ sub GenerateFiles
 		HAVE_POSIX_FADVISE => undef,
 		HAVE_POSIX_FALLOCATE => undef,
 		HAVE_PPOLL => undef,
+		HAVE_PSELECT => undef,
 		HAVE_PTHREAD => undef,
 		HAVE_PTHREAD_BARRIER_WAIT => undef,
 		HAVE_PTHREAD_IS_THREADED_NP => undef,
-- 
Tristan Partin
Neon (https://neon.tech)

Reply via email to