From d8d581a0033e0365faf96a39e8ce75a6ec9ebf7d Mon Sep 17 00:00:00 2001
From: Jelte Fennema <jelte.fennema@microsoft.com>
Date: Wed, 12 Jan 2022 09:52:05 +0100
Subject: [PATCH] Add non-blocking version of PQcancel

This patch makes the following changes in libpq:
1. Add a new PQcancelSend function, which sends cancellation requests
   using the regular connection establishment code. This makes sure
   that cancel requests support and use all connection options
   including encryption.
2. Add a new PQcancelConn function which allows sending cancellation in
   a non-blocking way by using it together with the newly added
   PQcancelPoll and PQcancelSocket.
3. Use these two new cancellation APIs everywhere in the codebase where
   signal-safety is not a necessity.

The existing PQcancel API is using blocking IO. This makes PQcancel
impossible to use in an event loop based codebase, without blocking the
event loop until the call returns. PQcancelConn can now be used instead,
to have a non-blocking way of sending cancel requests. The postgres_fdw
cancellation code has been modified to make use of this.

This patch also includes a test for all of libpq cancellation APIs. The
test can be easily run like this:

    cd src/test/modules/libpq_pipeline
    make && ./libpq_pipeline cancel
---
 contrib/dblink/dblink.c                       |  22 +-
 contrib/postgres_fdw/connection.c             |  93 ++++-
 .../postgres_fdw/expected/postgres_fdw.out    |  15 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |   8 +
 doc/src/sgml/libpq.sgml                       | 279 +++++++++++--
 src/fe_utils/connect_utils.c                  |  10 +-
 src/interfaces/libpq/exports.txt              |   8 +
 src/interfaces/libpq/fe-connect.c             | 375 ++++++++++++++++--
 src/interfaces/libpq/fe-misc.c                |  15 +-
 src/interfaces/libpq/fe-secure-openssl.c      |   2 +-
 src/interfaces/libpq/fe-secure.c              |   6 +
 src/interfaces/libpq/libpq-fe.h               |  25 +-
 src/interfaces/libpq/libpq-int.h              |   9 +
 src/test/isolation/isolationtester.c          |  29 +-
 .../modules/libpq_pipeline/libpq_pipeline.c   | 263 +++++++++++-
 15 files changed, 1050 insertions(+), 109 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 9eef417c47..2a55c6759a 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -1378,22 +1378,24 @@ PG_FUNCTION_INFO_V1(dblink_cancel_query);
 Datum
 dblink_cancel_query(PG_FUNCTION_ARGS)
 {
-	int			res;
 	PGconn	   *conn;
-	PGcancel   *cancel;
-	char		errbuf[256];
+	PGcancelConn	*cancelConn;
+	char	   *msg;
 
 	dblink_init();
 	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
-	cancel = PQgetCancel(conn);
-
-	res = PQcancel(cancel, errbuf, 256);
-	PQfreeCancel(cancel);
+	cancelConn = PQcancelSend(conn);
 
-	if (res == 1)
-		PG_RETURN_TEXT_P(cstring_to_text("OK"));
+	if (PQcancelStatus(cancelConn) == CONNECTION_BAD)
+	{
+		msg = pchomp(PQcancelErrorMessage(cancelConn));
+	}
 	else
-		PG_RETURN_TEXT_P(cstring_to_text(errbuf));
+	{
+		msg = "OK";
+	}
+	PQcancelFinish(cancelConn);
+	PG_RETURN_TEXT_P(cstring_to_text(msg));
 }
 
 
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 939d114f02..9622441da7 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -1264,35 +1264,98 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
 static bool
 pgfdw_cancel_query(PGconn *conn)
 {
-	PGcancel   *cancel;
-	char		errbuf[256];
 	PGresult   *result = NULL;
-	TimestampTz endtime;
-	bool		timed_out;
 
 	/*
 	 * If it takes too long to cancel the query and discard the result, assume
 	 * the connection is dead.
 	 */
-	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+	TimestampTz endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
+	bool		timed_out = false;
+	bool		failed = false;
+	PGcancelConn	*cancel_conn = PQcancelConn(conn);
 
-	/*
-	 * Issue cancel request.  Unfortunately, there's no good way to limit the
-	 * amount of time that we might block inside PQgetCancel().
-	 */
-	if ((cancel = PQgetCancel(conn)))
+
+	if (PQcancelStatus(cancel_conn) == CONNECTION_BAD)
+	{
+		ereport(WARNING,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not send cancel request: %s",
+						pchomp(PQcancelErrorMessage(cancel_conn)))));
+		return false;
+	}
+
+	/* In what follows, do not leak any PGcancelConn on an error. */
+	PG_TRY();
+	{
+		while (true)
+		{
+			TimestampTz now = GetCurrentTimestamp();
+			long		cur_timeout;
+			PostgresPollingStatusType pollres = PQcancelPoll(cancel_conn);
+			int			waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+			if (pollres == PGRES_POLLING_OK)
+			{
+				break;
+			}
+
+			/* If timeout has expired, give up, else get sleep time. */
+			cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
+			if (cur_timeout <= 0)
+			{
+				timed_out = true;
+				failed = true;
+				goto exit;
+			}
+
+			switch (pollres)
+			{
+				case PGRES_POLLING_READING:
+					waitEvents |= WL_SOCKET_READABLE;
+					break;
+				case PGRES_POLLING_WRITING:
+					waitEvents |= WL_SOCKET_WRITEABLE;
+					break;
+				default:
+					failed = true;
+					goto exit;
+			}
+
+			/* Sleep until there's something to do */
+			WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
+							  cur_timeout, PG_WAIT_EXTENSION);
+			ResetLatch(MyLatch);
+
+			CHECK_FOR_INTERRUPTS();
+		}
+exit:	;
+	}
+	PG_CATCH();
 	{
-		if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+		PQcancelFinish(cancel_conn);
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	if (failed)
+	{
+		if (timed_out)
+		{
+			ereport(WARNING,
+					(errmsg("could not cancel request due to timeout")));
+		}
+		else
 		{
 			ereport(WARNING,
 					(errcode(ERRCODE_CONNECTION_FAILURE),
 					 errmsg("could not send cancel request: %s",
-							errbuf)));
-			PQfreeCancel(cancel);
-			return false;
+							pchomp(PQcancelErrorMessage(cancel_conn)))));
 		}
-		PQfreeCancel(cancel);
+		PQcancelFinish(cancel_conn);
+		return failed;
 	}
+	PQcancelFinish(cancel_conn);
 
 	/* Get and discard the result of the query. */
 	if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index cc9e39c4a5..113f3204cc 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -2688,6 +2688,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 (10 rows)
 
 ALTER VIEW v4 OWNER TO regress_view_owner;
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+                                                                             QUERY PLAN                                                                              
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Foreign Scan
+   Output: (count(*))
+   Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5))
+   Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE))
+(4 rows)
+
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+ERROR:  canceling statement due to statement timeout
+RESET statement_timeout;
 -- cleanup
 DROP OWNED BY regress_view_owner;
 DROP ROLE regress_view_owner;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index e48ccd286b..bf977442d6 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -326,6 +326,7 @@ DELETE FROM loct_empty;
 ANALYZE ft_empty;
 EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft_empty ORDER BY c1;
 
+
 -- ===================================================================
 -- WHERE with remotely-executable conditions
 -- ===================================================================
@@ -713,6 +714,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
 ALTER VIEW v4 OWNER TO regress_view_owner;
 
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+RESET statement_timeout;
+
 -- cleanup
 DROP OWNED BY regress_view_owner;
 DROP ROLE regress_view_owner;
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 3c9bd3d673..90db021c1d 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -265,7 +265,7 @@ PGconn *PQsetdb(char *pghost,
     <varlistentry id="libpq-PQconnectStartParams">
      <term><function>PQconnectStartParams</function><indexterm><primary>PQconnectStartParams</primary></indexterm></term>
      <term><function>PQconnectStart</function><indexterm><primary>PQconnectStart</primary></indexterm></term>
-     <term><function>PQconnectPoll</function><indexterm><primary>PQconnectPoll</primary></indexterm></term>
+     <term id="libpq-PQconnectPoll"><function>PQconnectPoll</function><indexterm><primary>PQconnectPoll</primary></indexterm></term>
      <listitem>
       <para>
        <indexterm><primary>nonblocking connection</primary></indexterm>
@@ -4909,7 +4909,7 @@ int PQisBusy(PGconn *conn);
    <xref linkend="libpq-PQsendQuery"/>/<xref linkend="libpq-PQgetResult"/>
    can also attempt to cancel a command that is still being processed
    by the server; see <xref linkend="libpq-cancel"/>.  But regardless of
-   the return value of <xref linkend="libpq-PQcancel"/>, the application
+   the return value of <xref linkend="libpq-PQcancelSend"/>, the application
    must continue with the normal result-reading sequence using
    <xref linkend="libpq-PQgetResult"/>.  A successful cancellation will
    simply cause the command to terminate sooner than it would have
@@ -5627,13 +5627,220 @@ int PQsetSingleRowMode(PGconn *conn);
    this section.
 
    <variablelist>
+    <varlistentry id="libpq-PQcancelSend">
+     <term><function>PQcancelSend</function><indexterm><primary>PQcancelSend</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       Requests that the server abandons processing of the current command.
+<synopsis>
+PGcancelConn *PQcancelSend(PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       This request is made over a connection that uses the same connection
+       options as the the original <structname>PGconn</structname>. So when the
+       original connection is encrypted (using TLS or GSS), the connection for
+       the cancel request connection is encrypted in the same. Any connection
+       options that only make sense for authentication or after authentication
+       are ignored though, because cancellation requests do not require
+       authentication.
+      </para>
+
+      <para>
+       This function returns a <structname>PGcancelConn</structname>
+       object. By using
+       <xref linkend="libpq-PQcancelStatus"/>
+       it can be checked if there was any error when sending the cancellation
+       request. If <xref linkend="libpq-PQcancelStatus"/>
+       returns for <symbol>CONNECTION_OK</symbol> the request was
+       successfully sent, but if it returns <symbol>CONNECTION_BAD</symbol>
+       an error occured. If an error occured the error message can be retrieved using
+       <xref linkend="libpq-PQcancelErrorMessage"/>.
+      </para>
+
+      <para>
+       Successful dispatch of the cancellation is no guarantee that the request
+       will have any effect, however. If the cancellation is effective, the
+       command being cancelled will terminate early and return an error result.
+       If the cancellation fails (say, because the server was already done
+       processing the command), then there will be no visible result at all.
+      </para>
+
+      <para>
+       Note that when <function>PQcancelSend</function> returns a non-null
+       pointer, you must call <xref linkend="libpq-PQcancelFinish"/> when you 
+       are finished with it, in order to dispose of the structure and any
+       associated memory blocks. This must be done even if the cancel request
+       failed.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelConn">
+     <term><function>PQcancelConn</function><indexterm><primary>PQcancelConn</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       A version of <xref linkend="libpq-PQcancelSend"/> that can be used 
+       in a non-blocking manner.
+<synopsis>
+PGcancelConn *PQcancelConn(PGconn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       <xref linkend="libpq-PQcancelConn"/> creates a
+       <structname>PGcancelConn</structname><indexterm><primary>PGcancelConn</primary></indexterm>,
+       but it won't instantly start sending a cancel request over this
+       connection like <xref linkend="libpq-PQcancelSend"/>.
+       <xref linkend="libpq-PQcancelStatus"/> should be called on the return
+       value to check if the <structname> PGcancelConn </structname> was
+       created successfully.
+       The <structname>PGcancelConn</structname> object is an opaque structure
+       that is not meant to be accessed directly by the application.
+       This <structname>PGcancelConn</structname> object can be used to cancel
+       the query that's running on the original connection in a thread-safe and
+       non-blocking way.
+      </para>
+
+      <para>
+       Note that when <function>PQcancelConn</function> returns a non-null
+       pointer, you must call <xref linkend="libpq-PQcancelFinish"/> when you 
+       are finished with it, in order to dispose of the structure and any
+       associated memory blocks. This must be done even if the cancel request
+       failed or was abandoned.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelStatus">
+     <term><function>PQcancelStatus</function><indexterm><primary>PQcancelStatus</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       A version of <xref linkend="libpq-PQstatus"/> that can be used for
+       cancellation connections.
+<synopsis>
+ConnStatusType PQcancelStatus(const PGcancelConn *conn);
+</synopsis>
+      </para>
+      <para>
+       In addition to all the statuses that a <structname>PGconn</structname>
+       can have, this connection can have one additional status:
+
+       <variablelist>
+        <varlistentry id="libpq-connection-starting">
+         <term><symbol>CONNECTION_STARTING</symbol></term>
+         <listitem>
+          <para>
+           Waiting for the first call to <xref linkend="libpq-PQcancelPoll"/>,
+           to actually open the socket. This is the connection state right after
+           calling <xref linkend="libpq-PQcancelConn"/>. No connection to the
+           server has been initiated yet at this point. To actually start
+           sending the cancel request use <xref linkend="libpq-PQcancelPoll"/>.
+          </para>
+         </listitem>
+        </varlistentry>
+       </variablelist>
+      </para>
+
+      <para>
+       One final note about the returned statuses is that 
+       <symbol>CONNECTION_OK</symbol> has a slightly different meaning for a
+       <structname>PGcancelConn</structname> than what it has for a 
+       <structname>PGconn</structname>. When <xref linkend="libpq-PQcancelStatus"/>
+       returns <symbol>CONNECTION_OK</symbol> for a <structname>PGcancelConn</structname>
+       it means that that the dispatch of the cancel request has completed (although
+       this is no promise that the query was actually cancelled).
+       While a <symbol>CONNECTION_OK</symbol> result for
+       <structname>PGconn</structname> means thatqueries can be sent over the
+       connection.
+      </para>
+
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelPoll">
+     <term><function>PQcancelPoll</function><indexterm><primary>PQcancelPoll</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       A version of <xref linkend="libpq-PQconnectPoll"/> that can be used for
+       cancellation connections.
+<synopsis>
+PostgresPollingStatusType PQcancelPoll(PGcancelConn *conn);
+</synopsis>
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelErrorMessage">
+     <term><function>PQcancelErrorMessage</function><indexterm><primary>PQcancelErrorMessage</primary></indexterm></term>
+
+     <listitem>
+      <para>
+       A version of <xref linkend="libpq-PQerrorMessage"/> that can be used for
+       cancellation connections.
+<synopsis>
+char *PQcancelErrorMessage(const PGcancelConn *conn);
+</synopsis>
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelFinish">
+     <term><function>PQcancelFinish</function><indexterm><primary>PQcancelFinish</primary></indexterm></term>
+     <listitem>
+      <para>
+       Closes the cancel connection (if it did not finish sending the cancel 
+       request yet). Also frees memory used by the <structname>PGcancelConn</structname> 
+       object.
+<synopsis>
+void PQcancelFinish(PGcancelConn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       Note that even if the cancel attempt fails (as
+       indicated by <xref linkend="libpq-PQcancelStatus"/>), the application should call <xref linkend="libpq-PQcancelFinish"/>
+       to free the memory used by the <structname>PGcancelConn</structname> object.
+       The <structname>PGcancelConn</structname> pointer must not be used again after
+       <xref linkend="libpq-PQcancelFinish"/> has been called.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQcancelReset">
+     <term><function>PQcancelReset</function><indexterm><primary>PQcancelReset</primary></indexterm></term>
+     <listitem>
+      <para>
+       Resets the <symbol>PGcancelConn</symbol> so it can be reused for a new 
+       cancel connection.
+<synopsis>
+void PQcancelReset(PGcancelConn *conn);
+</synopsis>
+      </para>
+
+      <para>
+       If the <symbol>PGcancelConn</symbol> is currently used to send a cancel
+       request, then this connection is closed. It will then prepare the
+       <symbol>PGcancelConn</symbol> object such that it can be used to send a
+       new cancel request. This can be used to create one <symbol>PGcancelConn</symbol>
+       for a <symbol>PGconn</symbol> and reuse that multiple times throughout
+       the lifetime of the original <symbol>PGconn</symbol>.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry id="libpq-PQgetCancel">
      <term><function>PQgetCancel</function><indexterm><primary>PQgetCancel</primary></indexterm></term>
 
      <listitem>
       <para>
        Creates a data structure containing the information needed to cancel
-       a command issued through a particular database connection.
+       a command using <xref linkend="libpq-PQcancel"/>.
 <synopsis>
 PGcancel *PQgetCancel(PGconn *conn);
 </synopsis>
@@ -5675,14 +5882,30 @@ void PQfreeCancel(PGcancel *cancel);
 
      <listitem>
       <para>
-       Requests that the server abandon processing of the current command.
+       An insecure version of
+       <xref linkend="libpq-PQcancelSend"/>, but one that can be used safely
+       from within a signal handler.
 <synopsis>
 int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize);
 </synopsis>
       </para>
 
       <para>
-       The return value is 1 if the cancel request was successfully
+       <xref linkend="libpq-PQcancel"/> should only be used if it's necessary
+       to cancel a query from a signal-handler. If signal-safety is not needed,
+       <xref linkend="libpq-PQcancelSend"/> should be used to cancel the query 
+       instead.
+       <xref linkend="libpq-PQcancel"/> can be safely invoked from a signal
+       handler, if the <parameter>errbuf</parameter> is a local variable in the
+       signal handler.  The <structname>PGcancel</structname> object is read-only
+       as far as <xref linkend="libpq-PQcancel"/> is concerned, so it can
+       also be invoked from a thread that is separate from the one
+       manipulating the <structname>PGconn</structname> object.
+      </para>
+
+      <para>
+       The return value of <xref linkend="libpq-PQcancel"/>
+       is 1 if the cancel request was successfully
        dispatched and 0 if not.  If not, <parameter>errbuf</parameter> is filled
        with an explanatory error message.  <parameter>errbuf</parameter>
        must be a char array of size <parameter>errbufsize</parameter> (the
@@ -5690,21 +5913,22 @@ int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize);
       </para>
 
       <para>
-       Successful dispatch is no guarantee that the request will have
-       any effect, however.  If the cancellation is effective, the current
-       command will terminate early and return an error result.  If the
-       cancellation fails (say, because the server was already done
-       processing the command), then there will be no visible result at
-       all.
-      </para>
-
-      <para>
-       <xref linkend="libpq-PQcancel"/> can safely be invoked from a signal
-       handler, if the <parameter>errbuf</parameter> is a local variable in the
-       signal handler.  The <structname>PGcancel</structname> object is read-only
-       as far as <xref linkend="libpq-PQcancel"/> is concerned, so it can
-       also be invoked from a thread that is separate from the one
-       manipulating the <structname>PGconn</structname> object.
+       To achieve signal-safety, some concessions needed to be made in the
+       implementation of <xref linkend="libpq-PQcancel"/>. Not all connection
+       options of the original connection are used when establishing a
+       connection for the cancellation request. When calling this function a
+       connection is made to the postgres host using the same port. The only
+       connection options that are honored during this connection are
+       <varname>keepalives</varname>,
+       <varname>keepalives_idle</varname>,
+       <varname>keepalives_interval</varname>,
+       <varname>keepalives_count</varname>, and
+       <varname>tcp_user_timeout</varname>.
+       So, for example
+       <varname>connect_timeout</varname>,
+       <varname>gssencmode</varname>, and
+       <varname>sslmode</varname> are ignored. <emphasis>This means the connection
+       is never encrypted using TLS or GSS</emphasis>.
       </para>
      </listitem>
     </varlistentry>
@@ -5716,13 +5940,22 @@ int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize);
 
      <listitem>
       <para>
-       <xref linkend="libpq-PQrequestCancel"/> is a deprecated variant of
-       <xref linkend="libpq-PQcancel"/>.
+       <xref linkend="libpq-PQrequestCancel"/> is a deprecated and insecure
+       variant of <xref linkend="libpq-PQcancelSend"/>.
 <synopsis>
 int PQrequestCancel(PGconn *conn);
 </synopsis>
       </para>
 
+      <para>
+       <xref linkend="libpq-PQrequestCancel"/> only exists because of backwards
+       compatibility reasons. <xref linkend="libpq-PQcancelSend"/> should be
+       used instead, to avoid the security and thread-safety issues that this
+       function has. This function has the same security issues as
+       <xref linkend="libpq-PQcancel"/>, but without the benefit of being
+       signal-safe.
+      </para>
+
       <para>
        Requests that the server abandon processing of the current
        command.  It operates directly on the
@@ -8871,7 +9104,7 @@ int PQisthreadsafe();
    The deprecated functions <xref linkend="libpq-PQrequestCancel"/> and
    <xref linkend="libpq-PQoidStatus"/> are not thread-safe and should not be
    used in multithread programs.  <xref linkend="libpq-PQrequestCancel"/>
-   can be replaced by <xref linkend="libpq-PQcancel"/>.
+   can be replaced by <xref linkend="libpq-PQcancelSend"/>.
    <xref linkend="libpq-PQoidStatus"/> can be replaced by
    <xref linkend="libpq-PQoidValue"/>.
   </para>
diff --git a/src/fe_utils/connect_utils.c b/src/fe_utils/connect_utils.c
index 1cc97b72f7..0f5e84ad71 100644
--- a/src/fe_utils/connect_utils.c
+++ b/src/fe_utils/connect_utils.c
@@ -157,19 +157,11 @@ connectMaintenanceDatabase(ConnParams *cparams,
 void
 disconnectDatabase(PGconn *conn)
 {
-	char		errbuf[256];
-
 	Assert(conn != NULL);
 
 	if (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
 	{
-		PGcancel   *cancel;
-
-		if ((cancel = PQgetCancel(conn)))
-		{
-			(void) PQcancel(cancel, errbuf, sizeof(errbuf));
-			PQfreeCancel(cancel);
-		}
+		PQcancelFinish(PQcancelSend(conn));
 	}
 
 	PQfinish(conn);
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc88370..f56e8c185c 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,11 @@ PQpipelineStatus          183
 PQsetTraceFlags           184
 PQmblenBounded            185
 PQsendFlushRequest        186
+PQcancelSend              187
+PQcancelConn              188
+PQcancelPoll              189
+PQcancelStatus            190
+PQcancelSocket            191
+PQcancelErrorMessage      192
+PQcancelReset             193
+PQcancelFinish            194
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 746e9b4f1e..7b59697e64 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -376,6 +376,7 @@ static PGPing internal_ping(PGconn *conn);
 static PGconn *makeEmptyPGconn(void);
 static void pqFreeCommandQueue(PGcmdQueueEntry *queue);
 static bool fillPGconn(PGconn *conn, PQconninfoOption *connOptions);
+static bool copyPGconn(PGconn *srcConn, PGconn *dstConn);
 static void freePGconn(PGconn *conn);
 static void closePGconn(PGconn *conn);
 static void release_conn_addrinfo(PGconn *conn);
@@ -599,8 +600,17 @@ pqDropServerData(PGconn *conn)
 	conn->write_failed = false;
 	free(conn->write_err_msg);
 	conn->write_err_msg = NULL;
-	conn->be_pid = 0;
-	conn->be_key = 0;
+
+	/*
+	 * Cancel connections should save their be_pid and be_key across
+	 * PQresetStart invocations. Otherwise they don't know the secret token of
+	 * the connection they are supposed to cancel anymore.
+	 */
+	if (!conn->cancelRequest)
+	{
+		conn->be_pid = 0;
+		conn->be_key = 0;
+	}
 }
 
 
@@ -731,6 +741,68 @@ PQping(const char *conninfo)
 	return ret;
 }
 
+/*
+ *		PQcancelConn
+ *
+ * Asynchronously cancel a request on the given connection. This requires
+ * polling the returned PGconn to actually complete the cancellation of the
+ * request.
+ */
+PGcancelConn *
+PQcancelConn(PGconn *conn)
+{
+	PGconn	   *cancelConn = makeEmptyPGconn();
+
+	if (cancelConn == NULL)
+		return NULL;
+
+	/* Check we have an open connection */
+	if (!conn)
+	{
+		appendPQExpBufferStr(&cancelConn->errorMessage, libpq_gettext("passed connection was NULL\n"));
+		return (PGcancelConn *) cancelConn;
+	}
+
+	if (conn->sock == PGINVALID_SOCKET)
+	{
+		appendPQExpBufferStr(&cancelConn->errorMessage, libpq_gettext("passed connection is not open\n"));
+		return (PGcancelConn *) cancelConn;
+	}
+
+	/*
+	 * Indicate that this connection is used to send a cancellation
+	 */
+	cancelConn->cancelRequest = true;
+
+	if (!copyPGconn(conn, cancelConn))
+		return (PGcancelConn *) cancelConn;
+
+	/*
+	 * Compute derived options
+	 */
+	if (!connectOptions2(cancelConn))
+		return (PGcancelConn *) cancelConn;
+
+	/*
+	 * Copy cancelation token data from the original connnection
+	 */
+	cancelConn->be_pid = conn->be_pid;
+	cancelConn->be_key = conn->be_key;
+
+	/*
+	 * Cancel requests should not iterate over all possible hosts. The request
+	 * needs to be sent to the exact host and address that the original
+	 * connection used.
+	 */
+	memcpy(&cancelConn->raddr, &conn->raddr, sizeof(SockAddr));
+	cancelConn->whichhost = conn->whichhost;
+	conn->try_next_host = false;
+	conn->try_next_addr = false;
+
+	cancelConn->status = CONNECTION_STARTING;
+	return (PGcancelConn *) cancelConn;
+}
+
 /*
  *		PQconnectStartParams
  *
@@ -907,6 +979,46 @@ fillPGconn(PGconn *conn, PQconninfoOption *connOptions)
 	return true;
 }
 
+/*
+ * Copy over option values from srcConn to dstConn
+ *
+ * Don't put anything cute here --- intelligence should be in
+ * connectOptions2 ...
+ *
+ * Returns true on success. On failure, returns false and sets error message of
+ * dstConn.
+ */
+static bool
+copyPGconn(PGconn *srcConn, PGconn *dstConn)
+{
+	const internalPQconninfoOption *option;
+
+	/* copy over connection options */
+	for (option = PQconninfoOptions; option->keyword; option++)
+	{
+		if (option->connofs >= 0)
+		{
+			const char **tmp = (const char **) ((char *) srcConn + option->connofs);
+
+			if (*tmp)
+			{
+				char	  **dstConnmember = (char **) ((char *) dstConn + option->connofs);
+
+				if (*dstConnmember)
+					free(*dstConnmember);
+				*dstConnmember = strdup(*tmp);
+				if (*dstConnmember == NULL)
+				{
+					appendPQExpBufferStr(&dstConn->errorMessage,
+										 libpq_gettext("out of memory\n"));
+					return false;
+				}
+			}
+		}
+	}
+	return true;
+}
+
 /*
  *		connectOptions1
  *
@@ -2055,10 +2167,17 @@ connectDBStart(PGconn *conn)
 	 * Set up to try to connect to the first host.  (Setting whichhost = -1 is
 	 * a bit of a cheat, but PQconnectPoll will advance it to 0 before
 	 * anything else looks at it.)
+	 *
+	 * Cancel requests are special though, they should only try one host,
+	 * which is determined in PQcancelConn. So leave these settings
+	 * alone for cancel requests.
 	 */
-	conn->whichhost = -1;
-	conn->try_next_addr = false;
-	conn->try_next_host = true;
+	if (!conn->cancelRequest)
+	{
+		conn->whichhost = -1;
+		conn->try_next_host = true;
+		conn->try_next_addr = false;
+	}
 	conn->status = CONNECTION_NEEDED;
 
 	/* Also reset the target_server_type state if needed */
@@ -2107,6 +2226,15 @@ connectDBComplete(PGconn *conn)
 	if (conn == NULL || conn->status == CONNECTION_BAD)
 		return 0;
 
+	if (conn->status == CONNECTION_STARTING)
+	{
+		if (!connectDBStart(conn))
+		{
+			conn->status = CONNECTION_BAD;
+			return 0;
+		}
+	}
+
 	/*
 	 * Set up a time limit, if connect_timeout isn't zero.
 	 */
@@ -2247,8 +2375,8 @@ PQconnectPoll(PGconn *conn)
 	switch (conn->status)
 	{
 			/*
-			 * We really shouldn't have been polled in these two cases, but we
-			 * can handle it.
+			 * We really shouldn't have been polled in these three cases, but
+			 * we can handle it.
 			 */
 		case CONNECTION_BAD:
 			return PGRES_POLLING_FAILED;
@@ -2265,6 +2393,34 @@ PQconnectPoll(PGconn *conn)
 				/* Load waiting data */
 				int			n = pqReadData(conn);
 
+#ifndef WIN32
+				if (n == -2 && conn->cancelRequest)
+#else
+
+				/*
+				 * Windows is a bit special in its EOF behaviour for TCP.
+				 * Sometimes it will error with an ECONNRESET when there is a
+				 * clean connection closure. See these threads for details:
+				 * https://www.postgresql.org/message-id/flat/90b34057-4176-7bb0-0dbb-9822a5f6425b%40greiz-reinsdorf.de
+				 *
+				 * https://www.postgresql.org/message-id/flat/CA%2BhUKG%2BOeoETZQ%3DQw5Ub5h3tmwQhBmDA%3DnuNO3KG%3DzWfUypFAw%40mail.gmail.com
+				 *
+				 * PQcancel ignores such errors and reports success for the
+				 * cancellation anyway, so even if this is not always correct
+				 * we do the same here.
+				 */
+				if (n < 0 && conn->cancelRequest)
+#endif
+				{
+					/*
+					 * This is the expected end state for cancel connections.
+					 * They are closed once the cancel is processed by the
+					 * server.
+					 */
+					conn->status = CONNECTION_OK;
+					resetPQExpBuffer(&conn->errorMessage);
+					return PGRES_POLLING_OK;
+				}
 				if (n < 0)
 					goto error_return;
 				if (n == 0)
@@ -2274,6 +2430,7 @@ PQconnectPoll(PGconn *conn)
 			}
 
 			/* These are writing states, so we just proceed. */
+		case CONNECTION_STARTING:
 		case CONNECTION_STARTED:
 		case CONNECTION_MADE:
 			break;
@@ -2298,6 +2455,14 @@ keep_going:						/* We will come back to here until there is
 	/* Time to advance to next address, or next host if no more addresses? */
 	if (conn->try_next_addr)
 	{
+		/*
+		 * Cancel requests never have more addresses to try. They should only
+		 * try a single one.
+		 */
+		if (conn->cancelRequest)
+		{
+			goto error_return;
+		}
 		if (conn->addr_cur && conn->addr_cur->ai_next)
 		{
 			conn->addr_cur = conn->addr_cur->ai_next;
@@ -2317,6 +2482,15 @@ keep_going:						/* We will come back to here until there is
 		int			ret;
 		char		portstr[MAXPGPATH];
 
+		/*
+		 * Cancel requests never have more hosts to try. They should only try
+		 * a single one.
+		 */
+		if (conn->cancelRequest)
+		{
+			goto error_return;
+		}
+
 		if (conn->whichhost + 1 < conn->nconnhost)
 			conn->whichhost++;
 		else
@@ -2498,19 +2672,27 @@ keep_going:						/* We will come back to here until there is
 					char		host_addr[NI_MAXHOST];
 
 					/*
-					 * Advance to next possible host, if we've tried all of
-					 * the addresses for the current host.
+					 * Cancel requests don't use addr_cur at all. They have
+					 * their raddr field already filled in during
+					 * initialization in PQcancelConn.
 					 */
-					if (addr_cur == NULL)
+					if (!conn->cancelRequest)
 					{
-						conn->try_next_host = true;
-						goto keep_going;
-					}
+						/*
+						 * Advance to next possible host, if we've tried all
+						 * of the addresses for the current host.
+						 */
+						if (addr_cur == NULL)
+						{
+							conn->try_next_host = true;
+							goto keep_going;
+						}
 
-					/* Remember current address for possible use later */
-					memcpy(&conn->raddr.addr, addr_cur->ai_addr,
-						   addr_cur->ai_addrlen);
-					conn->raddr.salen = addr_cur->ai_addrlen;
+						/* Remember current address for possible use later */
+						memcpy(&conn->raddr.addr, addr_cur->ai_addr,
+							   addr_cur->ai_addrlen);
+						conn->raddr.salen = addr_cur->ai_addrlen;
+					}
 
 					/*
 					 * Set connip, too.  Note we purposely ignore strdup
@@ -2526,7 +2708,7 @@ keep_going:						/* We will come back to here until there is
 						conn->connip = strdup(host_addr);
 
 					/* Try to create the socket */
-					conn->sock = socket(addr_cur->ai_family, SOCK_STREAM, 0);
+					conn->sock = socket(conn->raddr.addr.ss_family, SOCK_STREAM, 0);
 					if (conn->sock == PGINVALID_SOCKET)
 					{
 						int			errorno = SOCK_ERRNO;
@@ -2536,12 +2718,18 @@ keep_going:						/* We will come back to here until there is
 						 * addresses to try; this reduces useless chatter in
 						 * cases where the address list includes both IPv4 and
 						 * IPv6 but kernel only accepts one family.
+						 *
+						 * Cancel requests never have more addresses to try.
+						 * They should only try a single one.
 						 */
-						if (addr_cur->ai_next != NULL ||
-							conn->whichhost + 1 < conn->nconnhost)
+						if (!conn->cancelRequest)
 						{
-							conn->try_next_addr = true;
-							goto keep_going;
+							if (addr_cur->ai_next != NULL ||
+								conn->whichhost + 1 < conn->nconnhost)
+							{
+								conn->try_next_addr = true;
+								goto keep_going;
+							}
 						}
 						emitHostIdentityInfo(conn, host_addr);
 						appendPQExpBuffer(&conn->errorMessage,
@@ -2564,7 +2752,7 @@ keep_going:						/* We will come back to here until there is
 					 * TCP sockets, nonblock mode, close-on-exec.  Try the
 					 * next address if any of this fails.
 					 */
-					if (addr_cur->ai_family != AF_UNIX)
+					if (conn->raddr.addr.ss_family != AF_UNIX)
 					{
 						if (!connectNoDelay(conn))
 						{
@@ -2593,7 +2781,7 @@ keep_going:						/* We will come back to here until there is
 					}
 #endif							/* F_SETFD */
 
-					if (addr_cur->ai_family != AF_UNIX)
+					if (conn->raddr.addr.ss_family != AF_UNIX)
 					{
 #ifndef WIN32
 						int			on = 1;
@@ -2687,8 +2875,9 @@ keep_going:						/* We will come back to here until there is
 					 * Start/make connection.  This should not block, since we
 					 * are in nonblock mode.  If it does, well, too bad.
 					 */
-					if (connect(conn->sock, addr_cur->ai_addr,
-								addr_cur->ai_addrlen) < 0)
+					if (connect(conn->sock,
+								(struct sockaddr *) &conn->raddr.addr,
+								conn->raddr.salen) < 0)
 					{
 						if (SOCK_ERRNO == EINPROGRESS ||
 #ifdef WIN32
@@ -2727,6 +2916,16 @@ keep_going:						/* We will come back to here until there is
 				}
 			}
 
+		case CONNECTION_STARTING:
+			{
+				if (!connectDBStart(conn))
+				{
+					goto error_return;
+				}
+				conn->status = CONNECTION_STARTED;
+				return PGRES_POLLING_WRITING;
+			}
+
 		case CONNECTION_STARTED:
 			{
 				socklen_t	optlen = sizeof(optval);
@@ -2935,6 +3134,30 @@ keep_going:						/* We will come back to here until there is
 				}
 #endif							/* USE_SSL */
 
+				/*
+				 * For cancel requests this is as far as we need to go in the
+				 * connection establishment. Now we can actually send our
+				 * cancelation request.
+				 */
+				if (conn->cancelRequest)
+				{
+					CancelRequestPacket cancelpacket;
+
+					packetlen = sizeof(cancelpacket);
+					cancelpacket.cancelRequestCode = (MsgType) pg_hton32(CANCEL_REQUEST_CODE);
+					cancelpacket.backendPID = pg_hton32(conn->be_pid);
+					cancelpacket.cancelAuthCode = pg_hton32(conn->be_key);
+					if (pqPacketSend(conn, 0, &cancelpacket, packetlen) != STATUS_OK)
+					{
+						appendPQExpBuffer(&conn->errorMessage,
+										  libpq_gettext("could not send cancel packet: %s\n"),
+										  SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf)));
+						goto error_return;
+					}
+					conn->status = CONNECTION_AWAITING_RESPONSE;
+					return PGRES_POLLING_READING;
+				}
+
 				/*
 				 * Build the startup packet.
 				 */
@@ -4114,6 +4337,15 @@ release_conn_addrinfo(PGconn *conn)
 static void
 sendTerminateConn(PGconn *conn)
 {
+	/*
+	 * The Postgres cancellation protocol does not have a notion of a Terminate
+	 * message, so don't send one.
+	 */
+	if (conn->cancelRequest)
+	{
+		return;
+	}
+
 	/*
 	 * Note that the protocol doesn't allow us to send Terminate messages
 	 * during the startup phase.
@@ -4582,6 +4814,96 @@ cancel_errReturn:
 	return false;
 }
 
+/*
+ * PQrequestCancel: old, not thread-safe function for requesting query cancel
+ *
+ * Returns true if able to send the cancel request, false if not.
+ *
+ * On failure, the error message is saved in conn->errorMessage; this means
+ * that this can't be used when there might be other active operations on
+ * the connection object.
+ *
+ * NOTE: error messages will be cut off at the current size of the
+ * error message buffer, since we dare not try to expand conn->errorMessage!
+ */
+PGcancelConn *
+PQcancelSend(PGconn *conn)
+{
+	PGcancelConn *cancelConn = PQcancelConn(conn);
+
+	if (cancelConn && cancelConn->conn.status != CONNECTION_BAD)
+		(void) connectDBComplete(&cancelConn->conn);
+
+	return cancelConn;
+}
+
+/*
+ *		PQcancelPoll
+ *
+ * Poll a cancel connection. For usage details see PQconnectPoll.
+ */
+PostgresPollingStatusType
+PQcancelPoll(PGcancelConn * cancelConn)
+{
+	return PQconnectPoll((PGconn *) cancelConn);
+}
+
+/*
+ *		PQcancelStatus
+ *
+ * Get the status of a cancel connection.
+ */
+ConnStatusType
+PQcancelStatus(const PGcancelConn * cancelConn)
+{
+	return PQstatus((const PGconn *) cancelConn);
+}
+
+/*
+ *		PQcancelSocket
+ *
+ * Get the socket of the cancel connection.
+ */
+int
+PQcancelSocket(const PGcancelConn * cancelConn)
+{
+	return PQsocket((const PGconn *) cancelConn);
+}
+
+/*
+ *		PQcancelErrorMessage
+ *
+ * Get the socket of the cancel connection.
+ */
+char *
+PQcancelErrorMessage(const PGcancelConn * cancelConn)
+{
+	return PQerrorMessage((const PGconn *) cancelConn);
+}
+
+/*
+ *		PQcancelReset
+ *
+ * Resets the cancel connection, so it can be reused to send a new cancel
+ * request.
+ */
+void
+PQcancelReset(PGcancelConn *cancelConn)
+{
+	closePGconn((PGconn *) cancelConn);
+	cancelConn->conn.status = CONNECTION_STARTING;
+}
+
+/*
+ *		PQcancelFinish
+ *
+ * Closes and frees the cancel connection.
+ */
+void
+PQcancelFinish(PGcancelConn * cancelConn)
+{
+	PQfinish((PGconn *) cancelConn);
+}
 
 /*
  * PQrequestCancel: old, not thread-safe function for requesting query cancel
@@ -4640,6 +4962,7 @@ PQrequestCancel(PGconn *conn)
 }
 
 
+
 /*
  * pqPacketSend() -- convenience routine to send a message to server.
  *
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 795500c593..b5b10ec2ba 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -556,8 +556,11 @@ pqPutMsgEnd(PGconn *conn)
  * Possible return values:
  *	 1: successfully loaded at least one more byte
  *	 0: no data is presently available, but no error detected
- *	-1: error detected (including EOF = connection closure);
+ *	-1: error detected (excluding EOF = connection closure);
  *		conn->errorMessage set
+ *	-2: EOF detected, connection is closed
+ *		conn->errorMessage set
+ *
  * NOTE: callers must not assume that pointers or indexes into conn->inBuffer
  * remain valid across this call!
  * ----------
@@ -640,7 +643,7 @@ retry3:
 
 			default:
 				/* pqsecure_read set the error message for us */
-				return -1;
+				return nread;
 		}
 	}
 	if (nread > 0)
@@ -735,7 +738,7 @@ retry4:
 
 			default:
 				/* pqsecure_read set the error message for us */
-				return -1;
+				return nread;
 		}
 	}
 	if (nread > 0)
@@ -753,13 +756,17 @@ definitelyEOF:
 						 libpq_gettext("server closed the connection unexpectedly\n"
 									   "\tThis probably means the server terminated abnormally\n"
 									   "\tbefore or while processing the request.\n"));
+	/* Do *not* drop any already-read data; caller still wants it */
+	pqDropConnection(conn, false);
+	conn->status = CONNECTION_BAD;	/* No more connection to backend */
+	return -2;
 
 	/* Come here if lower-level code already set a suitable errorMessage */
 definitelyFailed:
 	/* Do *not* drop any already-read data; caller still wants it */
 	pqDropConnection(conn, false);
 	conn->status = CONNECTION_BAD;	/* No more connection to backend */
-	return -1;
+	return nread < 0 ? nread : -1;
 }
 
 /*
diff --git a/src/interfaces/libpq/fe-secure-openssl.c b/src/interfaces/libpq/fe-secure-openssl.c
index b42a908733..ca378d2ad5 100644
--- a/src/interfaces/libpq/fe-secure-openssl.c
+++ b/src/interfaces/libpq/fe-secure-openssl.c
@@ -253,7 +253,7 @@ rloop:
 			appendPQExpBufferStr(&conn->errorMessage,
 								 libpq_gettext("SSL connection has been closed unexpectedly\n"));
 			result_errno = ECONNRESET;
-			n = -1;
+			n = -2;
 			break;
 		default:
 			appendPQExpBuffer(&conn->errorMessage,
diff --git a/src/interfaces/libpq/fe-secure.c b/src/interfaces/libpq/fe-secure.c
index 3df4a97f2e..18dff253c4 100644
--- a/src/interfaces/libpq/fe-secure.c
+++ b/src/interfaces/libpq/fe-secure.c
@@ -199,6 +199,12 @@ pqsecure_close(PGconn *conn)
  * On failure, this function is responsible for appending a suitable message
  * to conn->errorMessage.  The caller must still inspect errno, but only
  * to determine whether to continue/retry after error.
+ *
+ * Returns -1 in case of failures, except in the case of where a failure means
+ * that there was a clean connection closure, in those cases -2 is returned.
+ * Currently only the TLS implementation of pqsecure_read ever returns -2. For
+ * the other implementations a clean connection closure is detected in
+ * pqReadData instead.
  */
 ssize_t
 pqsecure_read(PGconn *conn, void *ptr, size_t len)
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index b7df3224c0..de2e32ca63 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -78,7 +78,9 @@ typedef enum
 	CONNECTION_CONSUME,			/* Consuming any extra messages. */
 	CONNECTION_GSS_STARTUP,		/* Negotiating GSSAPI. */
 	CONNECTION_CHECK_TARGET,	/* Checking target server properties. */
-	CONNECTION_CHECK_STANDBY	/* Checking if server is in standby mode. */
+	CONNECTION_CHECK_STANDBY,	/* Checking if server is in standby mode. */
+	CONNECTION_STARTING			/* Waiting for connection attempt to be
+								 * started.  */
 } ConnStatusType;
 
 typedef enum
@@ -165,6 +167,11 @@ typedef enum
  */
 typedef struct pg_conn PGconn;
 
+/* PGcancelConn encapsulates a cancel connection to the backend.
+ * The contents of this struct are not supposed to be known to applications.
+ */
+typedef struct pg_cancel_conn PGcancelConn;
+
 /* PGresult encapsulates the result of a query (or more precisely, of a single
  * SQL command --- a query string given to PQsendQuery can contain multiple
  * commands and thus return multiple PGresult objects).
@@ -321,16 +328,28 @@ extern PostgresPollingStatusType PQresetPoll(PGconn *conn);
 /* Synchronous (blocking) */
 extern void PQreset(PGconn *conn);
 
+/* issue a cancel request */
+extern PGcancelConn * PQcancelSend(PGconn *conn);
+/* non-blocking version of PQrequestSend */
+extern PGcancelConn * PQcancelConn(PGconn *conn);
+extern PostgresPollingStatusType PQcancelPoll(PGcancelConn * cancelConn);
+extern ConnStatusType PQcancelStatus(const PGcancelConn * cancelConn);
+extern int	PQcancelSocket(const PGcancelConn * cancelConn);
+extern char *PQcancelErrorMessage(const PGcancelConn * cancelConn);
+extern void PQcancelReset(PGcancelConn *cancelConn);
+extern void PQcancelFinish(PGcancelConn * cancelConn);
+
+
 /* request a cancel structure */
 extern PGcancel *PQgetCancel(PGconn *conn);
 
 /* free a cancel structure */
 extern void PQfreeCancel(PGcancel *cancel);
 
-/* issue a cancel request */
+/* a less secure version of PQcancelSend, but one which is signal-safe */
 extern int	PQcancel(PGcancel *cancel, char *errbuf, int errbufsize);
 
-/* backwards compatible version of PQcancel; not thread-safe */
+/* deprecated version of PQcancel; not thread-safe */
 extern int	PQrequestCancel(PGconn *conn);
 
 /* Accessor functions for PGconn objects */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index c75ed63a2c..84027bc4ab 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -397,6 +397,10 @@ struct pg_conn
 	char	   *ssl_max_protocol_version;	/* maximum TLS protocol version */
 	char	   *target_session_attrs;	/* desired session properties */
 
+	bool		cancelRequest;	/* true if this connection is used to send a
+								 * cancel request, instead of being a normal
+								 * connection that's used for queries */
+
 	/* Optional file to write trace info to */
 	FILE	   *Pfdebug;
 	int			traceFlags;
@@ -592,6 +596,11 @@ struct pg_conn
 	PQExpBufferData workBuffer; /* expansible string */
 };
 
+struct pg_cancel_conn
+{
+	PGconn		conn;
+};
+
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
  * data is required to safely cancel a connection running on a different
  * thread.
diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c
index 0a66235153..3781f7982b 100644
--- a/src/test/isolation/isolationtester.c
+++ b/src/test/isolation/isolationtester.c
@@ -946,26 +946,21 @@ try_complete_step(TestSpec *testspec, PermutationStep *pstep, int flags)
 			 */
 			if (td > max_step_wait && !canceled)
 			{
-				PGcancel   *cancel = PQgetCancel(conn);
+				PGcancelConn *cancel_conn = PQcancelSend(conn);
 
-				if (cancel != NULL)
+				if (PQcancelStatus(cancel_conn) == CONNECTION_OK)
 				{
-					char		buf[256];
-
-					if (PQcancel(cancel, buf, sizeof(buf)))
-					{
-						/*
-						 * print to stdout not stderr, as this should appear
-						 * in the test case's results
-						 */
-						printf("isolationtester: canceling step %s after %d seconds\n",
-							   step->name, (int) (td / USECS_PER_SEC));
-						canceled = true;
-					}
-					else
-						fprintf(stderr, "PQcancel failed: %s\n", buf);
-					PQfreeCancel(cancel);
+					/*
+					 * print to stdout not stderr, as this should appear in
+					 * the test case's results
+					 */
+					printf("isolationtester: canceling step %s after %d seconds\n",
+						   step->name, (int) (td / USECS_PER_SEC));
+					canceled = true;
 				}
+				else
+					fprintf(stderr, "PQcancel failed: %s\n", PQcancelErrorMessage(cancel_conn));
+				PQcancelFinish(cancel_conn);
 			}
 
 			/*
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
index c609f42258..2674abb539 100644
--- a/src/test/modules/libpq_pipeline/libpq_pipeline.c
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -86,6 +86,264 @@ pg_fatal_impl(int line, const char *fmt,...)
 	exit(1);
 }
 
+/*
+ * Check that the query on the given connection got cancelled.
+ *
+ * This is a function wrapped in a macrco to make the reported line number
+ * in an error match the line number of the invocation.
+ */
+#define confirm_query_cancelled(conn) confirm_query_cancelled_impl(__LINE__, conn)
+static void
+confirm_query_cancelled_impl(int line, PGconn *conn)
+{
+	PGresult   *res = NULL;
+
+	res = PQgetResult(conn);
+	if (res == NULL)
+		pg_fatal_impl(line, "PQgetResult returned null: %s",
+					  PQerrorMessage(conn));
+	if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+		pg_fatal_impl(line, "query did not fail when it was expected");
+	if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
+		pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
+					  PQerrorMessage(conn));
+	PQclear(res);
+	while (PQisBusy(conn))
+	{
+		PQconsumeInput(conn);
+	}
+}
+
+#define send_cancellable_query(conn, monitorConn) send_cancellable_query_impl(__LINE__, conn, monitorConn)
+static void
+send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
+{
+	const char *env_wait;
+	const Oid paramTypes[1] = {INT4OID};
+
+	env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
+	if (env_wait == NULL)
+		env_wait = "180";
+
+	if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes, &env_wait, NULL, NULL, 0) != 1)
+		pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
+
+	/*
+	 * Wait until the query is actually running. Otherwise sending a
+	 * cancellation request might not cancel the query due to race conditions.
+	 */
+	while (true)
+	{
+		char	   *value = NULL;
+		PGresult   *res = PQexec(
+								 monitorConn,
+								 "SELECT count(*) FROM pg_stat_activity WHERE "
+								 "query = 'SELECT pg_sleep($1)' "
+								 "AND state = 'active'");
+
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_fatal("Connection to database failed: %s", PQerrorMessage(monitorConn));
+		}
+		if (PQntuples(res) != 1)
+		{
+			pg_fatal("unexpected number of rows received: %d", PQntuples(res));
+		}
+		if (PQnfields(res) != 1)
+		{
+			pg_fatal("unexpected number of columns received: %d", PQnfields(res));
+		}
+		value = PQgetvalue(res, 0, 0);
+		if (*value != '0')
+		{
+			PQclear(res);
+			break;
+		}
+		PQclear(res);
+
+		/*
+		 * wait 10ms before polling again
+		 */
+		pg_usleep(10000);
+	}
+}
+
+static void
+test_cancel(PGconn *conn, const char *conninfo)
+{
+	PGcancel   *cancel = NULL;
+	PGcancelConn	   *cancelConn = NULL;
+	PGconn	   *monitorConn = NULL;
+	char		errorbuf[256];
+
+	fprintf(stderr, "test cancellations... ");
+
+	if (PQsetnonblocking(conn, 1) != 0)
+		pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
+
+	/*
+	 * Make a connection to the database to monitor the query on the main
+	 * connection.
+	 */
+	monitorConn = PQconnectdb(conninfo);
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		pg_fatal("Connection to database failed: %s",
+				 PQerrorMessage(conn));
+	}
+
+	/* test PQcancel */
+	send_cancellable_query(conn, monitorConn);
+	cancel = PQgetCancel(conn);
+	if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
+	{
+		pg_fatal("failed to run PQcancel: %s", errorbuf);
+	};
+	confirm_query_cancelled(conn);
+
+	/* PGcancel object can be reused for the next query */
+	send_cancellable_query(conn, monitorConn);
+	if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
+	{
+		pg_fatal("failed to run PQcancel: %s", errorbuf);
+	};
+	confirm_query_cancelled(conn);
+
+	PQfreeCancel(cancel);
+
+	/* test PQrequestCancel */
+	send_cancellable_query(conn, monitorConn);
+	if (!PQrequestCancel(conn))
+		pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
+	confirm_query_cancelled(conn);
+
+	/* test PQcancelSend */
+	send_cancellable_query(conn, monitorConn);
+	cancelConn = PQcancelSend(conn);
+	if (PQcancelStatus(cancelConn) == CONNECTION_BAD)
+		pg_fatal("failed to run PQcancelSend: %s", PQcancelErrorMessage(cancelConn));
+	confirm_query_cancelled(conn);
+	PQcancelFinish(cancelConn);
+
+	/* test PQcancelConn and then polling with PQcancelPoll */
+	send_cancellable_query(conn, monitorConn);
+	cancelConn = PQcancelConn(conn);
+	if (PQcancelStatus(cancelConn) == CONNECTION_BAD)
+		pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
+	while (true)
+	{
+		struct timeval tv;
+		fd_set		input_mask;
+		fd_set		output_mask;
+		PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
+		int			sock = PQcancelSocket(cancelConn);
+
+		if (pollres == PGRES_POLLING_OK)
+		{
+			break;
+		}
+
+		FD_ZERO(&input_mask);
+		FD_ZERO(&output_mask);
+		switch (pollres)
+		{
+			case PGRES_POLLING_READING:
+				pg_debug("polling for reads\n");
+				FD_SET(sock, &input_mask);
+				break;
+			case PGRES_POLLING_WRITING:
+				pg_debug("polling for writes\n");
+				FD_SET(sock, &output_mask);
+				break;
+			default:
+				pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
+		}
+
+		if (sock < 0)
+			pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
+
+		tv.tv_sec = 3;
+		tv.tv_usec = 0;
+
+		while (true)
+		{
+			if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
+			{
+				if (errno == EINTR)
+					continue;
+				pg_fatal("select() failed: %m");
+			}
+			break;
+		}
+	}
+	if (PQcancelStatus(cancelConn) != CONNECTION_OK)
+		pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
+	confirm_query_cancelled(conn);
+
+	/*
+	 * test PQcancelReset works on the cancel connection and it can be reused
+	 * after
+	 */
+	PQcancelReset(cancelConn);
+
+	send_cancellable_query(conn, monitorConn);
+	if (PQcancelStatus(cancelConn) == CONNECTION_BAD)
+		pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
+	while (true)
+	{
+		struct timeval tv;
+		fd_set		input_mask;
+		fd_set		output_mask;
+		PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
+		int			sock = PQcancelSocket(cancelConn);
+
+		if (pollres == PGRES_POLLING_OK)
+		{
+			break;
+		}
+
+		FD_ZERO(&input_mask);
+		FD_ZERO(&output_mask);
+		switch (pollres)
+		{
+			case PGRES_POLLING_READING:
+				pg_debug("polling for reads\n");
+				FD_SET(sock, &input_mask);
+				break;
+			case PGRES_POLLING_WRITING:
+				pg_debug("polling for writes\n");
+				FD_SET(sock, &output_mask);
+				break;
+			default:
+				pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
+		}
+
+		if (sock < 0)
+			pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
+
+		tv.tv_sec = 3;
+		tv.tv_usec = 0;
+
+		while (true)
+		{
+			if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
+			{
+				if (errno == EINTR)
+					continue;
+				pg_fatal("select() failed: %m");
+			}
+			break;
+		}
+	}
+	if (PQcancelStatus(cancelConn) != CONNECTION_OK)
+		pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
+	confirm_query_cancelled(conn);
+
+	PQcancelFinish(cancelConn);
+
+	fprintf(stderr, "ok\n");
+}
+
 static void
 test_disallowed_in_pipeline(PGconn *conn)
 {
@@ -1638,6 +1896,7 @@ usage(const char *progname)
 static void
 print_test_list(void)
 {
+	printf("cancel\n");
 	printf("disallowed_in_pipeline\n");
 	printf("multi_pipelines\n");
 	printf("nosync\n");
@@ -1739,7 +1998,9 @@ main(int argc, char **argv)
 						PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
 	}
 
-	if (strcmp(testname, "disallowed_in_pipeline") == 0)
+	if (strcmp(testname, "cancel") == 0)
+		test_cancel(conn, conninfo);
+	else if (strcmp(testname, "disallowed_in_pipeline") == 0)
 		test_disallowed_in_pipeline(conn);
 	else if (strcmp(testname, "multi_pipelines") == 0)
 		test_multi_pipelines(conn);
-- 
2.34.1

