On Fri, 19 Jan 2024 17:46:03 +0900 (JST)
Tatsuo Ishii <is...@sraoss.co.jp> wrote:

> >> +/* send cancel requests to all connections */
> >> +static void
> >> +cancel_all()
> >> +{
> >> +  for (int i = 0; i < nclients; i++)
> >> +  {
> >> +          char errbuf[1];
> >> +          if (client_states[i].cancel != NULL)
> >> +                  (void) PQcancel(client_states[i].cancel, errbuf, 
> >> sizeof(errbuf));
> >> +  }
> >> +}
> >> +
> >> 
> >> Why in case of errors from PQCancel the error message is neglected? I
> >> think it's better to print out the error message in case of error.
> > 
> > Is the message useful for pgbench users? I saw the error is ignored
> > in pg_dump, for example in bin/pg_dump/parallel.c
> 
> I think the situation is different from pg_dump.  Unlike pg_dump, if
> PQcancel does not work, users can fix the problem by using
> pg_terminate_backend or kill command. In order to make this work, an
> appropriate error message is essential.

Makes sense. I fixed it to emit an error message when PQcancel fails.

Also, I added some comments about the signal handling on Windows
to explain why it must be handled differently than on non-Windows:

+    * On Windows, a callback function is set in which query cancel requests
+    * are sent to all benchmark queries running in the backend. This is
+    * required because all threads running queries continue to run without
+    * being interrupted even when the signal is received.
+    *

Attached is the updated patch, v6.

> Best reagards,
> --
> Tatsuo Ishii
> SRA OSS LLC
> English: http://www.sraoss.co.jp/index_en/
> Japanese:http://www.sraoss.co.jp
> 
> 


-- 
Yugo NAGATA <nag...@sraoss.co.jp>
>From 52579f3d31a2927d8818953fabf8a908466e4fcf Mon Sep 17 00:00:00 2001
From: Yugo Nagata <nag...@sraoss.co.jp>
Date: Mon, 24 Jul 2023 21:53:28 +0900
Subject: [PATCH v6] Allow pgbench to cancel queries during benchmark

Previously, Ctrl+C during a benchmark killed pgbench immediately,
but queries were not cancelled and kept running on the
backend until they tried to send their results to pgbench.
This commit fixes this so that cancel requests are sent to all
connections before pgbench exits.

In thread #0, setup_cancel_handler is called before the benchmark
so that CancelRequested is set when SIGINT is sent. When SIGINT
is sent during the benchmark, on non-Windows, thread #0 will be
interrupted, return from I/O wait, and send cancel requests to
all connections. After the queries are cancelled, the other threads
are also interrupted and pgbench exits. On Windows, cancel
requests are sent in the callback function passed to
setup_cancel_handler.
---
 src/bin/pgbench/pgbench.c                    | 94 ++++++++++++++++++++
 src/bin/pgbench/t/001_pgbench_with_server.pl | 42 +++++++++
 2 files changed, 136 insertions(+)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 2574454839..e69c4af68a 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -596,6 +596,7 @@ typedef enum
 typedef struct
 {
 	PGconn	   *con;			/* connection handle to DB */
+	PGcancel   *cancel;			/* query cancel */
 	int			id;				/* client No. */
 	ConnectionStateEnum state;	/* state machine's current state. */
 	ConditionalStack cstack;	/* enclosing conditionals state */
@@ -638,6 +639,8 @@ typedef struct
 								 * here */
 } CState;
 
+static CState *client_states;	/* status of all clients, shared by threads */
+
 /*
  * Thread state
  */
@@ -837,6 +840,10 @@ static void add_socket_to_set(socket_set *sa, int fd, int idx);
 static int	wait_on_socket_set(socket_set *sa, int64 usecs);
 static bool socket_has_input(socket_set *sa, int fd, int idx);
 
+#ifdef WIN32
+static void pgbench_cancel_callback(void);
+#endif
+
 /* callback used to build rows for COPY during data loading */
 typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
 
@@ -3646,6 +3653,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 						st->state = CSTATE_ABORTED;
 						break;
 					}
+					st->cancel = PQgetCancel(st->con);
 
 					/* reset now after connection */
 					now = pg_time_now();
@@ -4677,6 +4685,21 @@ disconnect_all(CState *state, int length)
 		finishCon(&state[i]);
 }
 
+/* send cancel requests to all connections, reporting any that fail */
+static void
+cancel_all(void)
+{
+	for (int i = 0; i < nclients; i++)
+	{
+		char		errbuf[256];
+
+		if (client_states[i].cancel == NULL)
+			continue;			/* no cancel object for this client */
+		if (!PQcancel(client_states[i].cancel, errbuf, sizeof(errbuf)))
+			pg_log_error("could not send cancel request: %s", errbuf);
+	}
+}
+
 /*
  * Remove old pgbench tables, if any exist
  */
@@ -7153,6 +7176,9 @@ main(int argc, char **argv)
 		}
 	}
 
+	/* enable threads to access the status of all clients */
+	client_states = state;
+
 	/* other CState initializations */
 	for (i = 0; i < nclients; i++)
 	{
@@ -7365,6 +7391,39 @@ threadRun(void *arg)
 	StatsData	last,
 				aggs;
 
+	/*
+	 * Query cancellation is handled only in thread #0.
+	 *
+	 * On Windows, a callback function is set in which query cancel requests
+	 * are sent to all benchmark queries running in the backend. This is
+	 * required because all threads running queries continue to run without
+	 * being interrupted even when the signal is received.
+	 *
+	 * On non-Windows, no callback function is set. When SIGINT is
+	 * received, CancelRequested is just set, and only thread #0 is
+	 * interrupted and returns from waiting for input from the backend.
+	 * After that, the thread sends cancel requests to all benchmark queries.
+	 */
+	if (thread->tid == 0)
+#ifdef WIN32
+		setup_cancel_handler(pgbench_cancel_callback);
+#else
+		setup_cancel_handler(NULL);
+#endif
+
+#ifndef WIN32
+	if (thread->tid > 0)
+	{
+		sigset_t	sigint_sigset;
+		sigset_t	osigset;
+		sigemptyset(&sigint_sigset);
+		sigaddset(&sigint_sigset, SIGINT);
+
+		/* Block SIGINT in all threads except one. */
+		pthread_sigmask(SIG_BLOCK, &sigint_sigset, &osigset);
+	}
+#endif
+
 	/* open log file if requested */
 	if (use_log)
 	{
@@ -7407,6 +7466,7 @@ threadRun(void *arg)
 				pg_fatal("could not create connection for client %d",
 						 state[i].id);
 			}
+			state[i].cancel = PQgetCancel(state[i].con);
 		}
 	}
 
@@ -7434,6 +7494,26 @@ threadRun(void *arg)
 		pg_time_usec_t min_usec;
 		pg_time_usec_t now = 0; /* set this only if needed */
 
+		/*
+		 * If pgbench is cancelled, send cancel requests to all connections
+		 * and exit the benchmark.
+		 *
+		 * Note that only thread #0 can be interrupted by SIGINT while waiting
+		 * for the result from the backend. Other threads return from waiting
+		 * just after the queries they are running are cancelled by thread #0.
+		 *
+		 * On Windows, cancel requests are sent in the callback function, so
+		 * do nothing but exit the benchmark.
+		 */
+		if (CancelRequested)
+		{
+#ifndef WIN32
+			if (thread->tid == 0)
+				cancel_all();
+#endif
+			goto done;
+		}
+
 		/*
 		 * identify which client sockets should be checked for input, and
 		 * compute the nearest time (if any) at which we need to wake up.
@@ -7657,6 +7737,8 @@ finishCon(CState *st)
 	{
 		PQfinish(st->con);
 		st->con = NULL;
+		PQfreeCancel(st->cancel);
+		st->cancel = NULL;
 	}
 }
 
@@ -7883,3 +7965,15 @@ socket_has_input(socket_set *sa, int fd, int idx)
 }
 
 #endif							/* POLL_USING_SELECT */
+
+#ifdef WIN32
+/*
+ * Windows-only callback registered via setup_cancel_handler in thread #0.
+ */
+static void
+pgbench_cancel_callback(void)
+{
+	/* worker threads are not interrupted on Windows, so cancel from here */
+	cancel_all();
+}
+#endif
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index fc57facf9e..dd065016ae 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -7,6 +7,7 @@ use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
+use Time::HiRes qw(usleep);
 
 # Check the initial state of the data generated.  Tables for tellers and
 # branches use NULL for their filler attribute.  The table accounts uses
@@ -1502,6 +1503,47 @@ update counter set i = i+1 returning i \gset
 # Clean up
 $node->safe_psql('postgres', 'DROP TABLE counter;');
 
+# Test query canceling by sending SIGINT to a running pgbench
+SKIP:
+{
+	skip "sending SIGINT on Windows terminates the test itself", 3
+	  if $windows_os;
+
+	my ($stdin, $stdout, $stderr, @file);
+
+	@file = $node->_pgbench_make_files(
+		{
+			'003_pgbench_cancel' => qq{
+select pg_sleep($PostgreSQL::Test::Utils::timeout_default);
+		}});
+
+	local %ENV = $node->_get_env();
+
+	my $h = IPC::Run::start(
+		[ 'pgbench', '-c', '2', '-j', '2',
+		  '-T', "$PostgreSQL::Test::Utils::timeout_default", @file ],
+		\$stdin, \$stdout, \$stderr);
+
+	$node->poll_query_until('postgres',
+		q{SELECT (SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep') = 2;}
+	) or die "timed out";
+
+	# Send SIGINT; pgbench should send cancel requests and then exit
+	$h->signal('INT');
+
+	my $result = $h->finish;
+
+	ok(!$result, 'pgbench failed as expected');
+	like(
+		$stderr,
+		qr/Run was aborted; the above results are incomplete/,
+		'pgbench was canceled');
+
+	# Idle or exiting backends keep their last query text, so count only active ones.
+	is($node->safe_psql('postgres', q{SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep' AND state = 'active'}),
+		'0', 'all queries were canceled');
+}
+
 # done
 $node->safe_psql('postgres', 'DROP TABLESPACE regress_pgbench_tap_1_ts');
 $node->stop;
-- 
2.25.1

Reply via email to