From a92799883555474448ae5d1ab698804c40bf1391 Mon Sep 17 00:00:00 2001
From: Jelte Fennema-Nio <jelte.fennema@microsoft.com>
Date: Thu, 14 Dec 2023 13:39:09 +0100
Subject: [PATCH v33 5/5] Start using new libpq cancel APIs

A previous commit introduced new APIs to libpq for cancelling queries.
This replaces the usage of the old APIs in most of the codebase with
these newer ones. This specifically leaves out changes to psql and
pgbench as those would need a much larger refactor to be able to call
them, due to the new functions not being signal-safe.
---
 contrib/dblink/dblink.c                       |  30 +++--
 contrib/postgres_fdw/connection.c             | 105 +++++++++++++++---
 .../postgres_fdw/expected/postgres_fdw.out    |  15 +++
 contrib/postgres_fdw/sql/postgres_fdw.sql     |   7 ++
 src/fe_utils/connect_utils.c                  |  11 +-
 src/test/isolation/isolationtester.c          |  29 ++---
 6 files changed, 145 insertions(+), 52 deletions(-)

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 19a362526d2..98dcca3e6fd 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -1346,22 +1346,32 @@ PG_FUNCTION_INFO_V1(dblink_cancel_query);
 Datum
 dblink_cancel_query(PG_FUNCTION_ARGS)
 {
-	int			res;
 	PGconn	   *conn;
-	PGcancel   *cancel;
-	char		errbuf[256];
+	PGcancelConn *cancelConn;
+	char	   *msg;
 
 	dblink_init();
 	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
-	cancel = PQgetCancel(conn);
+	cancelConn = PQcancelCreate(conn);
 
-	res = PQcancel(cancel, errbuf, 256);
-	PQfreeCancel(cancel);
+	PG_TRY();
+	{
+		if (!PQcancelBlocking(cancelConn))
+		{
+			msg = pchomp(PQcancelErrorMessage(cancelConn));
+		}
+		else
+		{
+			msg = "OK";
+		}
+	}
+	PG_FINALLY();
+	{
+		PQcancelFinish(cancelConn);
+	}
+	PG_END_TRY();
 
-	if (res == 1)
-		PG_RETURN_TEXT_P(cstring_to_text("OK"));
-	else
-		PG_RETURN_TEXT_P(cstring_to_text(errbuf));
+	PG_RETURN_TEXT_P(cstring_to_text(msg));
 }
 
 
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 4931ebf5915..dcc13dc3b24 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
 static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
 static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
 static bool pgfdw_cancel_query(PGconn *conn);
-static bool pgfdw_cancel_query_begin(PGconn *conn);
+static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
 static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
 								   bool consume_input);
 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
@@ -1315,36 +1315,104 @@ pgfdw_cancel_query(PGconn *conn)
 	endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
 										  CONNECTION_CLEANUP_TIMEOUT);
 
-	if (!pgfdw_cancel_query_begin(conn))
+	if (!pgfdw_cancel_query_begin(conn, endtime))
 		return false;
 	return pgfdw_cancel_query_end(conn, endtime, false);
 }
 
 static bool
-pgfdw_cancel_query_begin(PGconn *conn)
+pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
 {
-	PGcancel   *cancel;
-	char		errbuf[256];
+	bool		timed_out = false;
+	bool		failed = false;
+	PGcancelConn *cancel_conn = PQcancelCreate(conn);
 
-	/*
-	 * Issue cancel request.  Unfortunately, there's no good way to limit the
-	 * amount of time that we might block inside PQgetCancel().
-	 */
-	if ((cancel = PQgetCancel(conn)))
+
+	if (!PQcancelStart(cancel_conn))
 	{
-		if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+		PG_TRY();
 		{
 			ereport(WARNING,
 					(errcode(ERRCODE_CONNECTION_FAILURE),
 					 errmsg("could not send cancel request: %s",
-							errbuf)));
-			PQfreeCancel(cancel);
-			return false;
+							pchomp(PQcancelErrorMessage(cancel_conn)))));
 		}
-		PQfreeCancel(cancel);
+		PG_FINALLY();
+		{
+			PQcancelFinish(cancel_conn);
+		}
+		PG_END_TRY();
+		return false;
 	}
 
-	return true;
+	/* In what follows, do not leak any PGcancelConn on an error. */
+	PG_TRY();
+	{
+		while (true)
+		{
+			TimestampTz now = GetCurrentTimestamp();
+			long		cur_timeout;
+			PostgresPollingStatusType pollres = PQcancelPoll(cancel_conn);
+			int			waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+			if (pollres == PGRES_POLLING_OK)
+			{
+				break;
+			}
+
+			/* If timeout has expired, give up, else get sleep time. */
+			cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
+			if (cur_timeout <= 0)
+			{
+				timed_out = true;
+				failed = true;
+				goto exit;
+			}
+
+			switch (pollres)
+			{
+				case PGRES_POLLING_READING:
+					waitEvents |= WL_SOCKET_READABLE;
+					break;
+				case PGRES_POLLING_WRITING:
+					waitEvents |= WL_SOCKET_WRITEABLE;
+					break;
+				default:
+					failed = true;
+					goto exit;
+			}
+
+			/* Sleep until there's something to do */
+			WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
+							  cur_timeout, PG_WAIT_EXTENSION);
+			ResetLatch(MyLatch);
+
+			CHECK_FOR_INTERRUPTS();
+		}
+exit:	;
+		if (failed)
+		{
+			if (timed_out)
+			{
+				ereport(WARNING,
+						(errmsg("could not cancel request due to timeout")));
+			}
+			else
+			{
+				ereport(WARNING,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("could not send cancel request: %s",
+								pchomp(PQcancelErrorMessage(cancel_conn)))));
+			}
+		}
+	}
+	PG_FINALLY();
+	{
+		PQcancelFinish(cancel_conn);
+	}
+	PG_END_TRY();
+
+	return !failed;
 }
 
 static bool
@@ -1685,7 +1753,10 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
 	 */
 	if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
 	{
-		if (!pgfdw_cancel_query_begin(entry->conn))
+		TimestampTz endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+														  CONNECTION_CLEANUP_TIMEOUT);
+
+		if (!pgfdw_cancel_query_begin(entry->conn, endtime))
 			return false;		/* Unable to cancel running query */
 		*cancel_requested = lappend(*cancel_requested, entry);
 	}
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index c355e8f3f7d..8892abc3502 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -2698,6 +2698,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 (10 rows)
 
 ALTER VIEW v4 OWNER TO regress_view_owner;
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+                                                                             QUERY PLAN                                                                              
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Foreign Scan
+   Output: (count(*))
+   Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5))
+   Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE))
+(4 rows)
+
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+ERROR:  canceling statement due to statement timeout
+RESET statement_timeout;
 -- ====================================================================
 -- Check that userid to use when querying the remote table is correctly
 -- propagated into foreign rels present in subqueries under an UNION ALL
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 812e7646e16..8aa528002f7 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -717,6 +717,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
 ALTER VIEW v4 OWNER TO regress_view_owner;
 
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+RESET statement_timeout;
+
 -- ====================================================================
 -- Check that userid to use when querying the remote table is correctly
 -- propagated into foreign rels present in subqueries under an UNION ALL
diff --git a/src/fe_utils/connect_utils.c b/src/fe_utils/connect_utils.c
index 808d54461fd..5ed9f3ba17b 100644
--- a/src/fe_utils/connect_utils.c
+++ b/src/fe_utils/connect_utils.c
@@ -157,19 +157,14 @@ connectMaintenanceDatabase(ConnParams *cparams,
 void
 disconnectDatabase(PGconn *conn)
 {
-	char		errbuf[256];
-
 	Assert(conn != NULL);
 
 	if (PQtransactionStatus(conn) == PQTRANS_ACTIVE)
 	{
-		PGcancel   *cancel;
+		PGcancelConn *cancelConn = PQcancelCreate(conn);
 
-		if ((cancel = PQgetCancel(conn)))
-		{
-			(void) PQcancel(cancel, errbuf, sizeof(errbuf));
-			PQfreeCancel(cancel);
-		}
+		(void) PQcancelBlocking(cancelConn);
+		PQcancelFinish(cancelConn);
 	}
 
 	PQfinish(conn);
diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c
index 0a66235153a..65a9abd6888 100644
--- a/src/test/isolation/isolationtester.c
+++ b/src/test/isolation/isolationtester.c
@@ -946,26 +946,21 @@ try_complete_step(TestSpec *testspec, PermutationStep *pstep, int flags)
 			 */
 			if (td > max_step_wait && !canceled)
 			{
-				PGcancel   *cancel = PQgetCancel(conn);
+				PGcancelConn *cancel_conn = PQcancelCreate(conn);
 
-				if (cancel != NULL)
+				if (PQcancelBlocking(cancel_conn))
 				{
-					char		buf[256];
-
-					if (PQcancel(cancel, buf, sizeof(buf)))
-					{
-						/*
-						 * print to stdout not stderr, as this should appear
-						 * in the test case's results
-						 */
-						printf("isolationtester: canceling step %s after %d seconds\n",
-							   step->name, (int) (td / USECS_PER_SEC));
-						canceled = true;
-					}
-					else
-						fprintf(stderr, "PQcancel failed: %s\n", buf);
-					PQfreeCancel(cancel);
+					/*
+					 * print to stdout not stderr, as this should appear in
+					 * the test case's results
+					 */
+					printf("isolationtester: canceling step %s after %d seconds\n",
+						   step->name, (int) (td / USECS_PER_SEC));
+					canceled = true;
 				}
+				else
+					fprintf(stderr, "PQcancel failed: %s\n", PQcancelErrorMessage(cancel_conn));
+				PQcancelFinish(cancel_conn);
 			}
 
 			/*
-- 
2.34.1

