From 71b584be461281be2d412cbb9ca41a022bd92ff4 Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <jelte.fennema@microsoft.com>
Date: Mon, 18 Mar 2024 19:37:40 +0100
Subject: [PATCH v37] postgres_fdw: Start using new libpq cancel APIs

Commit 61461a300c1c introduced new functions to libpq for cancelling
queries.  This replaces the usage of the old ones in postgres_fdw.

This is done by introducing a new libpqsrv_cancel helper function. This
function takes a timeout and can itself be interupted while it is
sending a cancel request instead of being blocked. This new function is
now also used by dblink.

Finally, it also adds some test coverage for the cancel support in
postgres_fdw.

Author: Jelte Fennema-Nio <postgres@jeltef.nl>
Discussion: https://postgr.es/m/CAGECzQT_VgOWWENUqvUV9xQmbaCyXjtRRAYO8W07oqashk_N+g@mail.gmail.com
---
 contrib/dblink/dblink.c                       | 21 ++---
 contrib/postgres_fdw/connection.c             | 45 +++++-----
 .../postgres_fdw/expected/postgres_fdw.out    | 15 ++++
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  7 ++
 src/include/libpq/libpq-be-fe-helpers.h       | 84 +++++++++++++++++++
 5 files changed, 135 insertions(+), 37 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index edbc9ab02ac..2c1d92326ed 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -1347,25 +1347,16 @@ Datum
 dblink_cancel_query(PG_FUNCTION_ARGS)
 {
 	PGconn	   *conn;
-	PGcancelConn *cancelConn;
 	char	   *msg;
+	TimestampTz endtime;
 
 	dblink_init();
 	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
-	cancelConn = PQcancelCreate(conn);
-
-	PG_TRY();
-	{
-		if (!PQcancelBlocking(cancelConn))
-			msg = pchomp(PQcancelErrorMessage(cancelConn));
-		else
-			msg = "OK";
-	}
-	PG_FINALLY();
-	{
-		PQcancelFinish(cancelConn);
-	}
-	PG_END_TRY();
+	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+										  30000);
+	msg = libpqsrv_cancel(conn, endtime);
+	if (!msg)
+		msg = "OK";
 
 	PG_RETURN_TEXT_P(cstring_to_text(msg));
 }
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4931ebf5915..77a57bbdfd2 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
 static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
 static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
 static bool pgfdw_cancel_query(PGconn *conn);
-static bool pgfdw_cancel_query_begin(PGconn *conn);
+static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
 static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
 								   bool consume_input);
 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
@@ -1315,35 +1315,32 @@ pgfdw_cancel_query(PGconn *conn)
 	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
 										  CONNECTION_CLEANUP_TIMEOUT);
 
-	if (!pgfdw_cancel_query_begin(conn))
+	if (!pgfdw_cancel_query_begin(conn, endtime))
 		return false;
 	return pgfdw_cancel_query_end(conn, endtime, false);
 }
 
+/*
+ * Submit a cancel request to the given connection, waiting only until
+ * the given time.
+ *
+ * We sleep interruptibly until we receive confirmation that the cancel
+ * request has been accepted, and if it is, return true; if the timeout
+ * lapses without that, or the request fails for whatever reason, return
+ * false.
+ */
 static bool
-pgfdw_cancel_query_begin(PGconn *conn)
+pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
 {
-	PGcancel   *cancel;
-	char		errbuf[256];
+	char	   *error = libpqsrv_cancel(conn, endtime);
 
-	/*
-	 * Issue cancel request.  Unfortunately, there's no good way to limit the
-	 * amount of time that we might block inside PQgetCancel().
-	 */
-	if ((cancel = PQgetCancel(conn)))
+	if (error)
 	{
-		if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
-		{
-			ereport(WARNING,
-					(errcode(ERRCODE_CONNECTION_FAILURE),
-					 errmsg("could not send cancel request: %s",
-							errbuf)));
-			PQfreeCancel(cancel);
-			return false;
-		}
-		PQfreeCancel(cancel);
+		ereport(WARNING,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not send cancel request: %s", error)));
+		return false;
 	}
-
 	return true;
 }
 
@@ -1685,7 +1682,11 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
 	 */
 	if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
 	{
-		if (!pgfdw_cancel_query_begin(entry->conn))
+		TimestampTz endtime;
+
+		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+											  CONNECTION_CLEANUP_TIMEOUT);
+		if (!pgfdw_cancel_query_begin(entry->conn, endtime))
 			return false;		/* Unable to cancel running query */
 		*cancel_requested = lappend(*cancel_requested, entry);
 	}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index acbbf3b56c8..98d16b1fd8c 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 (10 rows)
 
 ALTER VIEW v4 OWNER TO regress_view_owner;
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+                                                                             QUERY PLAN                                                                              
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Foreign Scan
+   Output: (count(*))
+   Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5))
+   Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE))
+(4 rows)
+
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+ERROR:  canceling statement due to statement timeout
+RESET statement_timeout;
 -- ====================================================================
 -- Check that userid to use when querying the remote table is correctly
 -- propagated into foreign rels present in subqueries under an UNION ALL
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index e3d147de6da..2626e68cc69 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
 ALTER VIEW v4 OWNER TO regress_view_owner;
 
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+RESET statement_timeout;
+
 -- ====================================================================
 -- Check that userid to use when querying the remote table is correctly
 -- propagated into foreign rels present in subqueries under an UNION ALL
diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
index 5d33bcf32f7..33c7be7845c 100644
--- a/src/include/libpq/libpq-be-fe-helpers.h
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -44,6 +44,8 @@
 #include "miscadmin.h"
 #include "storage/fd.h"
 #include "storage/latch.h"
+#include "utils/timestamp.h"
+#include "utils/wait_event.h"
 
 
 static inline void libpqsrv_connect_prepare(void);
@@ -365,4 +367,86 @@ libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
 	return PQgetResult(conn);
 }
 
+/*
+ * Submit a cancel request to the given connection, waiting only until
+ * the given time.
+ *
+ * We sleep interruptibly until we receive confirmation that the cancel
+ * request has been accepted, and if it is, return NULL; if the timeout
+ * lapses without that, or the request fails for whatever reason, return
+ * the error message.
+ */
+static char *
+libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
+{
+	char	   *error = NULL;
+	PGcancelConn *cancel_conn = PQcancelCreate(conn);
+
+	if (!PQcancelStart(cancel_conn))
+	{
+		PG_TRY();
+		{
+			error = pchomp(PQcancelErrorMessage(cancel_conn));
+		}
+		PG_FINALLY();
+		{
+			PQcancelFinish(cancel_conn);
+		}
+		PG_END_TRY();
+		return error;
+	}
+
+	/* In what follows, do not leak any PGcancelConn on an error. */
+	PG_TRY();
+	{
+		while (true)
+		{
+			PostgresPollingStatusType pollres = PQcancelPoll(cancel_conn);
+			TimestampTz now = GetCurrentTimestamp();
+			long		cur_timeout;
+			int			waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+			if (pollres == PGRES_POLLING_OK)
+				break;
+
+			/* If timeout has expired, give up, else get sleep time. */
+			cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
+			if (cur_timeout <= 0)
+			{
+				error = "timed out";
+				break;
+			}
+
+			switch (pollres)
+			{
+				case PGRES_POLLING_READING:
+					waitEvents |= WL_SOCKET_READABLE;
+					break;
+				case PGRES_POLLING_WRITING:
+					waitEvents |= WL_SOCKET_WRITEABLE;
+					break;
+				default:
+					error = pchomp(PQcancelErrorMessage(cancel_conn));
+					goto exit;
+			}
+
+			/* Sleep until there's something to do */
+			WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
+							  cur_timeout, PG_WAIT_EXTENSION);
+			ResetLatch(MyLatch);
+
+			CHECK_FOR_INTERRUPTS();
+		}
+exit:	;
+	}
+	PG_FINALLY();
+	{
+		PQcancelFinish(cancel_conn);
+	}
+	PG_END_TRY();
+
+	return error;
+}
+
+
 #endif							/* LIBPQ_BE_FE_HELPERS_H */

base-commit: b4080fa3dcf6c6359e542169e0e81a0662c53ba8
-- 
2.34.1

