On Sun, 28 Jan 2024 at 10:51, Jelte Fennema-Nio <postg...@jeltef.nl> wrote:
> Both of those are fixed now.

Okay, there turned out to also be an issue on Windows with
setKeepalivesWin32 not being available in fe-cancel.c. That's fixed
now too (as well as some minor formatting issues).
From 4efbb0c75341f4612f0c5b8d5d3fe3f8f9c3b43c Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <jelte.fennema@microsoft.com>
Date: Fri, 26 Jan 2024 17:01:28 +0100
Subject: [PATCH v28 2/5] libpq: Add pq_release_conn_hosts function

In a follow up PR we'll need to free this connhost field in a function
defined in fe-cancel.c

So this extracts the logic to a dedicated extern function.
---
 src/interfaces/libpq/fe-connect.c | 38 ++++++++++++++++++++-----------
 src/interfaces/libpq/libpq-int.h  |  1 +
 2 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 5d08b4904d3..bc1f6521650 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -4395,19 +4395,7 @@ freePGconn(PGconn *conn)
 		free(conn->events[i].name);
 	}
 
-	/* clean up pg_conn_host structures */
-	for (int i = 0; i < conn->nconnhost; ++i)
-	{
-		free(conn->connhost[i].host);
-		free(conn->connhost[i].hostaddr);
-		free(conn->connhost[i].port);
-		if (conn->connhost[i].password != NULL)
-		{
-			explicit_bzero(conn->connhost[i].password, strlen(conn->connhost[i].password));
-			free(conn->connhost[i].password);
-		}
-	}
-	free(conn->connhost);
+	pq_release_conn_hosts(conn);
 
 	free(conn->client_encoding_initial);
 	free(conn->events);
@@ -4526,6 +4514,30 @@ release_conn_addrinfo(PGconn *conn)
 	}
 }
 
+/*
+ * pq_release_conn_hosts
+ *	 - Free the host list in the PGconn.
+ */
+void
+pq_release_conn_hosts(PGconn *conn)
+{
+	if (conn->connhost)
+	{
+		for (int i = 0; i < conn->nconnhost; ++i)
+		{
+			free(conn->connhost[i].host);
+			free(conn->connhost[i].hostaddr);
+			free(conn->connhost[i].port);
+			if (conn->connhost[i].password != NULL)
+			{
+				explicit_bzero(conn->connhost[i].password, strlen(conn->connhost[i].password));
+				free(conn->connhost[i].password);
+			}
+		}
+		free(conn->connhost);
+	}
+}
+
 /*
  * sendTerminateConn
  *	 - Send a terminate message to backend.
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 48c10b474f5..4cbad2c2c83 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -680,6 +680,7 @@ extern int	pqPacketSend(PGconn *conn, char pack_type,
 extern bool pqGetHomeDirectory(char *buf, int bufsize);
 extern bool pq_parse_int_param(const char *value, int *result, PGconn *conn,
 							   const char *context);
+extern void pq_release_conn_hosts(PGconn *conn);
 
 extern pgthreadlock_t pg_g_threadlock;
 
-- 
2.34.1

From f1168ac4c3dd758a77be3ceb8c40bacb9aebef8c Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <jelte.fennema@microsoft.com>
Date: Fri, 26 Jan 2024 16:47:51 +0100
Subject: [PATCH v28 3/5] libpq: Change some static functions to extern

This is in preparation of a follow up commit that starts using these
functions from fe-cancel.c.
---
 src/interfaces/libpq/fe-connect.c | 85 +++++++++++++++----------------
 src/interfaces/libpq/libpq-int.h  |  6 +++
 2 files changed, 46 insertions(+), 45 deletions(-)

diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index bc1f6521650..aeb3adc0e31 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -387,15 +387,10 @@ static const char uri_designator[] = "postgresql://";
 static const char short_uri_designator[] = "postgres://";
 
 static bool connectOptions1(PGconn *conn, const char *conninfo);
-static bool connectOptions2(PGconn *conn);
-static int	connectDBStart(PGconn *conn);
-static int	connectDBComplete(PGconn *conn);
 static PGPing internal_ping(PGconn *conn);
-static PGconn *makeEmptyPGconn(void);
 static void pqFreeCommandQueue(PGcmdQueueEntry *queue);
 static bool fillPGconn(PGconn *conn, PQconninfoOption *connOptions);
 static void freePGconn(PGconn *conn);
-static void closePGconn(PGconn *conn);
 static void release_conn_addrinfo(PGconn *conn);
 static int	store_conn_addrinfo(PGconn *conn, struct addrinfo *addrlist);
 static void sendTerminateConn(PGconn *conn);
@@ -644,7 +639,7 @@ pqDropServerData(PGconn *conn)
  * PQconnectStart or PQconnectStartParams (which differ in the same way as
  * PQconnectdb and PQconnectdbParams) and PQconnectPoll.
  *
- * Internally, the static functions connectDBStart, connectDBComplete
+ * Internally, the static functions pqConnectDBStart, pqConnectDBComplete
  * are part of the connection procedure.
  */
 
@@ -678,7 +673,7 @@ PQconnectdbParams(const char *const *keywords,
 	PGconn	   *conn = PQconnectStartParams(keywords, values, expand_dbname);
 
 	if (conn && conn->status != CONNECTION_BAD)
-		(void) connectDBComplete(conn);
+		(void) pqConnectDBComplete(conn);
 
 	return conn;
 }
@@ -731,7 +726,7 @@ PQconnectdb(const char *conninfo)
 	PGconn	   *conn = PQconnectStart(conninfo);
 
 	if (conn && conn->status != CONNECTION_BAD)
-		(void) connectDBComplete(conn);
+		(void) pqConnectDBComplete(conn);
 
 	return conn;
 }
@@ -785,7 +780,7 @@ PQconnectStartParams(const char *const *keywords,
 	 * to initialize conn->errorMessage to empty.  All subsequent steps during
 	 * connection initialization will only append to that buffer.
 	 */
-	conn = makeEmptyPGconn();
+	conn = pqMakeEmptyPGconn();
 	if (conn == NULL)
 		return NULL;
 
@@ -819,15 +814,15 @@ PQconnectStartParams(const char *const *keywords,
 	/*
 	 * Compute derived options
 	 */
-	if (!connectOptions2(conn))
+	if (!pqConnectOptions2(conn))
 		return conn;
 
 	/*
 	 * Connect to the database
 	 */
-	if (!connectDBStart(conn))
+	if (!pqConnectDBStart(conn))
 	{
-		/* Just in case we failed to set it in connectDBStart */
+		/* Just in case we failed to set it in pqConnectDBStart */
 		conn->status = CONNECTION_BAD;
 	}
 
@@ -863,7 +858,7 @@ PQconnectStart(const char *conninfo)
 	 * to initialize conn->errorMessage to empty.  All subsequent steps during
 	 * connection initialization will only append to that buffer.
 	 */
-	conn = makeEmptyPGconn();
+	conn = pqMakeEmptyPGconn();
 	if (conn == NULL)
 		return NULL;
 
@@ -876,15 +871,15 @@ PQconnectStart(const char *conninfo)
 	/*
 	 * Compute derived options
 	 */
-	if (!connectOptions2(conn))
+	if (!pqConnectOptions2(conn))
 		return conn;
 
 	/*
 	 * Connect to the database
 	 */
-	if (!connectDBStart(conn))
+	if (!pqConnectDBStart(conn))
 	{
-		/* Just in case we failed to set it in connectDBStart */
+		/* Just in case we failed to set it in pqConnectDBStart */
 		conn->status = CONNECTION_BAD;
 	}
 
@@ -895,7 +890,7 @@ PQconnectStart(const char *conninfo)
  * Move option values into conn structure
  *
  * Don't put anything cute here --- intelligence should be in
- * connectOptions2 ...
+ * pqConnectOptions2 ...
  *
  * Returns true on success. On failure, returns false and sets error message.
  */
@@ -933,7 +928,7 @@ fillPGconn(PGconn *conn, PQconninfoOption *connOptions)
  *
  * Internal subroutine to set up connection parameters given an already-
  * created PGconn and a conninfo string.  Derived settings should be
- * processed by calling connectOptions2 next.  (We split them because
+ * processed by calling pqConnectOptions2 next.  (We split them because
  * PQsetdbLogin overrides defaults in between.)
  *
  * Returns true if OK, false if trouble (in which case errorMessage is set
@@ -1055,15 +1050,15 @@ libpq_prng_init(PGconn *conn)
 }
 
 /*
- *		connectOptions2
+ *		pqConnectOptions2
  *
  * Compute derived connection options after absorbing all user-supplied info.
  *
  * Returns true if OK, false if trouble (in which case errorMessage is set
  * and so is conn->status).
  */
-static bool
-connectOptions2(PGconn *conn)
+bool
+pqConnectOptions2(PGconn *conn)
 {
 	int			i;
 
@@ -1822,7 +1817,7 @@ PQsetdbLogin(const char *pghost, const char *pgport, const char *pgoptions,
 	 * to initialize conn->errorMessage to empty.  All subsequent steps during
 	 * connection initialization will only append to that buffer.
 	 */
-	conn = makeEmptyPGconn();
+	conn = pqMakeEmptyPGconn();
 	if (conn == NULL)
 		return NULL;
 
@@ -1901,14 +1896,14 @@ PQsetdbLogin(const char *pghost, const char *pgport, const char *pgoptions,
 	/*
 	 * Compute derived options
 	 */
-	if (!connectOptions2(conn))
+	if (!pqConnectOptions2(conn))
 		return conn;
 
 	/*
 	 * Connect to the database
 	 */
-	if (connectDBStart(conn))
-		(void) connectDBComplete(conn);
+	if (pqConnectDBStart(conn))
+		(void) pqConnectDBComplete(conn);
 
 	return conn;
 
@@ -2323,14 +2318,14 @@ setTCPUserTimeout(PGconn *conn)
 }
 
 /* ----------
- * connectDBStart -
+ * pqConnectDBStart -
  *		Begin the process of making a connection to the backend.
  *
  * Returns 1 if successful, 0 if not.
  * ----------
  */
-static int
-connectDBStart(PGconn *conn)
+int
+pqConnectDBStart(PGconn *conn)
 {
 	if (!conn)
 		return 0;
@@ -2393,14 +2388,14 @@ connect_errReturn:
 
 
 /*
- *		connectDBComplete
+ *		pqConnectDBComplete
  *
  * Block and complete a connection.
  *
  * Returns 1 on success, 0 on failure.
  */
-static int
-connectDBComplete(PGconn *conn)
+int
+pqConnectDBComplete(PGconn *conn)
 {
 	PostgresPollingStatusType flag = PGRES_POLLING_WRITING;
 	time_t		finish_time = ((time_t) -1);
@@ -2750,7 +2745,7 @@ keep_going:						/* We will come back to here until there is
 			 * combining it with the insertion.
 			 *
 			 * We don't need to initialize conn->prng_state here, because that
-			 * already happened in connectOptions2.
+			 * already happened in pqConnectOptions2.
 			 */
 			for (int i = 1; i < conn->naddr; i++)
 			{
@@ -4227,7 +4222,7 @@ internal_ping(PGconn *conn)
 
 	/* Attempt to complete the connection */
 	if (conn->status != CONNECTION_BAD)
-		(void) connectDBComplete(conn);
+		(void) pqConnectDBComplete(conn);
 
 	/* Definitely OK if we succeeded */
 	if (conn->status != CONNECTION_BAD)
@@ -4279,11 +4274,11 @@ internal_ping(PGconn *conn)
 
 
 /*
- * makeEmptyPGconn
+ * pqMakeEmptyPGconn
  *	 - create a PGconn data structure with (as yet) no interesting data
  */
-static PGconn *
-makeEmptyPGconn(void)
+PGconn *
+pqMakeEmptyPGconn(void)
 {
 	PGconn	   *conn;
 
@@ -4376,7 +4371,7 @@ makeEmptyPGconn(void)
  * freePGconn
  *	 - free an idle (closed) PGconn data structure
  *
- * NOTE: this should not overlap any functionality with closePGconn().
+ * NOTE: this should not overlap any functionality with pqClosePGconn().
  * Clearing/resetting of transient state belongs there; what we do here is
  * release data that is to be held for the life of the PGconn structure.
  * If a value ought to be cleared/freed during PQreset(), do it there not here.
@@ -4562,15 +4557,15 @@ sendTerminateConn(PGconn *conn)
 }
 
 /*
- * closePGconn
+ * pqClosePGconn
  *	 - properly close a connection to the backend
  *
  * This should reset or release all transient state, but NOT the connection
  * parameters.  On exit, the PGconn should be in condition to start a fresh
  * connection with the same parameters (see PQreset()).
  */
-static void
-closePGconn(PGconn *conn)
+void
+pqClosePGconn(PGconn *conn)
 {
 	/*
 	 * If possible, send Terminate message to close the connection politely.
@@ -4613,7 +4608,7 @@ PQfinish(PGconn *conn)
 {
 	if (conn)
 	{
-		closePGconn(conn);
+		pqClosePGconn(conn);
 		freePGconn(conn);
 	}
 }
@@ -4627,9 +4622,9 @@ PQreset(PGconn *conn)
 {
 	if (conn)
 	{
-		closePGconn(conn);
+		pqClosePGconn(conn);
 
-		if (connectDBStart(conn) && connectDBComplete(conn))
+		if (pqConnectDBStart(conn) && pqConnectDBComplete(conn))
 		{
 			/*
 			 * Notify event procs of successful reset.
@@ -4660,9 +4655,9 @@ PQresetStart(PGconn *conn)
 {
 	if (conn)
 	{
-		closePGconn(conn);
+		pqClosePGconn(conn);
 
-		return connectDBStart(conn);
+		return pqConnectDBStart(conn);
 	}
 
 	return 0;
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4cbad2c2c83..c1ff12dd396 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -681,6 +681,12 @@ extern bool pqGetHomeDirectory(char *buf, int bufsize);
 extern bool pq_parse_int_param(const char *value, int *result, PGconn *conn,
 							   const char *context);
 extern void pq_release_conn_hosts(PGconn *conn);
+extern bool pqConnectOptions2(PGconn *conn);
+extern int	pqConnectDBStart(PGconn *conn);
+extern int	pqConnectDBComplete(PGconn *conn);
+extern PGconn *pqMakeEmptyPGconn(void);
+extern bool pqCopyPGconn(PGconn *srcConn, PGconn *dstConn);
+extern void pqClosePGconn(PGconn *conn);
 
 extern pgthreadlock_t pg_g_threadlock;
 
-- 
2.34.1

From d5cdd1451ecd9160d285bdfe3cdcf6df452c5249 Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <jelte.fennema@microsoft.com>
Date: Fri, 26 Jan 2024 14:35:48 +0100
Subject: [PATCH v28 1/5] libpq: Move cancellation related functions to
 fe-cancel.c

In follow up commits we'll add more functions related to cancellations
this groups those all together instead of grouping them with all the
other functions in fe-connect.c
---
 src/interfaces/libpq/Makefile     |   1 +
 src/interfaces/libpq/fe-cancel.c  | 387 ++++++++++++++++++++++++++++
 src/interfaces/libpq/fe-connect.c | 411 ++----------------------------
 src/interfaces/libpq/libpq-int.h  |   6 +
 src/interfaces/libpq/meson.build  |   1 +
 5 files changed, 416 insertions(+), 390 deletions(-)
 create mode 100644 src/interfaces/libpq/fe-cancel.c

diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index fce17bc72a0..bfcc7cdde99 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -30,6 +30,7 @@ endif
 OBJS = \
 	$(WIN32RES) \
 	fe-auth-scram.o \
+	fe-cancel.o \
 	fe-connect.o \
 	fe-exec.o \
 	fe-lobj.o \
diff --git a/src/interfaces/libpq/fe-cancel.c b/src/interfaces/libpq/fe-cancel.c
new file mode 100644
index 00000000000..ce28d39f3f5
--- /dev/null
+++ b/src/interfaces/libpq/fe-cancel.c
@@ -0,0 +1,387 @@
+/*-------------------------------------------------------------------------
+ *
+ * fe-cancel.c
+ *	  functions related to setting up a connection to the backend
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/interfaces/libpq/fe-cancel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#include "libpq-fe.h"
+#include "libpq-int.h"
+#include "port/pg_bswap.h"
+
+/*
+ * PQgetCancel: get a PGcancel structure corresponding to a connection.
+ *
+ * A copy is needed to be able to cancel a running query from a different
+ * thread. If the same structure is used all structure members would have
+ * to be individually locked (if the entire structure was locked, it would
+ * be impossible to cancel a synchronous query because the structure would
+ * have to stay locked for the duration of the query).
+ */
+PGcancel *
+PQgetCancel(PGconn *conn)
+{
+	PGcancel   *cancel;
+
+	if (!conn)
+		return NULL;
+
+	if (conn->sock == PGINVALID_SOCKET)
+		return NULL;
+
+	cancel = malloc(sizeof(PGcancel));
+	if (cancel == NULL)
+		return NULL;
+
+	memcpy(&cancel->raddr, &conn->raddr, sizeof(SockAddr));
+	cancel->be_pid = conn->be_pid;
+	cancel->be_key = conn->be_key;
+	/* We use -1 to indicate an unset connection option */
+	cancel->pgtcp_user_timeout = -1;
+	cancel->keepalives = -1;
+	cancel->keepalives_idle = -1;
+	cancel->keepalives_interval = -1;
+	cancel->keepalives_count = -1;
+	if (conn->pgtcp_user_timeout != NULL)
+	{
+		if (!pq_parse_int_param(conn->pgtcp_user_timeout,
+								&cancel->pgtcp_user_timeout,
+								conn, "tcp_user_timeout"))
+			goto fail;
+	}
+	if (conn->keepalives != NULL)
+	{
+		if (!pq_parse_int_param(conn->keepalives,
+								&cancel->keepalives,
+								conn, "keepalives"))
+			goto fail;
+	}
+	if (conn->keepalives_idle != NULL)
+	{
+		if (!pq_parse_int_param(conn->keepalives_idle,
+								&cancel->keepalives_idle,
+								conn, "keepalives_idle"))
+			goto fail;
+	}
+	if (conn->keepalives_interval != NULL)
+	{
+		if (!pq_parse_int_param(conn->keepalives_interval,
+								&cancel->keepalives_interval,
+								conn, "keepalives_interval"))
+			goto fail;
+	}
+	if (conn->keepalives_count != NULL)
+	{
+		if (!pq_parse_int_param(conn->keepalives_count,
+								&cancel->keepalives_count,
+								conn, "keepalives_count"))
+			goto fail;
+	}
+
+	return cancel;
+
+fail:
+	free(cancel);
+	return NULL;
+}
+
+/* PQfreeCancel: free a cancel structure */
+void
+PQfreeCancel(PGcancel *cancel)
+{
+	free(cancel);
+}
+
+
+/*
+ * Sets an integer socket option on a TCP socket, if the provided value is
+ * not negative.  Returns false if setsockopt fails for some reason.
+ *
+ * CAUTION: This needs to be signal safe, since it's used by PQcancel.
+ */
+#if defined(TCP_USER_TIMEOUT) || !defined(WIN32)
+static bool
+optional_setsockopt(int fd, int protoid, int optid, int value)
+{
+	if (value < 0)
+		return true;
+	if (setsockopt(fd, protoid, optid, (char *) &value, sizeof(value)) < 0)
+		return false;
+	return true;
+}
+#endif
+
+
+/*
+ * PQcancel: request query cancel
+ *
+ * The return value is true if the cancel request was successfully
+ * dispatched, false if not (in which case an error message is available).
+ * Note: successful dispatch is no guarantee that there will be any effect at
+ * the backend.  The application must read the operation result as usual.
+ *
+ * On failure, an error message is stored in *errbuf, which must be of size
+ * errbufsize (recommended size is 256 bytes).  *errbuf is not changed on
+ * success return.
+ *
+ * CAUTION: we want this routine to be safely callable from a signal handler
+ * (for example, an application might want to call it in a SIGINT handler).
+ * This means we cannot use any C library routine that might be non-reentrant.
+ * malloc/free are often non-reentrant, and anything that might call them is
+ * just as dangerous.  We avoid sprintf here for that reason.  Building up
+ * error messages with strcpy/strcat is tedious but should be quite safe.
+ * We also save/restore errno in case the signal handler support doesn't.
+ */
+int
+PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
+{
+	int			save_errno = SOCK_ERRNO;
+	pgsocket	tmpsock = PGINVALID_SOCKET;
+	int			maxlen;
+	struct
+	{
+		uint32		packetlen;
+		CancelRequestPacket cp;
+	}			crp;
+
+	if (!cancel)
+	{
+		strlcpy(errbuf, "PQcancel() -- no cancel object supplied", errbufsize);
+		/* strlcpy probably doesn't change errno, but be paranoid */
+		SOCK_ERRNO_SET(save_errno);
+		return false;
+	}
+
+	/*
+	 * We need to open a temporary connection to the postmaster. Do this with
+	 * only kernel calls.
+	 */
+	if ((tmpsock = socket(cancel->raddr.addr.ss_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
+	{
+		strlcpy(errbuf, "PQcancel() -- socket() failed: ", errbufsize);
+		goto cancel_errReturn;
+	}
+
+	/*
+	 * Since this connection will only be used to send a single packet of
+	 * data, we don't need NODELAY.  We also don't set the socket to
+	 * nonblocking mode, because the API definition of PQcancel requires the
+	 * cancel to be sent in a blocking way.
+	 *
+	 * We do set socket options related to keepalives and other TCP timeouts.
+	 * This ensures that this function does not block indefinitely when
+	 * reasonable keepalive and timeout settings have been provided.
+	 */
+	if (cancel->raddr.addr.ss_family != AF_UNIX &&
+		cancel->keepalives != 0)
+	{
+#ifndef WIN32
+		if (!optional_setsockopt(tmpsock, SOL_SOCKET, SO_KEEPALIVE, 1))
+		{
+			strlcpy(errbuf, "PQcancel() -- setsockopt(SO_KEEPALIVE) failed: ", errbufsize);
+			goto cancel_errReturn;
+		}
+
+#ifdef PG_TCP_KEEPALIVE_IDLE
+		if (!optional_setsockopt(tmpsock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
+								 cancel->keepalives_idle))
+		{
+			strlcpy(errbuf, "PQcancel() -- setsockopt(" PG_TCP_KEEPALIVE_IDLE_STR ") failed: ", errbufsize);
+			goto cancel_errReturn;
+		}
+#endif
+
+#ifdef TCP_KEEPINTVL
+		if (!optional_setsockopt(tmpsock, IPPROTO_TCP, TCP_KEEPINTVL,
+								 cancel->keepalives_interval))
+		{
+			strlcpy(errbuf, "PQcancel() -- setsockopt(TCP_KEEPINTVL) failed: ", errbufsize);
+			goto cancel_errReturn;
+		}
+#endif
+
+#ifdef TCP_KEEPCNT
+		if (!optional_setsockopt(tmpsock, IPPROTO_TCP, TCP_KEEPCNT,
+								 cancel->keepalives_count))
+		{
+			strlcpy(errbuf, "PQcancel() -- setsockopt(TCP_KEEPCNT) failed: ", errbufsize);
+			goto cancel_errReturn;
+		}
+#endif
+
+#else							/* WIN32 */
+
+#ifdef SIO_KEEPALIVE_VALS
+		if (!pqSetKeepalivesWin32(tmpsock,
+								  cancel->keepalives_idle,
+								  cancel->keepalives_interval))
+		{
+			strlcpy(errbuf, "PQcancel() -- WSAIoctl(SIO_KEEPALIVE_VALS) failed: ", errbufsize);
+			goto cancel_errReturn;
+		}
+#endif							/* SIO_KEEPALIVE_VALS */
+#endif							/* WIN32 */
+
+		/* TCP_USER_TIMEOUT works the same way on Unix and Windows */
+#ifdef TCP_USER_TIMEOUT
+		if (!optional_setsockopt(tmpsock, IPPROTO_TCP, TCP_USER_TIMEOUT,
+								 cancel->pgtcp_user_timeout))
+		{
+			strlcpy(errbuf, "PQcancel() -- setsockopt(TCP_USER_TIMEOUT) failed: ", errbufsize);
+			goto cancel_errReturn;
+		}
+#endif
+	}
+
+retry3:
+	if (connect(tmpsock, (struct sockaddr *) &cancel->raddr.addr,
+				cancel->raddr.salen) < 0)
+	{
+		if (SOCK_ERRNO == EINTR)
+			/* Interrupted system call - we'll just try again */
+			goto retry3;
+		strlcpy(errbuf, "PQcancel() -- connect() failed: ", errbufsize);
+		goto cancel_errReturn;
+	}
+
+	/* Create and send the cancel request packet. */
+
+	crp.packetlen = pg_hton32((uint32) sizeof(crp));
+	crp.cp.cancelRequestCode = (MsgType) pg_hton32(CANCEL_REQUEST_CODE);
+	crp.cp.backendPID = pg_hton32(cancel->be_pid);
+	crp.cp.cancelAuthCode = pg_hton32(cancel->be_key);
+
+retry4:
+	if (send(tmpsock, (char *) &crp, sizeof(crp), 0) != (int) sizeof(crp))
+	{
+		if (SOCK_ERRNO == EINTR)
+			/* Interrupted system call - we'll just try again */
+			goto retry4;
+		strlcpy(errbuf, "PQcancel() -- send() failed: ", errbufsize);
+		goto cancel_errReturn;
+	}
+
+	/*
+	 * Wait for the postmaster to close the connection, which indicates that
+	 * it's processed the request.  Without this delay, we might issue another
+	 * command only to find that our cancel zaps that command instead of the
+	 * one we thought we were canceling.  Note we don't actually expect this
+	 * read to obtain any data, we are just waiting for EOF to be signaled.
+	 */
+retry5:
+	if (recv(tmpsock, (char *) &crp, 1, 0) < 0)
+	{
+		if (SOCK_ERRNO == EINTR)
+			/* Interrupted system call - we'll just try again */
+			goto retry5;
+		/* we ignore other error conditions */
+	}
+
+	/* All done */
+	closesocket(tmpsock);
+	SOCK_ERRNO_SET(save_errno);
+	return true;
+
+cancel_errReturn:
+
+	/*
+	 * Make sure we don't overflow the error buffer. Leave space for the \n at
+	 * the end, and for the terminating zero.
+	 */
+	maxlen = errbufsize - strlen(errbuf) - 2;
+	if (maxlen >= 0)
+	{
+		/*
+		 * We can't invoke strerror here, since it's not signal-safe.  Settle
+		 * for printing the decimal value of errno.  Even that has to be done
+		 * the hard way.
+		 */
+		int			val = SOCK_ERRNO;
+		char		buf[32];
+		char	   *bufp;
+
+		bufp = buf + sizeof(buf) - 1;
+		*bufp = '\0';
+		do
+		{
+			*(--bufp) = (val % 10) + '0';
+			val /= 10;
+		} while (val > 0);
+		bufp -= 6;
+		memcpy(bufp, "error ", 6);
+		strncat(errbuf, bufp, maxlen);
+		strcat(errbuf, "\n");
+	}
+	if (tmpsock != PGINVALID_SOCKET)
+		closesocket(tmpsock);
+	SOCK_ERRNO_SET(save_errno);
+	return false;
+}
+
+/*
+ * PQrequestCancel: old, not thread-safe function for requesting query cancel
+ *
+ * Returns true if able to send the cancel request, false if not.
+ *
+ * On failure, the error message is saved in conn->errorMessage; this means
+ * that this can't be used when there might be other active operations on
+ * the connection object.
+ *
+ * NOTE: error messages will be cut off at the current size of the
+ * error message buffer, since we dare not try to expand conn->errorMessage!
+ */
+int
+PQrequestCancel(PGconn *conn)
+{
+	int			r;
+	PGcancel   *cancel;
+
+	/* Check we have an open connection */
+	if (!conn)
+		return false;
+
+	if (conn->sock == PGINVALID_SOCKET)
+	{
+		strlcpy(conn->errorMessage.data,
+				"PQrequestCancel() -- connection is not open\n",
+				conn->errorMessage.maxlen);
+		conn->errorMessage.len = strlen(conn->errorMessage.data);
+		conn->errorReported = 0;
+
+		return false;
+	}
+
+	cancel = PQgetCancel(conn);
+	if (cancel)
+	{
+		r = PQcancel(cancel, conn->errorMessage.data,
+					 conn->errorMessage.maxlen);
+		PQfreeCancel(cancel);
+	}
+	else
+	{
+		strlcpy(conn->errorMessage.data, "out of memory",
+				conn->errorMessage.maxlen);
+		r = false;
+	}
+
+	if (!r)
+	{
+		conn->errorMessage.len = strlen(conn->errorMessage.data);
+		conn->errorReported = 0;
+	}
+
+	return r;
+}
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 79e0b73d618..5d08b4904d3 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -443,8 +443,6 @@ static void pgpassfileWarning(PGconn *conn);
 static void default_threadlock(int acquire);
 static bool sslVerifyProtocolVersion(const char *version);
 static bool sslVerifyProtocolRange(const char *min, const char *max);
-static bool parse_int_param(const char *value, int *result, PGconn *conn,
-							const char *context);
 
 
 /* global variable because fe-auth.c needs to access it */
@@ -2081,9 +2079,9 @@ useKeepalives(PGconn *conn)
  * store it in *result, complaining if there is any trailing garbage or an
  * overflow.  This allows any number of leading and trailing whitespaces.
  */
-static bool
-parse_int_param(const char *value, int *result, PGconn *conn,
-				const char *context)
+bool
+pq_parse_int_param(const char *value, int *result, PGconn *conn,
+				   const char *context)
 {
 	char	   *end;
 	long		numval;
@@ -2134,8 +2132,8 @@ setKeepalivesIdle(PGconn *conn)
 	if (conn->keepalives_idle == NULL)
 		return 1;
 
-	if (!parse_int_param(conn->keepalives_idle, &idle, conn,
-						 "keepalives_idle"))
+	if (!pq_parse_int_param(conn->keepalives_idle, &idle, conn,
+							"keepalives_idle"))
 		return 0;
 	if (idle < 0)
 		idle = 0;
@@ -2168,8 +2166,8 @@ setKeepalivesInterval(PGconn *conn)
 	if (conn->keepalives_interval == NULL)
 		return 1;
 
-	if (!parse_int_param(conn->keepalives_interval, &interval, conn,
-						 "keepalives_interval"))
+	if (!pq_parse_int_param(conn->keepalives_interval, &interval, conn,
+							"keepalives_interval"))
 		return 0;
 	if (interval < 0)
 		interval = 0;
@@ -2203,8 +2201,8 @@ setKeepalivesCount(PGconn *conn)
 	if (conn->keepalives_count == NULL)
 		return 1;
 
-	if (!parse_int_param(conn->keepalives_count, &count, conn,
-						 "keepalives_count"))
+	if (!pq_parse_int_param(conn->keepalives_count, &count, conn,
+							"keepalives_count"))
 		return 0;
 	if (count < 0)
 		count = 0;
@@ -2233,8 +2231,8 @@ setKeepalivesCount(PGconn *conn)
  *
  * CAUTION: This needs to be signal safe, since it's used by PQcancel.
  */
-static int
-setKeepalivesWin32(pgsocket sock, int idle, int interval)
+int
+pqSetKeepalivesWin32(pgsocket sock, int idle, int interval)
 {
 	struct tcp_keepalive ka;
 	DWORD		retsize;
@@ -2269,15 +2267,15 @@ prepKeepalivesWin32(PGconn *conn)
 	int			interval = -1;
 
 	if (conn->keepalives_idle &&
-		!parse_int_param(conn->keepalives_idle, &idle, conn,
-						 "keepalives_idle"))
+		!pq_parse_int_param(conn->keepalives_idle, &idle, conn,
+							"keepalives_idle"))
 		return 0;
 	if (conn->keepalives_interval &&
-		!parse_int_param(conn->keepalives_interval, &interval, conn,
-						 "keepalives_interval"))
+		!pq_parse_int_param(conn->keepalives_interval, &interval, conn,
+							"keepalives_interval"))
 		return 0;
 
-	if (!setKeepalivesWin32(conn->sock, idle, interval))
+	if (!pqSetKeepalivesWin32(conn->sock, idle, interval))
 	{
 		libpq_append_conn_error(conn, "%s(%s) failed: error code %d",
 								"WSAIoctl", "SIO_KEEPALIVE_VALS",
@@ -2300,8 +2298,8 @@ setTCPUserTimeout(PGconn *conn)
 	if (conn->pgtcp_user_timeout == NULL)
 		return 1;
 
-	if (!parse_int_param(conn->pgtcp_user_timeout, &timeout, conn,
-						 "tcp_user_timeout"))
+	if (!pq_parse_int_param(conn->pgtcp_user_timeout, &timeout, conn,
+							"tcp_user_timeout"))
 		return 0;
 
 	if (timeout < 0)
@@ -2418,8 +2416,8 @@ connectDBComplete(PGconn *conn)
 	 */
 	if (conn->connect_timeout != NULL)
 	{
-		if (!parse_int_param(conn->connect_timeout, &timeout, conn,
-							 "connect_timeout"))
+		if (!pq_parse_int_param(conn->connect_timeout, &timeout, conn,
+								"connect_timeout"))
 		{
 			/* mark the connection as bad to report the parsing failure */
 			conn->status = CONNECTION_BAD;
@@ -2666,7 +2664,7 @@ keep_going:						/* We will come back to here until there is
 			thisport = DEF_PGPORT;
 		else
 		{
-			if (!parse_int_param(ch->port, &thisport, conn, "port"))
+			if (!pq_parse_int_param(ch->port, &thisport, conn, "port"))
 				goto error_return;
 
 			if (thisport < 1 || thisport > 65535)
@@ -4694,373 +4692,6 @@ PQresetPoll(PGconn *conn)
 	return PGRES_POLLING_FAILED;
 }
 
-/*
- * PQgetCancel: get a PGcancel structure corresponding to a connection.
- *
- * A copy is needed to be able to cancel a running query from a different
- * thread. If the same structure is used all structure members would have
- * to be individually locked (if the entire structure was locked, it would
- * be impossible to cancel a synchronous query because the structure would
- * have to stay locked for the duration of the query).
- */
-PGcancel *
-PQgetCancel(PGconn *conn)
-{
-	PGcancel   *cancel;
-
-	if (!conn)
-		return NULL;
-
-	if (conn->sock == PGINVALID_SOCKET)
-		return NULL;
-
-	cancel = malloc(sizeof(PGcancel));
-	if (cancel == NULL)
-		return NULL;
-
-	memcpy(&cancel->raddr, &conn->raddr, sizeof(SockAddr));
-	cancel->be_pid = conn->be_pid;
-	cancel->be_key = conn->be_key;
-	/* We use -1 to indicate an unset connection option */
-	cancel->pgtcp_user_timeout = -1;
-	cancel->keepalives = -1;
-	cancel->keepalives_idle = -1;
-	cancel->keepalives_interval = -1;
-	cancel->keepalives_count = -1;
-	if (conn->pgtcp_user_timeout != NULL)
-	{
-		if (!parse_int_param(conn->pgtcp_user_timeout,
-							 &cancel->pgtcp_user_timeout,
-							 conn, "tcp_user_timeout"))
-			goto fail;
-	}
-	if (conn->keepalives != NULL)
-	{
-		if (!parse_int_param(conn->keepalives,
-							 &cancel->keepalives,
-							 conn, "keepalives"))
-			goto fail;
-	}
-	if (conn->keepalives_idle != NULL)
-	{
-		if (!parse_int_param(conn->keepalives_idle,
-							 &cancel->keepalives_idle,
-							 conn, "keepalives_idle"))
-			goto fail;
-	}
-	if (conn->keepalives_interval != NULL)
-	{
-		if (!parse_int_param(conn->keepalives_interval,
-							 &cancel->keepalives_interval,
-							 conn, "keepalives_interval"))
-			goto fail;
-	}
-	if (conn->keepalives_count != NULL)
-	{
-		if (!parse_int_param(conn->keepalives_count,
-							 &cancel->keepalives_count,
-							 conn, "keepalives_count"))
-			goto fail;
-	}
-
-	return cancel;
-
-fail:
-	free(cancel);
-	return NULL;
-}
-
-/* PQfreeCancel: free a cancel structure */
-void
-PQfreeCancel(PGcancel *cancel)
-{
-	free(cancel);
-}
-
-
-/*
- * Sets an integer socket option on a TCP socket, if the provided value is
- * not negative.  Returns false if setsockopt fails for some reason.
- *
- * CAUTION: This needs to be signal safe, since it's used by PQcancel.
- */
-#if defined(TCP_USER_TIMEOUT) || !defined(WIN32)
-static bool
-optional_setsockopt(int fd, int protoid, int optid, int value)
-{
-	if (value < 0)
-		return true;
-	if (setsockopt(fd, protoid, optid, (char *) &value, sizeof(value)) < 0)
-		return false;
-	return true;
-}
-#endif
-
-
-/*
- * PQcancel: request query cancel
- *
- * The return value is true if the cancel request was successfully
- * dispatched, false if not (in which case an error message is available).
- * Note: successful dispatch is no guarantee that there will be any effect at
- * the backend.  The application must read the operation result as usual.
- *
- * On failure, an error message is stored in *errbuf, which must be of size
- * errbufsize (recommended size is 256 bytes).  *errbuf is not changed on
- * success return.
- *
- * CAUTION: we want this routine to be safely callable from a signal handler
- * (for example, an application might want to call it in a SIGINT handler).
- * This means we cannot use any C library routine that might be non-reentrant.
- * malloc/free are often non-reentrant, and anything that might call them is
- * just as dangerous.  We avoid sprintf here for that reason.  Building up
- * error messages with strcpy/strcat is tedious but should be quite safe.
- * We also save/restore errno in case the signal handler support doesn't.
- */
-int
-PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
-{
-	int			save_errno = SOCK_ERRNO;
-	pgsocket	tmpsock = PGINVALID_SOCKET;
-	int			maxlen;
-	struct
-	{
-		uint32		packetlen;
-		CancelRequestPacket cp;
-	}			crp;
-
-	if (!cancel)
-	{
-		strlcpy(errbuf, "PQcancel() -- no cancel object supplied", errbufsize);
-		/* strlcpy probably doesn't change errno, but be paranoid */
-		SOCK_ERRNO_SET(save_errno);
-		return false;
-	}
-
-	/*
-	 * We need to open a temporary connection to the postmaster. Do this with
-	 * only kernel calls.
-	 */
-	if ((tmpsock = socket(cancel->raddr.addr.ss_family, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
-	{
-		strlcpy(errbuf, "PQcancel() -- socket() failed: ", errbufsize);
-		goto cancel_errReturn;
-	}
-
-	/*
-	 * Since this connection will only be used to send a single packet of
-	 * data, we don't need NODELAY.  We also don't set the socket to
-	 * nonblocking mode, because the API definition of PQcancel requires the
-	 * cancel to be sent in a blocking way.
-	 *
-	 * We do set socket options related to keepalives and other TCP timeouts.
-	 * This ensures that this function does not block indefinitely when
-	 * reasonable keepalive and timeout settings have been provided.
-	 */
-	if (cancel->raddr.addr.ss_family != AF_UNIX &&
-		cancel->keepalives != 0)
-	{
-#ifndef WIN32
-		if (!optional_setsockopt(tmpsock, SOL_SOCKET, SO_KEEPALIVE, 1))
-		{
-			strlcpy(errbuf, "PQcancel() -- setsockopt(SO_KEEPALIVE) failed: ", errbufsize);
-			goto cancel_errReturn;
-		}
-
-#ifdef PG_TCP_KEEPALIVE_IDLE
-		if (!optional_setsockopt(tmpsock, IPPROTO_TCP, PG_TCP_KEEPALIVE_IDLE,
-								 cancel->keepalives_idle))
-		{
-			strlcpy(errbuf, "PQcancel() -- setsockopt(" PG_TCP_KEEPALIVE_IDLE_STR ") failed: ", errbufsize);
-			goto cancel_errReturn;
-		}
-#endif
-
-#ifdef TCP_KEEPINTVL
-		if (!optional_setsockopt(tmpsock, IPPROTO_TCP, TCP_KEEPINTVL,
-								 cancel->keepalives_interval))
-		{
-			strlcpy(errbuf, "PQcancel() -- setsockopt(TCP_KEEPINTVL) failed: ", errbufsize);
-			goto cancel_errReturn;
-		}
-#endif
-
-#ifdef TCP_KEEPCNT
-		if (!optional_setsockopt(tmpsock, IPPROTO_TCP, TCP_KEEPCNT,
-								 cancel->keepalives_count))
-		{
-			strlcpy(errbuf, "PQcancel() -- setsockopt(TCP_KEEPCNT) failed: ", errbufsize);
-			goto cancel_errReturn;
-		}
-#endif
-
-#else							/* WIN32 */
-
-#ifdef SIO_KEEPALIVE_VALS
-		if (!setKeepalivesWin32(tmpsock,
-								cancel->keepalives_idle,
-								cancel->keepalives_interval))
-		{
-			strlcpy(errbuf, "PQcancel() -- WSAIoctl(SIO_KEEPALIVE_VALS) failed: ", errbufsize);
-			goto cancel_errReturn;
-		}
-#endif							/* SIO_KEEPALIVE_VALS */
-#endif							/* WIN32 */
-
-		/* TCP_USER_TIMEOUT works the same way on Unix and Windows */
-#ifdef TCP_USER_TIMEOUT
-		if (!optional_setsockopt(tmpsock, IPPROTO_TCP, TCP_USER_TIMEOUT,
-								 cancel->pgtcp_user_timeout))
-		{
-			strlcpy(errbuf, "PQcancel() -- setsockopt(TCP_USER_TIMEOUT) failed: ", errbufsize);
-			goto cancel_errReturn;
-		}
-#endif
-	}
-
-retry3:
-	if (connect(tmpsock, (struct sockaddr *) &cancel->raddr.addr,
-				cancel->raddr.salen) < 0)
-	{
-		if (SOCK_ERRNO == EINTR)
-			/* Interrupted system call - we'll just try again */
-			goto retry3;
-		strlcpy(errbuf, "PQcancel() -- connect() failed: ", errbufsize);
-		goto cancel_errReturn;
-	}
-
-	/* Create and send the cancel request packet. */
-
-	crp.packetlen = pg_hton32((uint32) sizeof(crp));
-	crp.cp.cancelRequestCode = (MsgType) pg_hton32(CANCEL_REQUEST_CODE);
-	crp.cp.backendPID = pg_hton32(cancel->be_pid);
-	crp.cp.cancelAuthCode = pg_hton32(cancel->be_key);
-
-retry4:
-	if (send(tmpsock, (char *) &crp, sizeof(crp), 0) != (int) sizeof(crp))
-	{
-		if (SOCK_ERRNO == EINTR)
-			/* Interrupted system call - we'll just try again */
-			goto retry4;
-		strlcpy(errbuf, "PQcancel() -- send() failed: ", errbufsize);
-		goto cancel_errReturn;
-	}
-
-	/*
-	 * Wait for the postmaster to close the connection, which indicates that
-	 * it's processed the request.  Without this delay, we might issue another
-	 * command only to find that our cancel zaps that command instead of the
-	 * one we thought we were canceling.  Note we don't actually expect this
-	 * read to obtain any data, we are just waiting for EOF to be signaled.
-	 */
-retry5:
-	if (recv(tmpsock, (char *) &crp, 1, 0) < 0)
-	{
-		if (SOCK_ERRNO == EINTR)
-			/* Interrupted system call - we'll just try again */
-			goto retry5;
-		/* we ignore other error conditions */
-	}
-
-	/* All done */
-	closesocket(tmpsock);
-	SOCK_ERRNO_SET(save_errno);
-	return true;
-
-cancel_errReturn:
-
-	/*
-	 * Make sure we don't overflow the error buffer. Leave space for the \n at
-	 * the end, and for the terminating zero.
-	 */
-	maxlen = errbufsize - strlen(errbuf) - 2;
-	if (maxlen >= 0)
-	{
-		/*
-		 * We can't invoke strerror here, since it's not signal-safe.  Settle
-		 * for printing the decimal value of errno.  Even that has to be done
-		 * the hard way.
-		 */
-		int			val = SOCK_ERRNO;
-		char		buf[32];
-		char	   *bufp;
-
-		bufp = buf + sizeof(buf) - 1;
-		*bufp = '\0';
-		do
-		{
-			*(--bufp) = (val % 10) + '0';
-			val /= 10;
-		} while (val > 0);
-		bufp -= 6;
-		memcpy(bufp, "error ", 6);
-		strncat(errbuf, bufp, maxlen);
-		strcat(errbuf, "\n");
-	}
-	if (tmpsock != PGINVALID_SOCKET)
-		closesocket(tmpsock);
-	SOCK_ERRNO_SET(save_errno);
-	return false;
-}
-
-
-/*
- * PQrequestCancel: old, not thread-safe function for requesting query cancel
- *
- * Returns true if able to send the cancel request, false if not.
- *
- * On failure, the error message is saved in conn->errorMessage; this means
- * that this can't be used when there might be other active operations on
- * the connection object.
- *
- * NOTE: error messages will be cut off at the current size of the
- * error message buffer, since we dare not try to expand conn->errorMessage!
- */
-int
-PQrequestCancel(PGconn *conn)
-{
-	int			r;
-	PGcancel   *cancel;
-
-	/* Check we have an open connection */
-	if (!conn)
-		return false;
-
-	if (conn->sock == PGINVALID_SOCKET)
-	{
-		strlcpy(conn->errorMessage.data,
-				"PQrequestCancel() -- connection is not open\n",
-				conn->errorMessage.maxlen);
-		conn->errorMessage.len = strlen(conn->errorMessage.data);
-		conn->errorReported = 0;
-
-		return false;
-	}
-
-	cancel = PQgetCancel(conn);
-	if (cancel)
-	{
-		r = PQcancel(cancel, conn->errorMessage.data,
-					 conn->errorMessage.maxlen);
-		PQfreeCancel(cancel);
-	}
-	else
-	{
-		strlcpy(conn->errorMessage.data, "out of memory",
-				conn->errorMessage.maxlen);
-		r = false;
-	}
-
-	if (!r)
-	{
-		conn->errorMessage.len = strlen(conn->errorMessage.data);
-		conn->errorReported = 0;
-	}
-
-	return r;
-}
-
-
 /*
  * pqPacketSend() -- convenience routine to send a message to server.
  *
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index f0143726bbc..48c10b474f5 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -678,12 +678,18 @@ extern void pqDropConnection(PGconn *conn, bool flushInput);
 extern int	pqPacketSend(PGconn *conn, char pack_type,
 						 const void *buf, size_t buf_len);
 extern bool pqGetHomeDirectory(char *buf, int bufsize);
+extern bool pq_parse_int_param(const char *value, int *result, PGconn *conn,
+							   const char *context);
 
 extern pgthreadlock_t pg_g_threadlock;
 
 #define pglock_thread()		pg_g_threadlock(true)
 #define pgunlock_thread()	pg_g_threadlock(false)
 
+#if defined(WIN32) && defined(SIO_KEEPALIVE_VALS)
+extern int	pqSetKeepalivesWin32(pgsocket sock, int idle, int interval);
+#endif
+
 /* === in fe-exec.c === */
 
 extern void pqSetResultError(PGresult *res, PQExpBuffer errorMessage, int offset);
diff --git a/src/interfaces/libpq/meson.build b/src/interfaces/libpq/meson.build
index c76a1e40c83..a47b6f425dd 100644
--- a/src/interfaces/libpq/meson.build
+++ b/src/interfaces/libpq/meson.build
@@ -6,6 +6,7 @@
 libpq_sources = files(
   'fe-auth-scram.c',
   'fe-auth.c',
+  'fe-cancel.c',
   'fe-connect.c',
   'fe-exec.c',
   'fe-lobj.c',

base-commit: a3a836fb5e51183eae624d43225279306c2285b8
-- 
2.34.1

From cb5b87e6e0c9127352013453bc2e944696d0925a Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <jelte.fennema@microsoft.com>
Date: Thu, 14 Dec 2023 13:39:09 +0100
Subject: [PATCH v28 5/5] Start using new libpq cancel APIs

A previous commit introduced new APIs to libpq for cancelling queries.
This replaces the usage of the old APIs in the codebase with these newer
ones.
---
 contrib/dblink/dblink.c                       |  30 +++--
 contrib/postgres_fdw/connection.c             | 105 +++++++++++++++---
 .../postgres_fdw/expected/postgres_fdw.out    |  15 +++
 contrib/postgres_fdw/sql/postgres_fdw.sql     |   7 ++
 src/fe_utils/connect_utils.c                  |  11 +-
 src/test/isolation/isolationtester.c          |  29 ++---
 6 files changed, 145 insertions(+), 52 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 19a362526d2..81749b2cdd0 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -1346,22 +1346,32 @@ PG_FUNCTION_INFO_V1(dblink_cancel_query);
 Datum
 dblink_cancel_query(PG_FUNCTION_ARGS)
 {
-	int			res;
 	PGconn	   *conn;
-	PGcancel   *cancel;
-	char		errbuf[256];
+	PGcancelConn *cancelConn;
+	char	   *msg;
 
 	dblink_init();
 	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
-	cancel = PQgetCancel(conn);
+	cancelConn = PQcancelConn(conn);
 
-	res = PQcancel(cancel, errbuf, 256);
-	PQfreeCancel(cancel);
+	PG_TRY();
+	{
+		if (!PQcancelSend(cancelConn))
+		{
+			msg = pchomp(PQcancelErrorMessage(cancelConn));
+		}
+		else
+		{
+			msg = "OK";
+		}
+	}
+	PG_FINALLY();
+	{
+		PQcancelFinish(cancelConn);
+	}
+	PG_END_TRY();
 
-	if (res == 1)
-		PG_RETURN_TEXT_P(cstring_to_text("OK"));
-	else
-		PG_RETURN_TEXT_P(cstring_to_text(errbuf));
+	PG_RETURN_TEXT_P(cstring_to_text(msg));
 }
 
 
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4931ebf5915..3ac74ff6a7f 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
 static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
 static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
 static bool pgfdw_cancel_query(PGconn *conn);
-static bool pgfdw_cancel_query_begin(PGconn *conn);
+static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
 static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
 								   bool consume_input);
 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
@@ -1315,36 +1315,104 @@ pgfdw_cancel_query(PGconn *conn)
 	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
 										  CONNECTION_CLEANUP_TIMEOUT);
 
-	if (!pgfdw_cancel_query_begin(conn))
+	if (!pgfdw_cancel_query_begin(conn, endtime))
 		return false;
 	return pgfdw_cancel_query_end(conn, endtime, false);
 }
 
 static bool
-pgfdw_cancel_query_begin(PGconn *conn)
+pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
 {
-	PGcancel   *cancel;
-	char		errbuf[256];
+	bool		timed_out = false;
+	bool		failed = false;
+	PGcancelConn *cancel_conn = PQcancelConn(conn);
 
-	/*
-	 * Issue cancel request.  Unfortunately, there's no good way to limit the
-	 * amount of time that we might block inside PQgetCancel().
-	 */
-	if ((cancel = PQgetCancel(conn)))
+
+	if (PQcancelStatus(cancel_conn) == CONNECTION_BAD)
 	{
-		if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+		PG_TRY();
 		{
 			ereport(WARNING,
 					(errcode(ERRCODE_CONNECTION_FAILURE),
 					 errmsg("could not send cancel request: %s",
-							errbuf)));
-			PQfreeCancel(cancel);
-			return false;
+							pchomp(PQcancelErrorMessage(cancel_conn)))));
 		}
-		PQfreeCancel(cancel);
+		PG_FINALLY();
+		{
+			PQcancelFinish(cancel_conn);
+		}
+		PG_END_TRY();
+		return false;
 	}
 
-	return true;
+	/* In what follows, do not leak any PGcancelConn on an error. */
+	PG_TRY();
+	{
+		while (true)
+		{
+			TimestampTz now = GetCurrentTimestamp();
+			long		cur_timeout;
+			PostgresPollingStatusType pollres = PQcancelPoll(cancel_conn);
+			int			waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+			if (pollres == PGRES_POLLING_OK)
+			{
+				break;
+			}
+
+			/* If timeout has expired, give up, else get sleep time. */
+			cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
+			if (cur_timeout <= 0)
+			{
+				timed_out = true;
+				failed = true;
+				goto exit;
+			}
+
+			switch (pollres)
+			{
+				case PGRES_POLLING_READING:
+					waitEvents |= WL_SOCKET_READABLE;
+					break;
+				case PGRES_POLLING_WRITING:
+					waitEvents |= WL_SOCKET_WRITEABLE;
+					break;
+				default:
+					failed = true;
+					goto exit;
+			}
+
+			/* Sleep until there's something to do */
+			WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
+							  cur_timeout, PG_WAIT_EXTENSION);
+			ResetLatch(MyLatch);
+
+			CHECK_FOR_INTERRUPTS();
+		}
+exit:	;
+		if (failed)
+		{
+			if (timed_out)
+			{
+				ereport(WARNING,
+						(errmsg("could not cancel request due to timeout")));
+			}
+			else
+			{
+				ereport(WARNING,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("could not send cancel request: %s",
+								pchomp(PQcancelErrorMessage(cancel_conn)))));
+			}
+		}
+	}
+	PG_FINALLY();
+	{
+		PQcancelFinish(cancel_conn);
+	}
+	PG_END_TRY();
+
+	return !failed;
 }
 
 static bool
@@ -1685,7 +1753,10 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
 	 */
 	if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
 	{
-		if (!pgfdw_cancel_query_begin(entry->conn))
+		TimestampTz endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+														  CONNECTION_CLEANUP_TIMEOUT);
+
+		if (!pgfdw_cancel_query_begin(entry->conn, endtime))
 			return false;		/* Unable to cancel running query */
 		*cancel_requested = lappend(*cancel_requested, entry);
 	}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index b5a38aeb214..16206a23a9d 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -2698,6 +2698,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 (10 rows)
 
 ALTER VIEW v4 OWNER TO regress_view_owner;
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+                                                                             QUERY PLAN                                                                              
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Foreign Scan
+   Output: (count(*))
+   Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5))
+   Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE))
+(4 rows)
+
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+ERROR:  canceling statement due to statement timeout
+RESET statement_timeout;
 -- ====================================================================
 -- Check that userid to use when querying the remote table is correctly
 -- propagated into foreign rels present in subqueries under an UNION ALL
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index f410c3db4e6..01a98750611 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -717,6 +717,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
 ALTER VIEW v4 OWNER TO regress_view_owner;
 
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+RESET statement_timeout;
+
 -- ====================================================================
 -- Check that userid to use when querying the remote table is correctly
 -- propagated into foreign rels present in subqueries under an UNION ALL
diff --git a/src/fe_utils/connect_utils.c b/src/fe_utils/connect_utils.c
index 808d54461fd..c5cd2f57875 100644
--- a/src/fe_utils/connect_utils.c
+++ b/src/fe_utils/connect_utils.c
@@ -157,19 +157,14 @@ connectMaintenanceDatabase(ConnParams *cparams,
 void
 disconnectDatabase(PGconn *conn)
 {
-	char		errbuf[256];
-
 	Assert(conn != NULL);
 
 	if (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
 	{
-		PGcancel   *cancel;
+		PGcancelConn *cancelConn = PQcancelConn(conn);
 
-		if ((cancel = PQgetCancel(conn)))
-		{
-			(void) PQcancel(cancel, errbuf, sizeof(errbuf));
-			PQfreeCancel(cancel);
-		}
+		(void) PQcancelSend(cancelConn);
+		PQcancelFinish(cancelConn);
 	}
 
 	PQfinish(conn);
diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c
index 0a66235153a..de31a875716 100644
--- a/src/test/isolation/isolationtester.c
+++ b/src/test/isolation/isolationtester.c
@@ -946,26 +946,21 @@ try_complete_step(TestSpec *testspec, PermutationStep *pstep, int flags)
 			 */
 			if (td > max_step_wait && !canceled)
 			{
-				PGcancel   *cancel = PQgetCancel(conn);
+				PGcancelConn *cancel_conn = PQcancelConn(conn);
 
-				if (cancel != NULL)
+				if (PQcancelSend(cancel_conn))
 				{
-					char		buf[256];
-
-					if (PQcancel(cancel, buf, sizeof(buf)))
-					{
-						/*
-						 * print to stdout not stderr, as this should appear
-						 * in the test case's results
-						 */
-						printf("isolationtester: canceling step %s after %d seconds\n",
-							   step->name, (int) (td / USECS_PER_SEC));
-						canceled = true;
-					}
-					else
-						fprintf(stderr, "PQcancel failed: %s\n", buf);
-					PQfreeCancel(cancel);
+					/*
+					 * print to stdout not stderr, as this should appear in
+					 * the test case's results
+					 */
+					printf("isolationtester: canceling step %s after %d seconds\n",
+						   step->name, (int) (td / USECS_PER_SEC));
+					canceled = true;
 				}
+				else
+					fprintf(stderr, "PQcancel failed: %s\n", PQcancelErrorMessage(cancel_conn));
+				PQcancelFinish(cancel_conn);
 			}
 
 			/*
-- 
2.34.1

From 819ecc80382b93ffa0a9757119c396e4bf667908 Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <jelte.fennema@microsoft.com>
Date: Fri, 26 Jan 2024 17:01:00 +0100
Subject: [PATCH v28 4/5] Add non-blocking version of PQcancel

This patch makes the following changes in libpq:
1. Add a new PQcancelSend function, which sends cancellation requests
   using the regular connection establishment code. This makes sure
   that cancel requests support and use all connection options
   including encryption.
2. Add a new PQcancelConn function which allows sending cancellation in
   a non-blocking way by using it together with the newly added
   PQcancelPoll and PQcancelSocket.

The existing PQcancel API is using blocking IO. This makes PQcancel
impossible to use in an event loop based codebase, without blocking the
event loop until the call returns. PQcancelConn can now be used instead,
to have a non-blocking way of sending cancel requests.

This patch also includes a test for all of libpq cancellation APIs. The
test can be easily run like this:

    cd src/test/modules/libpq_pipeline
    make && ./libpq_pipeline cancel
---
 doc/src/sgml/libpq.sgml                       | 280 +++++++++++++++--
 src/interfaces/libpq/exports.txt              |   8 +
 src/interfaces/libpq/fe-cancel.c              | 284 ++++++++++++++++++
 src/interfaces/libpq/fe-connect.c             | 130 +++++++-
 src/interfaces/libpq/libpq-fe.h               |  27 +-
 src/interfaces/libpq/libpq-int.h              |  10 +
 .../modules/libpq_pipeline/libpq_pipeline.c   | 263 +++++++++++++++-
 7 files changed, 964 insertions(+), 38 deletions(-)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index d0d5aefadc0..9808e678650 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -265,7 +265,7 @@ PGconn *PQsetdb(char *pghost,
     <varlistentry id="libpq-PQconnectStartParams">
      <term><function>PQconnectStartParams</function><indexterm><primary>PQconnectStartParams</primary></indexterm></term>
      <term><function>PQconnectStart</function><indexterm><primary>PQconnectStart</primary></indexterm></term>
-     <term><function>PQconnectPoll</function><indexterm><primary>PQconnectPoll</primary></indexterm></term>
+     <term id="libpq-PQconnectPoll"><function>PQconnectPoll</function><indexterm><primary>PQconnectPoll</primary></indexterm></term>
      <listitem>
       <para>
        <indexterm><primary>nonblocking connection</primary></indexterm>
@@ -5281,7 +5281,7 @@ int PQisBusy(PGconn *conn);
    <xref linkend="libpq-PQsendQuery"/>/<xref linkend="libpq-PQgetResult"/>
    can also attempt to cancel a command that is still being processed
    by the server; see <xref linkend="libpq-cancel"/>.  But regardless of
-   the return value of <xref linkend="libpq-PQcancel"/>, the application
+   the return value of <xref linkend="libpq-PQcancelSend"/>, the application
    must continue with the normal result-reading sequence using
    <xref linkend="libpq-PQgetResult"/>.  A successful cancellation will
    simply cause the command to terminate sooner than it would have
@@ -6034,13 +6034,223 @@ int PQsetSingleRowMode(PGconn *conn);
    this section.
 
    <variablelist>
+    <varlistentry id="libpq-PQcancelConn">
+     <term><function>PQcancelConn</function><indexterm><primary>PQcancelConn</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Prepares a connection over which a cancel request can be sent.
+<synopsis>
+PGcancelConn *PQcancelConn(PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       <xref linkend="libpq-PQcancelConn"/> creates a
+       <structname>PGcancelConn</structname><indexterm><primary>PGcancelConn</primary></indexterm>
+       object, but it won't instantly start sending a cancel request over this
+       connection. A cancel request can be sent over this connection in a
+       blocking manner using <xref linkend="libpq-PQcancelSend"/> and in a
+       non-blocking manner using <xref linkend="libpq-PQcancelPoll"/>.
+       The return value should can be passed to <xref linkend="libpq-PQcancelStatus"/>,
+       to check if the <structname>PGcancelConn</structname> object was
+       created successfully. The <structname>PGcancelConn</structname> object
+       is an opaque structure that is not meant to be accessed directly by the
+       application. This <structname>PGcancelConn</structname> object can be
+       used to cancel the query that's running on the original connection in a
+       thread-safe way.
+      </para>
+
+      <para>
+       If the original connection is encrypted (using TLS or GSS), then the
+       connection for the cancel request is encrypted in the same way. Any
+       connection options that are only used during authentication or after
+       authentication of the client are ignored though, because cancellation
+       requests do not require authentication and the connection is closed right
+       after the cancellation request is submitted.
+      </para>
+
+      <para>
+       Note that when <function>PQcancelConn</function> returns a non-null
+       pointer, you must call <xref linkend="libpq-PQcancelFinish"/> when you
+       are finished with it, in order to dispose of the structure and any
+       associated memory blocks. This must be done even if the cancel request
+       failed or was abandoned.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelSend">
+     <term><function>PQcancelSend</function><indexterm><primary>PQcancelSend</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Requests that the server abandons processing of the current command in a blocking manner.
+<synopsis>
+int PQcancelSend(PGcancelConn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       The request is made over the given <structname>PGcancelConn</structname>,
+       which needs to be created with <xref linkend="libpq-PQcancelConn"/>
+       The return value of <xref linkend="libpq-PQcancelSend"/>
+       is 1 if the cancel request was successfully
+       dispatched and 0 if not. If it was unsuccessful, the error message can be
+       retrieved using <xref linkend="libpq-PQcancelErrorMessage"/>.
+      </para>
+
+      <para>
+       Successful dispatch of the cancellation is no guarantee that the request
+       will have any effect, however. If the cancellation is effective, the
+       command being canceled will terminate early and return an error result.
+       If the cancellation fails (say, because the server was already done
+       processing the command), then there will be no visible result at all.
+      </para>
+
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelStatus">
+     <term><function>PQcancelStatus</function><indexterm><primary>PQcancelStatus</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       A version of <xref linkend="libpq-PQstatus"/> that can be used for
+       cancellation connections.
+<synopsis>
+ConnStatusType PQcancelStatus(const PGcancelConn *conn);
+</synopsis>
+      </para>
+      <para>
+       In addition to all the statuses that a <structname>PGconn</structname>
+       can have, this connection can have one additional status:
+
+       <variablelist>
+        <varlistentry id="libpq-connection-starting">
+         <term><symbol>CONNECTION_STARTING</symbol></term>
+         <listitem>
+          <para>
+           Waiting for the first call to <xref linkend="libpq-PQcancelPoll"/>,
+           to actually open the socket. This is the connection state right after
+           calling <xref linkend="libpq-PQcancelConn"/>. No connection to the
+           server has been initiated yet at this point. To actually start
+           sending the cancel request use <xref linkend="libpq-PQcancelPoll"/>.
+          </para>
+         </listitem>
+        </varlistentry>
+       </variablelist>
+      </para>
+
+      <para>
+       One final note about the returned statuses is that
+       <symbol>CONNECTION_OK</symbol> has a slightly different meaning for a
+       <structname>PGcancelConn</structname> than what it has for a
+       <structname>PGconn</structname>. When <xref linkend="libpq-PQcancelStatus"/>
+       returns <symbol>CONNECTION_OK</symbol> for a <structname>PGcancelConn</structname>
+       it means that that the dispatch of the cancel request has completed (although
+       this is no promise that the query was actually canceled) and that the
+       connection is now closed. While a <symbol>CONNECTION_OK</symbol> result
+       for <structname>PGconn</structname> means that queries can be sent over
+       the connection.
+      </para>
+
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelSocket">
+     <term><function>PQcancelSocket</function><indexterm><primary>PQcancelSocket</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       A version of <xref linkend="libpq-PQsocket"/> that can be used for
+       cancellation connections.
+<synopsis>
+int PQcancelSocket(PGcancelConn *conn);
+</synopsis>
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelPoll">
+     <term><function>PQcancelPoll</function><indexterm><primary>PQcancelPoll</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       A version of <xref linkend="libpq-PQconnectPoll"/> that can be used for
+       cancellation connections.
+<synopsis>
+PostgresPollingStatusType PQcancelPoll(PGcancelConn *conn);
+</synopsis>
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelErrorMessage">
+     <term><function>PQcancelErrorMessage</function><indexterm><primary>PQcancelErrorMessage</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       A version of <xref linkend="libpq-PQerrorMessage"/> that can be used for
+       cancellation connections.
+<synopsis>
+char *PQcancelErrorMessage(const PGcancelConn *conn);
+</synopsis>
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelFinish">
+     <term><function>PQcancelFinish</function><indexterm><primary>PQcancelFinish</primary></indexterm></term>
+     <listitem>
+      <para>
+       Closes the cancel connection (if it did not finish sending the cancel
+       request yet). Also frees memory used by the <structname>PGcancelConn</structname>
+       object.
+<synopsis>
+void PQcancelFinish(PGcancelConn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       Note that even if the cancel attempt fails (as
+       indicated by <xref linkend="libpq-PQcancelStatus"/>), the application should call <xref linkend="libpq-PQcancelFinish"/>
+       to free the memory used by the <structname>PGcancelConn</structname> object.
+       The <structname>PGcancelConn</structname> pointer must not be used again after
+       <xref linkend="libpq-PQcancelFinish"/> has been called.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelReset">
+     <term><function>PQcancelReset</function><indexterm><primary>PQcancelReset</primary></indexterm></term>
+     <listitem>
+      <para>
+       Resets the <symbol>PGcancelConn</symbol> so it can be reused for a new
+       cancel connection.
+<synopsis>
+void PQcancelReset(PGcancelConn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       If the <symbol>PGcancelConn</symbol> is currently used to send a cancel
+       request, then this connection is closed. It will then prepare the
+       <symbol>PGcancelConn</symbol> object such that it can be used to send a
+       new cancel request. This can be used to create one <symbol>PGcancelConn</symbol>
+       for a <symbol>PGconn</symbol> and reuse that multiple times throughout
+       the lifetime of the original <symbol>PGconn</symbol>.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry id="libpq-PQgetCancel">
      <term><function>PQgetCancel</function><indexterm><primary>PQgetCancel</primary></indexterm></term>
 
      <listitem>
       <para>
        Creates a data structure containing the information needed to cancel
-       a command issued through a particular database connection.
+       a command using <xref linkend="libpq-PQcancel"/>.
 <synopsis>
 PGcancel *PQgetCancel(PGconn *conn);
 </synopsis>
@@ -6082,14 +6292,28 @@ void PQfreeCancel(PGcancel *cancel);
 
      <listitem>
       <para>
-       Requests that the server abandon processing of the current command.
+       An insecure version of <xref linkend="libpq-PQcancelSend"/>, but one
+       that can be used safely from within a signal handler.
 <synopsis>
 int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize);
 </synopsis>
       </para>
 
       <para>
-       The return value is 1 if the cancel request was successfully
+       <xref linkend="libpq-PQcancel"/> should only be used if it's necessary
+       to cancel a query from a signal-handler. If signal-safety is not needed,
+       <xref linkend="libpq-PQcancelSend"/> should be used to cancel the query
+       instead. <xref linkend="libpq-PQcancel"/> can be safely invoked from a
+       signal handler, if the <parameter>errbuf</parameter> is a local variable
+       in the signal handler.  The <structname>PGcancel</structname> object is
+       read-only as far as <xref linkend="libpq-PQcancel"/> is concerned, so it
+       can also be invoked from a thread that is separate from the one
+       manipulating the <structname>PGconn</structname> object.
+      </para>
+
+      <para>
+       The return value of <xref linkend="libpq-PQcancel"/>
+       is 1 if the cancel request was successfully
        dispatched and 0 if not.  If not, <parameter>errbuf</parameter> is filled
        with an explanatory error message.  <parameter>errbuf</parameter>
        must be a char array of size <parameter>errbufsize</parameter> (the
@@ -6097,21 +6321,22 @@ int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize);
       </para>
 
       <para>
-       Successful dispatch is no guarantee that the request will have
-       any effect, however.  If the cancellation is effective, the current
-       command will terminate early and return an error result.  If the
-       cancellation fails (say, because the server was already done
-       processing the command), then there will be no visible result at
-       all.
-      </para>
-
-      <para>
-       <xref linkend="libpq-PQcancel"/> can safely be invoked from a signal
-       handler, if the <parameter>errbuf</parameter> is a local variable in the
-       signal handler.  The <structname>PGcancel</structname> object is read-only
-       as far as <xref linkend="libpq-PQcancel"/> is concerned, so it can
-       also be invoked from a thread that is separate from the one
-       manipulating the <structname>PGconn</structname> object.
+       To achieve signal-safety, some concessions needed to be made in the
+       implementation of <xref linkend="libpq-PQcancel"/>. Not all connection
+       options of the original connection are used when establishing a
+       connection for the cancellation request. This function connects to
+       postgres on the same address and port as the original connection. The
+       only connection options that are honored during this connection are
+       <varname>keepalives</varname>,
+       <varname>keepalives_idle</varname>,
+       <varname>keepalives_interval</varname>,
+       <varname>keepalives_count</varname>, and
+       <varname>tcp_user_timeout</varname>.
+       So, for example
+       <varname>connect_timeout</varname>,
+       <varname>gssencmode</varname>, and
+       <varname>sslmode</varname> are ignored. <emphasis>This means the connection
+       for the cancel request is never encrypted using TLS or GSS</emphasis>.
       </para>
      </listitem>
     </varlistentry>
@@ -6123,13 +6348,22 @@ int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize);
 
      <listitem>
       <para>
-       <xref linkend="libpq-PQrequestCancel"/> is a deprecated variant of
-       <xref linkend="libpq-PQcancel"/>.
+       <xref linkend="libpq-PQrequestCancel"/> is a deprecated and insecure
+       variant of <xref linkend="libpq-PQcancelSend"/>.
 <synopsis>
 int PQrequestCancel(PGconn *conn);
 </synopsis>
       </para>
 
+      <para>
+       <xref linkend="libpq-PQrequestCancel"/> only exists because of backwards
+       compatibility reasons. <xref linkend="libpq-PQcancelSend"/> should be
+       used instead, to avoid the security and thread-safety issues that this
+       function has. This function has the same security issues as
+       <xref linkend="libpq-PQcancel"/>, but without the benefit of being
+       signal-safe.
+      </para>
+
       <para>
        Requests that the server abandon processing of the current
        command.  It operates directly on the
@@ -9356,7 +9590,7 @@ int PQisthreadsafe();
    The deprecated functions <xref linkend="libpq-PQrequestCancel"/> and
    <xref linkend="libpq-PQoidStatus"/> are not thread-safe and should not be
    used in multithread programs.  <xref linkend="libpq-PQrequestCancel"/>
-   can be replaced by <xref linkend="libpq-PQcancel"/>.
+   can be replaced by <xref linkend="libpq-PQcancelSend"/>.
    <xref linkend="libpq-PQoidStatus"/> can be replaced by
    <xref linkend="libpq-PQoidValue"/>.
   </para>
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 088592deb16..125bc80679a 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -193,3 +193,11 @@ PQsendClosePrepared       190
 PQsendClosePortal         191
 PQchangePassword          192
 PQsendPipelineSync        193
+PQcancelSend              194
+PQcancelConn              195
+PQcancelPoll              196
+PQcancelStatus            197
+PQcancelSocket            198
+PQcancelErrorMessage      199
+PQcancelReset             200
+PQcancelFinish            201
diff --git a/src/interfaces/libpq/fe-cancel.c b/src/interfaces/libpq/fe-cancel.c
index ce28d39f3f5..e37ee0c45ec 100644
--- a/src/interfaces/libpq/fe-cancel.c
+++ b/src/interfaces/libpq/fe-cancel.c
@@ -21,6 +21,290 @@
 #include "libpq-int.h"
 #include "port/pg_bswap.h"
 
+
+/*
+ *		PQcancelConn
+ *
+ * Asynchronously cancel a query on the given connection. This requires polling
+ * the returned PGcancelConn to actually complete the cancellation of the
+ * query.
+ */
+PGcancelConn *
+PQcancelConn(PGconn *conn)
+{
+	PGconn	   *cancelConn = pqMakeEmptyPGconn();
+	pg_conn_host originalHost;
+
+	if (cancelConn == NULL)
+		return NULL;
+
+	/* Check we have an open connection */
+	if (!conn)
+	{
+		libpq_append_conn_error(cancelConn, "passed connection was NULL");
+		return (PGcancelConn *) cancelConn;
+	}
+
+	if (conn->sock == PGINVALID_SOCKET)
+	{
+		libpq_append_conn_error(cancelConn, "passed connection is not open");
+		return (PGcancelConn *) cancelConn;
+	}
+
+
+	/*
+	 * Indicate that this connection is used to send a cancellation
+	 */
+	cancelConn->cancelRequest = true;
+
+	if (!pqCopyPGconn(conn, cancelConn))
+		return (PGcancelConn *) cancelConn;
+
+	/*
+	 * Compute derived options
+	 */
+	if (!pqConnectOptions2(cancelConn))
+		return (PGcancelConn *) cancelConn;
+
+	/*
+	 * Copy cancellation token data from the original connnection
+	 */
+	cancelConn->be_pid = conn->be_pid;
+	cancelConn->be_key = conn->be_key;
+
+	/*
+	 * Cancel requests should not iterate over all possible hosts. The request
+	 * needs to be sent to the exact host and address that the original
+	 * connection used. So we manually create the host and address arrays with
+	 * a single element after freeing the host array that we generated from
+	 * the connection options.
+	 */
+	pq_release_conn_hosts(cancelConn);
+	cancelConn->nconnhost = 1;
+	cancelConn->naddr = 1;
+
+	cancelConn->connhost = calloc(cancelConn->nconnhost, sizeof(pg_conn_host));
+	if (!cancelConn->connhost)
+		goto oom_error;
+
+	originalHost = conn->connhost[conn->whichhost];
+	if (originalHost.host)
+	{
+		cancelConn->connhost[0].host = strdup(originalHost.host);
+		if (!cancelConn->connhost[0].host)
+			goto oom_error;
+	}
+	if (originalHost.hostaddr)
+	{
+		cancelConn->connhost[0].hostaddr = strdup(originalHost.hostaddr);
+		if (!cancelConn->connhost[0].hostaddr)
+			goto oom_error;
+	}
+	if (originalHost.port)
+	{
+		cancelConn->connhost[0].port = strdup(originalHost.port);
+		if (!cancelConn->connhost[0].port)
+			goto oom_error;
+	}
+	if (originalHost.password)
+	{
+		cancelConn->connhost[0].password = strdup(originalHost.password);
+		if (!cancelConn->connhost[0].password)
+			goto oom_error;
+	}
+
+	cancelConn->addr = calloc(cancelConn->naddr, sizeof(AddrInfo));
+	if (!cancelConn->connhost)
+		goto oom_error;
+
+	cancelConn->addr[0].addr = conn->raddr;
+	cancelConn->addr[0].family = conn->raddr.addr.ss_family;
+
+	cancelConn->status = CONNECTION_STARTING;
+	return (PGcancelConn *) cancelConn;
+
+oom_error:
+	conn->status = CONNECTION_BAD;
+	libpq_append_conn_error(cancelConn, "out of memory");
+	return (PGcancelConn *) cancelConn;
+}
+
+
+/*
+ *		PQcancelSend
+ *
+ * Send a cancellation request in a blocking fashion.
+ * Returns 1 if successful 0 if not.
+ */
+int
+PQcancelSend(PGcancelConn * cancelConn)
+{
+	if (!cancelConn || cancelConn->conn.status == CONNECTION_BAD)
+		return 1;
+
+	if (!pqConnectDBStart(&cancelConn->conn))
+	{
+		cancelConn->conn.status = CONNECTION_BAD;
+		return 1;
+	}
+
+	return pqConnectDBComplete(&cancelConn->conn);
+}
+
+/*
+ *		PQcancelPoll
+ *
+ * Poll a cancel connection. For usage details see PQconnectPoll.
+ */
+PostgresPollingStatusType
+PQcancelPoll(PGcancelConn * cancelConn)
+{
+	PGconn	   *conn = (PGconn *) cancelConn;
+	int			n;
+
+	/*
+	 * Before we can call PQconnectPoll we first need to start the connection
+	 * using pqConnectDBStart. Non-cancel connections already do this whenever
+	 * the connection is initialized. But cancel connections wait until the
+	 * caller starts polling, because there might be a large delay between
+	 * creating a cancel connection and actually wanting to use it.
+	 */
+	if (conn->status == CONNECTION_STARTING)
+	{
+		if (!pqConnectDBStart(&cancelConn->conn))
+		{
+			cancelConn->conn.status = CONNECTION_STARTED;
+			return PGRES_POLLING_WRITING;
+		}
+	}
+
+	/*
+	 * The rest of the connection establishement we leave to PQconnectPoll,
+	 * since it's very similar to normal connection establishment. But once we
+	 * get to the CONNECTION_AWAITING_RESPONSE we need to do our own thing.
+	 */
+	if (conn->status != CONNECTION_AWAITING_RESPONSE)
+	{
+		return PQconnectPoll(conn);
+	}
+
+	/*
+	 * At this point we are waiting on the server to close the connection,
+	 * which is its way of communicating that the cancel has been handled.
+	 */
+
+	n = pqReadData(conn);
+
+	if (n == 0)
+		return PGRES_POLLING_READING;
+
+#ifndef WIN32
+
+	/*
+	 * If we receive an error report it, but only if errno is non-zero.
+	 * Otherwise we assume it's an EOF, which is what we expect from the
+	 * server.
+	 *
+	 * We skip this for Windows, because Windows is a bit special in its EOF
+	 * behaviour for TCP. Sometimes it will error with an ECONNRESET when
+	 * there is a clean connection closure. See these threads for details:
+	 * https://www.postgresql.org/message-id/flat/90b34057-4176-7bb0-0dbb-9822a5f6425b%40greiz-reinsdorf.de
+	 *
+	 * https://www.postgresql.org/message-id/flat/CA%2BhUKG%2BOeoETZQ%3DQw5Ub5h3tmwQhBmDA%3DnuNO3KG%3DzWfUypFAw%40mail.gmail.com
+	 *
+	 * PQcancel ignores such errors and reports success for the cancellation
+	 * anyway, so even if this is not always correct we do the same here.
+	 */
+	if (n < 0 && errno != 0)
+	{
+		conn->status = CONNECTION_BAD;
+		return PGRES_POLLING_FAILED;
+	}
+#endif
+
+	/*
+	 * We don't expect any data, only connection closure. So if we strangly do
+	 * receive some data we consider that an error.
+	 */
+	if (n > 0)
+	{
+
+		libpq_append_conn_error(conn, "received unexpected response from server");
+		conn->status = CONNECTION_BAD;
+		return PGRES_POLLING_FAILED;
+	}
+
+	/*
+	 * Getting here means that we received an EOF. Which is what we were
+	 * expecting. The cancel request has completed.
+	 */
+	cancelConn->conn.status = CONNECTION_OK;
+	resetPQExpBuffer(&conn->errorMessage);
+	return PGRES_POLLING_OK;
+}
+
+/*
+ *		PQcancelStatus
+ *
+ * Get the status of a cancel connection.
+ */
+ConnStatusType
+PQcancelStatus(const PGcancelConn * cancelConn)
+{
+	return PQstatus((const PGconn *) cancelConn);
+}
+
+/*
+ *		PQcancelSocket
+ *
+ * Get the socket of the cancel connection.
+ */
+int
+PQcancelSocket(const PGcancelConn * cancelConn)
+{
+	return PQsocket((const PGconn *) cancelConn);
+}
+
+/*
+ *		PQcancelErrorMessage
+ *
+ * Get the socket of the cancel connection.
+ */
+char *
+PQcancelErrorMessage(const PGcancelConn * cancelConn)
+{
+	return PQerrorMessage((const PGconn *) cancelConn);
+}
+
+/*
+ *		PQcancelReset
+ *
+ * Resets the cancel connection, so it can be reused to send a new cancel
+ * request.
+ */
+void
+PQcancelReset(PGcancelConn * cancelConn)
+{
+	pqClosePGconn((PGconn *) cancelConn);
+	cancelConn->conn.status = CONNECTION_STARTING;
+	cancelConn->conn.whichhost = 0;
+	cancelConn->conn.whichaddr = 0;
+	cancelConn->conn.try_next_host = false;
+	cancelConn->conn.try_next_addr = false;
+}
+
+/*
+ *		PQcancelFinish
+ *
+ * Closes and frees the cancel connection.
+ */
+void
+PQcancelFinish(PGcancelConn * cancelConn)
+{
+	PQfinish((PGconn *) cancelConn);
+}
+
+
 /*
  * PQgetCancel: get a PGcancel structure corresponding to a connection.
  *
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index aeb3adc0e31..2cd95767fdb 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -616,8 +616,17 @@ pqDropServerData(PGconn *conn)
 	conn->write_failed = false;
 	free(conn->write_err_msg);
 	conn->write_err_msg = NULL;
-	conn->be_pid = 0;
-	conn->be_key = 0;
+
+	/*
+	 * Cancel connections should save their be_pid and be_key across
+	 * PQcancelReset invocations. Otherwise they would not have access to the
+	 * secret token of the connection they are supposed to cancel anymore.
+	 */
+	if (!conn->cancelRequest)
+	{
+		conn->be_pid = 0;
+		conn->be_key = 0;
+	}
 }
 
 
@@ -923,6 +932,45 @@ fillPGconn(PGconn *conn, PQconninfoOption *connOptions)
 	return true;
 }
 
+/*
+ * Copy over option values from srcConn to dstConn
+ *
+ * Don't put anything cute here --- intelligence should be in
+ * connectOptions2 ...
+ *
+ * Returns true on success. On failure, returns false and sets error message of
+ * dstConn.
+ */
+bool
+pqCopyPGconn(PGconn *srcConn, PGconn *dstConn)
+{
+	const internalPQconninfoOption *option;
+
+	/* copy over connection options */
+	for (option = PQconninfoOptions; option->keyword; option++)
+	{
+		if (option->connofs >= 0)
+		{
+			const char **tmp = (const char **) ((char *) srcConn + option->connofs);
+
+			if (*tmp)
+			{
+				char	  **dstConnmember = (char **) ((char *) dstConn + option->connofs);
+
+				if (*dstConnmember)
+					free(*dstConnmember);
+				*dstConnmember = strdup(*tmp);
+				if (*dstConnmember == NULL)
+				{
+					libpq_append_conn_error(dstConn, "out of memory");
+					return false;
+				}
+			}
+		}
+	}
+	return true;
+}
+
 /*
  *		connectOptions1
  *
@@ -2354,10 +2402,18 @@ pqConnectDBStart(PGconn *conn)
 	 * Set up to try to connect to the first host.  (Setting whichhost = -1 is
 	 * a bit of a cheat, but PQconnectPoll will advance it to 0 before
 	 * anything else looks at it.)
+	 *
+	 * Cancel requests are special though, they should only try one host and
+	 * address. These fields have already set up in PQcancelConn. So leave
+	 * these fields alone for cancel requests.
 	 */
-	conn->whichhost = -1;
-	conn->try_next_addr = false;
-	conn->try_next_host = true;
+	if (!conn->cancelRequest)
+	{
+		conn->whichhost = -1;
+		conn->try_next_host = true;
+		conn->try_next_addr = false;
+	}
+
 	conn->status = CONNECTION_NEEDED;
 
 	/* Also reset the target_server_type state if needed */
@@ -2499,7 +2555,10 @@ pqConnectDBComplete(PGconn *conn)
 		/*
 		 * Now try to advance the state machine.
 		 */
-		flag = PQconnectPoll(conn);
+		if (conn->cancelRequest)
+			flag = PQcancelPoll((PGcancelConn *) conn);
+		else
+			flag = PQconnectPoll(conn);
 	}
 }
 
@@ -2624,13 +2683,17 @@ keep_going:						/* We will come back to here until there is
 			 * Oops, no more hosts.
 			 *
 			 * If we are trying to connect in "prefer-standby" mode, then drop
-			 * the standby requirement and start over.
+			 * the standby requirement and start over. Don't do this for
+			 * cancel requests though, since we are certain the list of
+			 * servers won't change as the target_server_type option is not
+			 * applicable to those connections.
 			 *
 			 * Otherwise, an appropriate error message is already set up, so
 			 * we just need to set the right status.
 			 */
 			if (conn->target_server_type == SERVER_TYPE_PREFER_STANDBY &&
-				conn->nconnhost > 0)
+				conn->nconnhost > 0 &&
+				!conn->cancelRequest)
 			{
 				conn->target_server_type = SERVER_TYPE_PREFER_STANDBY_PASS2;
 				conn->whichhost = 0;
@@ -3272,6 +3335,29 @@ keep_going:						/* We will come back to here until there is
 				}
 #endif							/* USE_SSL */
 
+				/*
+				 * For cancel requests this is as far as we need to go in the
+				 * connection establishment. Now we can actually send our
+				 * cancellation request.
+				 */
+				if (conn->cancelRequest)
+				{
+					CancelRequestPacket cancelpacket;
+
+					packetlen = sizeof(cancelpacket);
+					cancelpacket.cancelRequestCode = (MsgType) pg_hton32(CANCEL_REQUEST_CODE);
+					cancelpacket.backendPID = pg_hton32(conn->be_pid);
+					cancelpacket.cancelAuthCode = pg_hton32(conn->be_key);
+					if (pqPacketSend(conn, 0, &cancelpacket, packetlen) != STATUS_OK)
+					{
+						libpq_append_conn_error(conn, "could not send cancel packet: %s",
+												SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf)));
+						goto error_return;
+					}
+					conn->status = CONNECTION_AWAITING_RESPONSE;
+					return PGRES_POLLING_READING;
+				}
+
 				/*
 				 * Build the startup packet.
 				 */
@@ -4021,8 +4107,14 @@ keep_going:						/* We will come back to here until there is
 					}
 				}
 
-				/* We can release the address list now. */
-				release_conn_addrinfo(conn);
+				/*
+				 * For non cancel requests we can release the address list
+				 * now. For cancel requests we never actually resolve
+				 * addresses and instead the addrinfo exists for the lifetime
+				 * of the connection.
+				 */
+				if (!conn->cancelRequest)
+					release_conn_addrinfo(conn);
 
 				/*
 				 * Contents of conn->errorMessage are no longer interesting
@@ -4390,6 +4482,7 @@ freePGconn(PGconn *conn)
 		free(conn->events[i].name);
 	}
 
+	release_conn_addrinfo(conn);
 	pq_release_conn_hosts(conn);
 
 	free(conn->client_encoding_initial);
@@ -4540,6 +4633,15 @@ pq_release_conn_hosts(PGconn *conn)
 static void
 sendTerminateConn(PGconn *conn)
 {
+	/*
+	 * The Postgres cancellation protocol does not have a notion of a
+	 * Terminate message, so don't send one.
+	 */
+	if (conn->cancelRequest)
+	{
+		return;
+	}
+
 	/*
 	 * Note that the protocol doesn't allow us to send Terminate messages
 	 * during the startup phase.
@@ -4593,7 +4695,13 @@ pqClosePGconn(PGconn *conn)
 	conn->pipelineStatus = PQ_PIPELINE_OFF;
 	pqClearAsyncResult(conn);	/* deallocate result */
 	pqClearConnErrorState(conn);
-	release_conn_addrinfo(conn);
+
+	/*
+	 * Since cancel requests never change their addrinfo we don't free it
+	 * here. Otherwise we would have to rebuild it during a PQcancelReset.
+	 */
+	if (!conn->cancelRequest)
+		release_conn_addrinfo(conn);
 
 	/* Reset all state obtained from server, too */
 	pqDropServerData(conn);
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index defc415fa3f..857ba54d943 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -78,7 +78,9 @@ typedef enum
 	CONNECTION_CONSUME,			/* Consuming any extra messages. */
 	CONNECTION_GSS_STARTUP,		/* Negotiating GSSAPI. */
 	CONNECTION_CHECK_TARGET,	/* Checking target server properties. */
-	CONNECTION_CHECK_STANDBY	/* Checking if server is in standby mode. */
+	CONNECTION_CHECK_STANDBY,	/* Checking if server is in standby mode. */
+	CONNECTION_STARTING			/* Waiting for connection attempt to be
+								 * started.  */
 } ConnStatusType;
 
 typedef enum
@@ -165,6 +167,11 @@ typedef enum
  */
 typedef struct pg_conn PGconn;
 
+/* PGcancelConn encapsulates a cancel connection to the backend.
+ * The contents of this struct are not supposed to be known to applications.
+ */
+typedef struct pg_cancel_conn PGcancelConn;
+
 /* PGresult encapsulates the result of a query (or more precisely, of a single
  * SQL command --- a query string given to PQsendQuery can contain multiple
  * commands and thus return multiple PGresult objects).
@@ -321,16 +328,30 @@ extern PostgresPollingStatusType PQresetPoll(PGconn *conn);
 /* Synchronous (blocking) */
 extern void PQreset(PGconn *conn);
 
+/* Create a PGcancelConn that's used to cancel a query on the given PGconn */
+extern PGcancelConn * PQcancelConn(PGconn *conn);
+/* issue a blocking cancel request */
+extern int	PQcancelSend(PGcancelConn * conn);
+
+/* issue or poll a non-blocking cancel request */
+extern PostgresPollingStatusType PQcancelPoll(PGcancelConn * cancelConn);
+extern ConnStatusType PQcancelStatus(const PGcancelConn * cancelConn);
+extern int	PQcancelSocket(const PGcancelConn * cancelConn);
+extern char *PQcancelErrorMessage(const PGcancelConn * cancelConn);
+extern void PQcancelReset(PGcancelConn * cancelConn);
+extern void PQcancelFinish(PGcancelConn * cancelConn);
+
+
 /* request a cancel structure */
 extern PGcancel *PQgetCancel(PGconn *conn);
 
 /* free a cancel structure */
 extern void PQfreeCancel(PGcancel *cancel);
 
-/* issue a cancel request */
+/* a less secure version of PQcancelSend, but one which is signal-safe */
 extern int	PQcancel(PGcancel *cancel, char *errbuf, int errbufsize);
 
-/* backwards compatible version of PQcancel; not thread-safe */
+/* deprecated version of PQcancel; not thread-safe */
 extern int	PQrequestCancel(PGconn *conn);
 
 /* Accessor functions for PGconn objects */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index c1ff12dd396..e780b62b2bd 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -409,6 +409,10 @@ struct pg_conn
 	char	   *require_auth;	/* name of the expected auth method */
 	char	   *load_balance_hosts; /* load balance over hosts */
 
+	bool		cancelRequest;	/* true if this connection is used to send a
+								 * cancel request, instead of being a normal
+								 * connection that's used for queries */
+
 	/* Optional file to write trace info to */
 	FILE	   *Pfdebug;
 	int			traceFlags;
@@ -621,6 +625,11 @@ struct pg_conn
 	PQExpBufferData workBuffer; /* expansible string */
 };
 
+struct pg_cancel_conn
+{
+	PGconn		conn;
+};
+
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
  * data is required to safely cancel a connection running on a different
  * thread.
@@ -678,6 +687,7 @@ extern void pqDropConnection(PGconn *conn, bool flushInput);
 extern int	pqPacketSend(PGconn *conn, char pack_type,
 						 const void *buf, size_t buf_len);
 extern bool pqGetHomeDirectory(char *buf, int bufsize);
+extern bool pqCopyPGconn(PGconn *srcConn, PGconn *dstConn);
 extern bool pq_parse_int_param(const char *value, int *result, PGconn *conn,
 							   const char *context);
 extern void pq_release_conn_hosts(PGconn *conn);
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index 5f43aa40de4..580003002e4 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -86,6 +86,264 @@ pg_fatal_impl(int line, const char *fmt,...)
 	exit(1);
 }
 
+/*
+ * Check that the query on the given connection got canceled.
+ *
+ * This is a function wrapped in a macro to make the reported line number
+ * in an error match the line number of the invocation.
+ */
+#define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
+static void
+confirm_query_canceled_impl(int line, PGconn *conn)
+{
+	PGresult   *res = NULL;
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal_impl(line, "PQgetResult returned null: %s",
+					  PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal_impl(line, "query did not fail when it was expected");
+	if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
+		pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
+					  PQerrorMessage(conn));
+	PQclear(res);
+	while (PQisBusy(conn))
+	{
+		PQconsumeInput(conn);
+	}
+}
+
+#define send_cancellable_query(conn, monitorConn) send_cancellable_query_impl(__LINE__, conn, monitorConn)
+static void
+send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
+{
+	const char *env_wait;
+	const Oid	paramTypes[1] = {INT4OID};
+
+	env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
+	if (env_wait == NULL)
+		env_wait = "180";
+
+	if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes, &env_wait, NULL, NULL, 0) != 1)
+		pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
+
+	/*
+	 * Wait until the query is actually running. Otherwise sending a
+	 * cancellation request might not cancel the query due to race conditions.
+	 */
+	while (true)
+	{
+		char	   *value = NULL;
+		PGresult   *res = PQexec(
+								 monitorConn,
+								 "SELECT count(*) FROM pg_stat_activity WHERE "
+								 "query = 'SELECT pg_sleep($1)' "
+								 "AND state = 'active'");
+
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_fatal("Connection to database failed: %s", PQerrorMessage(monitorConn));
+		}
+		if (PQntuples(res) != 1)
+		{
+			pg_fatal("unexpected number of rows received: %d", PQntuples(res));
+		}
+		if (PQnfields(res) != 1)
+		{
+			pg_fatal("unexpected number of columns received: %d", PQnfields(res));
+		}
+		value = PQgetvalue(res, 0, 0);
+		if (*value != '0')
+		{
+			PQclear(res);
+			break;
+		}
+		PQclear(res);
+
+		/*
+		 * wait 10ms before polling again
+		 */
+		pg_usleep(10000);
+	}
+}
+
+static void
+test_cancel(PGconn *conn, const char *conninfo)
+{
+	PGcancel   *cancel = NULL;
+	PGcancelConn *cancelConn = NULL;
+	PGconn	   *monitorConn = NULL;
+	char		errorbuf[256];
+
+	fprintf(stderr, "test cancellations... ");
+
+	if (PQsetnonblocking(conn, 1) != 0)
+		pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
+
+	/*
+	 * Make a connection to the database to monitor the query on the main
+	 * connection.
+	 */
+	monitorConn = PQconnectdb(conninfo);
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		pg_fatal("Connection to database failed: %s",
+				 PQerrorMessage(conn));
+	}
+
+	/* test PQcancel */
+	send_cancellable_query(conn, monitorConn);
+	cancel = PQgetCancel(conn);
+	if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
+	{
+		pg_fatal("failed to run PQcancel: %s", errorbuf);
+	};
+	confirm_query_canceled(conn);
+
+	/* PGcancel object can be reused for the next query */
+	send_cancellable_query(conn, monitorConn);
+	if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
+	{
+		pg_fatal("failed to run PQcancel: %s", errorbuf);
+	};
+	confirm_query_canceled(conn);
+
+	PQfreeCancel(cancel);
+
+	/* test PQrequestCancel */
+	send_cancellable_query(conn, monitorConn);
+	if (!PQrequestCancel(conn))
+		pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
+	confirm_query_canceled(conn);
+
+	/* test PQcancelSend */
+	send_cancellable_query(conn, monitorConn);
+	cancelConn = PQcancelConn(conn);
+	if (!PQcancelSend(cancelConn))
+		pg_fatal("failed to run PQcancelSend: %s", PQcancelErrorMessage(cancelConn));
+	confirm_query_canceled(conn);
+	PQcancelFinish(cancelConn);
+
+	/* test PQcancelConn and then polling with PQcancelPoll */
+	send_cancellable_query(conn, monitorConn);
+	cancelConn = PQcancelConn(conn);
+	if (PQcancelStatus(cancelConn) == CONNECTION_BAD)
+		pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
+	while (true)
+	{
+		struct timeval tv;
+		fd_set		input_mask;
+		fd_set		output_mask;
+		PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
+		int			sock = PQcancelSocket(cancelConn);
+
+		if (pollres == PGRES_POLLING_OK)
+		{
+			break;
+		}
+
+		FD_ZERO(&input_mask);
+		FD_ZERO(&output_mask);
+		switch (pollres)
+		{
+			case PGRES_POLLING_READING:
+				pg_debug("polling for reads\n");
+				FD_SET(sock, &input_mask);
+				break;
+			case PGRES_POLLING_WRITING:
+				pg_debug("polling for writes\n");
+				FD_SET(sock, &output_mask);
+				break;
+			default:
+				pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
+		}
+
+		if (sock < 0)
+			pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
+
+		tv.tv_sec = 3;
+		tv.tv_usec = 0;
+
+		while (true)
+		{
+			if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
+			{
+				if (errno == EINTR)
+					continue;
+				pg_fatal("select() failed: %m");
+			}
+			break;
+		}
+	}
+	if (PQcancelStatus(cancelConn) != CONNECTION_OK)
+		pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
+	confirm_query_canceled(conn);
+
+	/*
+	 * test PQcancelReset works on the cancel connection and it can be reused
+	 * after
+	 */
+	PQcancelReset(cancelConn);
+
+	send_cancellable_query(conn, monitorConn);
+	if (PQcancelStatus(cancelConn) == CONNECTION_BAD)
+		pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
+	while (true)
+	{
+		struct timeval tv;
+		fd_set		input_mask;
+		fd_set		output_mask;
+		PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
+		int			sock = PQcancelSocket(cancelConn);
+
+		if (pollres == PGRES_POLLING_OK)
+		{
+			break;
+		}
+
+		FD_ZERO(&input_mask);
+		FD_ZERO(&output_mask);
+		switch (pollres)
+		{
+			case PGRES_POLLING_READING:
+				pg_debug("polling for reads\n");
+				FD_SET(sock, &input_mask);
+				break;
+			case PGRES_POLLING_WRITING:
+				pg_debug("polling for writes\n");
+				FD_SET(sock, &output_mask);
+				break;
+			default:
+				pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
+		}
+
+		if (sock < 0)
+			pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
+
+		tv.tv_sec = 3;
+		tv.tv_usec = 0;
+
+		while (true)
+		{
+			if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
+			{
+				if (errno == EINTR)
+					continue;
+				pg_fatal("select() failed: %m");
+			}
+			break;
+		}
+	}
+	if (PQcancelStatus(cancelConn) != CONNECTION_OK)
+		pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
+	confirm_query_canceled(conn);
+
+	PQcancelFinish(cancelConn);
+
+	fprintf(stderr, "ok\n");
+}
+
 static void
 test_disallowed_in_pipeline(PGconn *conn)
 {
@@ -1789,6 +2047,7 @@ usage(const char *progname)
 static void
 print_test_list(void)
 {
+	printf("cancel\n");
 	printf("disallowed_in_pipeline\n");
 	printf("multi_pipelines\n");
 	printf("nosync\n");
@@ -1890,7 +2149,9 @@ main(int argc, char **argv)
 						PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
 	}
 
-	if (strcmp(testname, "disallowed_in_pipeline") == 0)
+	if (strcmp(testname, "cancel") == 0)
+		test_cancel(conn, conninfo);
+	else if (strcmp(testname, "disallowed_in_pipeline") == 0)
 		test_disallowed_in_pipeline(conn);
 	else if (strcmp(testname, "multi_pipelines") == 0)
 		test_multi_pipelines(conn);
-- 
2.34.1

Reply via email to