On Thu Nov 23, 2023 at 3:19 AM CST, Heikki Linnakangas wrote:
On 22/11/2023 23:29, Tristan Partin wrote:
> Ha, you're right. I had this working yesterday, but convinced myself it
> didn't. I had a do while loop wrapping the blocking call. Here is a v4,
> which seems to pass the tests that were pointed out to be failing
> earlier.
Thanks! This suffers from a classic race condition:
> + if (cancel_pressed)
> + break;
> +
> + sock = PQsocket(n_conn);
> + if (sock == -1)
> + break;
> +
> + rc = pqSocketPoll(sock, for_read, !for_read, (time_t)-1);
> + Assert(rc != 0); /* Timeout is impossible. */
> + if (rc == -1)
> + {
> + success = false;
> + break;
> + }
If a signal arrives between the "if (cancel_pressed)" check and
pqSocketPoll(), pqSocketPoll() will "miss" the signal and block
indefinitely. There are three solutions to this:
1. Use a timeout, so that you wake up periodically to check for any
missed signals. Easy solution, the downsides are that you will not react
as quickly if the signal is missed, and you waste some cycles by waking
up unnecessarily.
2. The Self-pipe trick: https://cr.yp.to/docs/selfpipe.html. We also use
that in src/backend/storage/ipc/latch.c. It's portable, but somewhat
complicated.
3. Use pselect() or ppoll(). They were created specifically to address
this problem. Not fully portable, I think it's missing on Windows at least.
Maybe use pselect() if it's available, and select() with a timeout if
it's not.
This is the first I have ever heard of this race condition, and now I will
commit it to durable storage :). I went ahead and did option #3 that you
proposed.
On Windows, I have a 1 second timeout for the select/poll. That seemed
like a good balance of user experience and spinning the CPU. But I am
open to other values. I don't have a Windows VM, but maybe I should set
one up...
I am not completely in love with the code I have written. Lots of
conditional compilation which makes it hard to read. Looking forward to
another round of review to see what y'all think.
For what it's worth, I did try #2, but since psql installs its own
SIGINT handler, the code became really hairy.
--
Tristan Partin
Neon (https://neon.tech)
From 4fdccf9211ac78bbd7430488d515646f9e9cce9b Mon Sep 17 00:00:00 2001
From: Tristan Partin <tris...@neon.tech>
Date: Mon, 24 Jul 2023 11:12:59 -0500
Subject: [PATCH v5] Allow SIGINT to cancel psql database reconnections
After installing the SIGINT handler in psql, SIGINT can no longer cancel
database reconnections. For instance, if the user starts a reconnection
and then needs to do some form of interaction (i.e. psql is polling),
there is currently no way to cancel the reconnection process.
Use PQconnectStartParams() in order to insert a cancel_pressed check
into the polling loop.
---
meson.build | 1 +
src/bin/psql/command.c | 222 ++++++++++++++++++++++++++++++++++++-
src/include/pg_config.h.in | 3 +
src/tools/msvc/Solution.pm | 1 +
4 files changed, 226 insertions(+), 1 deletion(-)
diff --git a/meson.build b/meson.build
index ee58ee7a06..2d63485c53 100644
--- a/meson.build
+++ b/meson.build
@@ -2440,6 +2440,7 @@ func_checks = [
['posix_fadvise'],
['posix_fallocate'],
['ppoll'],
+ ['pselect'],
['pstat'],
['pthread_barrier_wait', {'dependencies': [thread_dep]}],
['pthread_is_threaded_np', {'dependencies': [thread_dep]}],
diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c
index 82cc091568..c325fedb51 100644
--- a/src/bin/psql/command.c
+++ b/src/bin/psql/command.c
@@ -11,6 +11,11 @@
#include <time.h>
#include <pwd.h>
#include <utime.h>
+#if HAVE_POLL
+#include <poll.h>
+#else
+#include <sys/select.h>
+#endif
#ifndef WIN32
#include <sys/stat.h> /* for stat() */
#include <sys/time.h> /* for setitimer() */
@@ -40,6 +45,7 @@
#include "large_obj.h"
#include "libpq-fe.h"
#include "libpq/pqcomm.h"
+#include "libpq/pqsignal.h"
#include "mainloop.h"
#include "portability/instr_time.h"
#include "pqexpbuffer.h"
@@ -3251,6 +3257,169 @@ copy_previous_query(PQExpBuffer query_buf, PQExpBuffer previous_buf)
return false;
}
+/*
+ * Check a file descriptor for read and/or write data, possibly waiting.
+ *
+ * sock      - socket descriptor to wait on
+ * forRead   - nonzero to wait for the socket to become readable
+ * forWrite  - nonzero to wait for the socket to become writable
+ * end_time  - absolute deadline, compared against time(NULL); (time_t) -1
+ *             means wait forever, and 0 (or any time not later than now)
+ *             means do not block.  To wait N seconds, pass time(NULL) + N.
+ *
+ * If neither forRead nor forWrite are set, immediately return a timeout
+ * condition (without waiting).  Return >0 if condition is met, 0
+ * if a timeout occurred, -1 if an error or interrupt occurred.
+ *
+ * Where available, ppoll(2) (when HAVE_POLL is defined) or pselect(2)
+ * (when it is not) is used: SIGINT is blocked, cancel_pressed is tested,
+ * and the wait call then runs with an empty signal mask.  That closes the
+ * window in which a SIGINT delivered between the caller's cancel_pressed
+ * test and the wait would otherwise be missed.  If cancel_pressed is
+ * already set at that point, 1 is returned without waiting, so the caller
+ * is expected to re-test cancel_pressed itself.
+ *
+ * Otherwise plain poll(2) or select(2) is used and the race remains.
+ * NOTE(review): with HAVE_POLL but without HAVE_PPOLL, poll() is used even
+ * if HAVE_PSELECT is defined, so an infinite wait there is still racy.
+ */
+#if (defined(HAVE_POLL) && defined(HAVE_PPOLL)) || \
+	(!defined(HAVE_POLL) && defined(HAVE_PSELECT))
+#define PQSOCKETPOLL_ATOMIC_WAIT 1
+#endif
+
+static int
+pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time)
+{
+	int		rc;
+	int		wait_forever = (end_time == ((time_t) -1));
+	time_t	secs_left = 0;
+#ifdef HAVE_POLL
+	struct pollfd input_fd;
+#else
+	fd_set	input_mask;
+	fd_set	output_mask;
+	fd_set	except_mask;
+#endif
+#ifdef PQSOCKETPOLL_ATOMIC_WAIT
+	sigset_t	emptyset;
+	sigset_t	blockset;
+	sigset_t	origset;
+	struct timespec timeout;
+#elif !defined(HAVE_POLL)
+	struct timeval timeout;
+#endif
+
+	if (!forRead && !forWrite)
+		return 0;
+
+	/*
+	 * Convert the absolute deadline into whole seconds remaining.  This is
+	 * done once so every wait primitive below sees the same interval; a
+	 * deadline that has already passed yields a zero (non-blocking) wait.
+	 */
+	if (!wait_forever)
+	{
+		time_t	now = time(NULL);
+
+		if (end_time > now)
+			secs_left = end_time - now;
+	}
+
+	/* Describe the socket and the events of interest */
+#ifdef HAVE_POLL
+	input_fd.fd = sock;
+	input_fd.events = POLLERR;
+	input_fd.revents = 0;
+
+	if (forRead)
+		input_fd.events |= POLLIN;
+	if (forWrite)
+		input_fd.events |= POLLOUT;
+#else
+	FD_ZERO(&input_mask);
+	FD_ZERO(&output_mask);
+	FD_ZERO(&except_mask);
+	if (forRead)
+		FD_SET(sock, &input_mask);
+
+	if (forWrite)
+		FD_SET(sock, &output_mask);
+	FD_SET(sock, &except_mask);
+#endif
+
+#ifdef PQSOCKETPOLL_ATOMIC_WAIT
+	timeout.tv_sec = secs_left;
+	timeout.tv_nsec = 0;
+
+	/* Hold SIGINT off so it cannot slip in between the test and the wait */
+	sigemptyset(&blockset);
+	sigaddset(&blockset, SIGINT);
+	sigprocmask(SIG_BLOCK, &blockset, &origset);
+
+	if (cancel_pressed)
+	{
+		sigprocmask(SIG_SETMASK, &origset, NULL);
+		return 1;
+	}
+
+	/* The wait call installs this mask atomically, unblocking SIGINT */
+	sigemptyset(&emptyset);
+#ifdef HAVE_POLL
+	rc = ppoll(&input_fd, 1, wait_forever ? NULL : &timeout, &emptyset);
+#else
+	rc = pselect(sock + 1, &input_mask, &output_mask, &except_mask,
+				 wait_forever ? NULL : &timeout, &emptyset);
+#endif
+	sigprocmask(SIG_SETMASK, &origset, NULL);
+#else							/* !PQSOCKETPOLL_ATOMIC_WAIT */
+#ifdef HAVE_POLL
+	/* poll() takes milliseconds; a negative value waits forever */
+	rc = poll(&input_fd, 1, wait_forever ? -1 : (int) (secs_left * 1000));
+#else
+	timeout.tv_sec = secs_left;
+	timeout.tv_usec = 0;
+	rc = select(sock + 1, &input_mask, &output_mask, &except_mask,
+				wait_forever ? NULL : &timeout);
+#endif
+#endif							/* PQSOCKETPOLL_ATOMIC_WAIT */
+
+	return rc;
+}
+
+#undef PQSOCKETPOLL_ATOMIC_WAIT
+
/*
* Ask the user for a password; 'username' is the username the
* password is for, if one has been explicitly specified.
@@ -3324,6 +3493,7 @@ do_connect(enum trivalue reuse_previous_specification,
bool keep_password = true;
bool has_connection_string;
bool reuse_previous;
+ bool for_read;
has_connection_string = dbname ?
recognized_connection_string(dbname) : false;
@@ -3614,11 +3784,61 @@ do_connect(enum trivalue reuse_previous_specification,
values[paramnum] = NULL;
/* Note we do not want libpq to re-expand the dbname parameter */
- n_conn = PQconnectdbParams(keywords, values, false);
+ n_conn = PQconnectStartParams(keywords, values, false);
pg_free(keywords);
pg_free(values);
+ for_read = false;
+ while (true) {
+ int rc;
+ int sock;
+ time_t timeout;
+
+ if (cancel_pressed)
+ break;
+
+ sock = PQsocket(n_conn);
+ if (sock == -1)
+ break;
+
+ /*
+ * We use ppoll(2)/pselect(2) to account for the race condition in
+ * which SIGINT is sent after checking cancel_pressed. But on
+ * platforms that don't have either function, we can just spin the
+ * CPU a bit polling, so set the timeout to 1 second if we don't
+ * have the aforementioned functions, otherwise set timeout to
+ * a negative value indicating we will sit and wait forever.
+ */
+#if defined(HAVE_PPOLL) || defined(HAVE_PSELECT)
+ timeout = -1;
+#else
+ timeout = 1;
+#endif
+
+ rc = pqSocketPoll(sock, for_read, !for_read, timeout);
+ if (rc == -1)
+ {
+ success = false;
+ break;
+ }
+
+ switch (PQconnectPoll(n_conn)) {
+ case PGRES_POLLING_OK:
+ case PGRES_POLLING_FAILED:
+ goto finish;
+ case PGRES_POLLING_READING:
+ for_read = true;
+ continue;
+ case PGRES_POLLING_WRITING:
+ for_read = false;
+ continue;
+ case PGRES_POLLING_ACTIVE:
+ pg_unreachable();
+ }
+ }
+
+ finish:
if (PQstatus(n_conn) == CONNECTION_OK)
break;
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index d8a2985567..f9fd7d0de7 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -333,6 +333,9 @@
/* Define to 1 if you have the `ppoll' function. */
#undef HAVE_PPOLL
+/* Define to 1 if you have the `pselect' function. */
+#undef HAVE_PSELECT
+
/* Define if you have POSIX threads libraries and header files. */
#undef HAVE_PTHREAD
diff --git a/src/tools/msvc/Solution.pm b/src/tools/msvc/Solution.pm
index 98a5b5d872..d035f44f73 100644
--- a/src/tools/msvc/Solution.pm
+++ b/src/tools/msvc/Solution.pm
@@ -308,6 +308,7 @@ sub GenerateFiles
HAVE_POSIX_FADVISE => undef,
HAVE_POSIX_FALLOCATE => undef,
HAVE_PPOLL => undef,
+ HAVE_PSELECT => undef,
HAVE_PTHREAD => undef,
HAVE_PTHREAD_BARRIER_WAIT => undef,
HAVE_PTHREAD_IS_THREADED_NP => undef,
--
Tristan Partin
Neon (https://neon.tech)