On Mon, 15 Jan 2024 16:49:44 +0900 (JST)
Tatsuo Ishii <is...@sraoss.co.jp> wrote:

> > On Wed, 6 Sep 2023 20:13:34 +0900
> > Yugo NAGATA <nag...@sraoss.co.jp> wrote:
> >  
> >> I attached the updated patch v3. The changes since the previous
> >> patch includes the following;
> >> 
> >> I removed the unnecessary condition (&& false) that you
> >> pointed out in [1].
> >> 
> >> The test was rewritten by using IPC::Run signal() and integrated
> >> to "001_pgbench_with_server.pl". This test is skipped on Windows
> >> because SIGINT causes to terminate the test itself as discussed
> >> in [2] about query cancellation test in psql.
> >> 
> >> I added some comments to describe how query cancellation is
> >> handled as I explained in [1].
> >> 
> >> Also, I found the previous patch didn't work on Windows so fixed it.
> >> On non-Windows system, a thread waiting a response of long query can
> >> be interrupted by SIGINT, but on Windows, threads do not return from
> >> waiting until queries they are running are cancelled. This is because,
> >> when the signal is received, the system just creates a new thread to
> >> execute the callback function specified by setup_cancel_handler, and
> >> other thread continue to run[3]. Therefore, the queries have to be
> >> cancelled in the callback function.
> >> 
> >> [1] 
> >> https://www.postgresql.org/message-id/a58388ac-5411-4760-ea46-71324d8324cb%40mines-paristech.fr
> >> [2] 
> >> https://www.postgresql.org/message-id/20230906004524.2fd6ee049f8a6c6f2690b99c%40sraoss.co.jp
> >> [3] https://learn.microsoft.com/en-us/windows/console/handlerroutine
> > 
> > I found that --disable-thread-safety option was removed in 68a4b58eca0329.
> > So, I removed codes involving ENABLE_THREAD_SAFETY from the patch.
> > 
> > Also, I wrote a commit log draft.
> 
> > Previously, Ctrl+C during benchmark killed pgbench immediately,
> > but queries running at that time were not cancelled.
> 
> Better to mention explicitely that queries keep on running on the
> backend. What about this?
> 
> Previously, Ctrl+C during benchmark killed pgbench immediately, but
> queries were not canceled and they keep on running on the backend
> until they tried to send the result to pgbench.

Thank you for your comments. I agree with you, so I fixed the message
as your suggestion.

> > The commit
> > fixes this so that cancel requests are sent for all connections
> > before pgbench exits.
> 
> "sent for" -> "sent to"

Fixed.

> > Attached is the updated version, v4.
> 
> +/* send cancel requests to all connections */
> +static void
> +cancel_all()
> +{
> +     for (int i = 0; i < nclients; i++)
> +     {
> +             char errbuf[1];
> +             if (client_states[i].cancel != NULL)
> +                     (void) PQcancel(client_states[i].cancel, errbuf, 
> sizeof(errbuf));
> +     }
> +}
> +
> 
> Why in case of errors from PQCancel the error message is neglected? I
> think it's better to print out the error message in case of error.

Is the message useful for pgbench users? I saw the error is ignored
in pg_dump, for example in bin/pg_dump/parallel.c

        /*
         * Send QueryCancel to leader connection, if enabled.  Ignore errors,
         * there's not much we can do about them anyway.
         */
        if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
            (void) PQcancel(signal_info.myAH->connCancel,
                            errbuf, sizeof(errbuf));


> +      * On non-Windows, any callback function is not set. When SIGINT is
> +      * received, CancelRequested is just set, and only thread #0 is 
> interrupted
> +      * and returns from waiting input from the backend. After that, the 
> thread
> +      * sends cancel requests to all benchmark queries.
> 
> The second line is a little bit long according to the coding
> standard. Fix like this?
> 
>        * On non-Windows, any callback function is not set. When SIGINT is
>        * received, CancelRequested is just set, and only thread #0 is
>        * interrupted and returns from waiting input from the backend. After
>        * that, the thread sends cancel requests to all benchmark queries.

Fixed.

The attached is the updated patch, v5.

Regards,
Yugo Nagata

> Best reagards,
> --
> Tatsuo Ishii
> SRA OSS LLC
> English: http://www.sraoss.co.jp/index_en/
> Japanese:http://www.sraoss.co.jp


-- 
Yugo NAGATA <nag...@sraoss.co.jp>
>From a42e6d47b27f2e91a02c5d934509070bd41bf79b Mon Sep 17 00:00:00 2001
From: Yugo Nagata <nag...@sraoss.co.jp>
Date: Mon, 24 Jul 2023 21:53:28 +0900
Subject: [PATCH v5] Allow pgbnech to cancel queries during benchmark

Previously, Ctrl+C during benchmark killed pgbench immediately,
but queries were not cancelled nd they keep on running on the
backend until they tried to send the result to pgbench.
The commit fixes this so that cancel requests are sent to all
connections before pgbench exits.

In thread #0, setup_cancel_handler is called before the benchmark
so that CancelRequested is set when SIGINT is sent. When SIGINT
is sent during the benchmark, on non-Windows, thread #0 will be
interrupted, return from I/O wait, and send cancel requests to
all connections. After queries are cancelled, other threads also
be interrupted and pgbench will exit at the end. On Windows, cancel
requests are sent in the callback function specified by
setup_cancel_hander.
---
 src/bin/pgbench/pgbench.c                    | 89 ++++++++++++++++++++
 src/bin/pgbench/t/001_pgbench_with_server.pl | 42 +++++++++
 2 files changed, 131 insertions(+)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 2574454839..69ccf0e351 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -596,6 +596,7 @@ typedef enum
 typedef struct
 {
 	PGconn	   *con;			/* connection handle to DB */
+	PGcancel   *cancel;			/* query cancel */
 	int			id;				/* client No. */
 	ConnectionStateEnum state;	/* state machine's current state. */
 	ConditionalStack cstack;	/* enclosing conditionals state */
@@ -638,6 +639,8 @@ typedef struct
 								 * here */
 } CState;
 
+CState	*client_states;		/* status of all clients */
+
 /*
  * Thread state
  */
@@ -837,6 +840,10 @@ static void add_socket_to_set(socket_set *sa, int fd, int idx);
 static int	wait_on_socket_set(socket_set *sa, int64 usecs);
 static bool socket_has_input(socket_set *sa, int fd, int idx);
 
+#ifdef WIN32
+static void pgbench_cancel_callback(void);
+#endif
+
 /* callback used to build rows for COPY during data loading */
 typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
 
@@ -3646,6 +3653,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 						st->state = CSTATE_ABORTED;
 						break;
 					}
+					st->cancel = PQgetCancel(st->con);
 
 					/* reset now after connection */
 					now = pg_time_now();
@@ -4677,6 +4685,18 @@ disconnect_all(CState *state, int length)
 		finishCon(&state[i]);
 }
 
+/* send cancel requests to all connections */
+static void
+cancel_all()
+{
+	for (int i = 0; i < nclients; i++)
+	{
+		char errbuf[1];
+		if (client_states[i].cancel != NULL)
+			(void) PQcancel(client_states[i].cancel, errbuf, sizeof(errbuf));
+	}
+}
+
 /*
  * Remove old pgbench tables, if any exist
  */
@@ -7153,6 +7173,9 @@ main(int argc, char **argv)
 		}
 	}
 
+	/* enable threads to access the status of all clients */
+	client_states = state;
+
 	/* other CState initializations */
 	for (i = 0; i < nclients; i++)
 	{
@@ -7365,6 +7388,37 @@ threadRun(void *arg)
 	StatsData	last,
 				aggs;
 
+	/*
+	 * Query cancellation is handled only in thread #0.
+	 *
+	 * On Windows, a callback function is set in which query cancel requests
+	 * are sent to all benchmark queries running in the backend.
+	 *
+	 * On non-Windows, any callback function is not set. When SIGINT is
+	 * received, CancelRequested is just set, and only thread #0 is
+	 * interrupted and returns from waiting input from the backend. After
+	 * that, the thread sends cancel requests to all benchmark queries.
+	 */
+	if (thread->tid == 0)
+#ifdef WIN32
+		setup_cancel_handler(pgbench_cancel_callback);
+#else
+		setup_cancel_handler(NULL);
+#endif
+
+#ifndef WIN32
+	if (thread->tid > 0)
+	{
+		sigset_t	sigint_sigset;
+		sigset_t	osigset;
+		sigemptyset(&sigint_sigset);
+		sigaddset(&sigint_sigset, SIGINT);
+
+		/* Block SIGINT in all threads except one. */
+		pthread_sigmask(SIG_BLOCK, &sigint_sigset, &osigset);
+	}
+#endif
+
 	/* open log file if requested */
 	if (use_log)
 	{
@@ -7407,6 +7461,7 @@ threadRun(void *arg)
 				pg_fatal("could not create connection for client %d",
 						 state[i].id);
 			}
+			state[i].cancel = PQgetCancel(state[i].con);
 		}
 	}
 
@@ -7434,6 +7489,26 @@ threadRun(void *arg)
 		pg_time_usec_t min_usec;
 		pg_time_usec_t now = 0; /* set this only if needed */
 
+		/*
+		 * If pgbench is cancelled, send cancel requests to all connections
+		 * and exit the benchmark.
+		 *
+		 * Note that only thread #0 can be interrupted by SIGINT while waiting
+		 * the result from the backend. Other threads will return from waiting
+		 * just after queries they running are cancelled by thread #0.
+		 *
+		 * On Windows, cancel requests are sent in the callback function, so
+		 * do nothing but exit the benchmark.
+		 */
+		if (CancelRequested)
+		{
+#ifndef WIN32
+			if (thread->tid == 0)
+				cancel_all();
+#endif
+			goto done;
+		}
+
 		/*
 		 * identify which client sockets should be checked for input, and
 		 * compute the nearest time (if any) at which we need to wake up.
@@ -7657,6 +7732,8 @@ finishCon(CState *st)
 	{
 		PQfinish(st->con);
 		st->con = NULL;
+		PQfreeCancel(st->cancel);
+		st->cancel = NULL;
 	}
 }
 
@@ -7883,3 +7960,15 @@ socket_has_input(socket_set *sa, int fd, int idx)
 }
 
 #endif							/* POLL_USING_SELECT */
+
+#ifdef WIN32
+/*
+ * query cancellation callback for Windows
+ */
+static void
+pgbench_cancel_callback(void)
+{
+	/* send cancel requests to all connections */
+	cancel_all();
+}
+#endif
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index fc57facf9e..dd065016ae 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -7,6 +7,7 @@ use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
+use Time::HiRes qw(usleep);
 
 # Check the initial state of the data generated.  Tables for tellers and
 # branches use NULL for their filler attribute.  The table accounts uses
@@ -1502,6 +1503,47 @@ update counter set i = i+1 returning i \gset
 # Clean up
 $node->safe_psql('postgres', 'DROP TABLE counter;');
 
+# Test query canceling by sending SIGINT to a running pgbench
+SKIP:
+{
+	skip "sending SIGINT on Windows terminates the test itself", 3
+	  if $windows_os;
+
+	my ($stdin, $stdout, $stderr, @file);
+
+	@file = $node->_pgbench_make_files(
+		{
+			'003_pgbench_cancel' => qq{
+select pg_sleep($PostgreSQL::Test::Utils::timeout_default);
+		}});
+
+	local %ENV = $node->_get_env();
+
+	my $h = IPC::Run::start(
+		[ 'pgbench', '-c', '2', '-j', '2',
+		  '-T', "$PostgreSQL::Test::Utils::timeout_default", @file ],
+		\$stdin, \$stdout, \$stderr);
+
+	$node->poll_query_until('postgres',
+		q{SELECT (SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep') = 2;}
+	) or die "timed out";
+
+	# Send cancel request
+	$h->signal('INT');
+
+	my $result = finish $h;
+
+	ok(!$result, 'pgbench failed as expected');
+	like(
+		$stderr,
+		qr/Run was aborted; the above results are incomplete/,
+		'pgbench was canceled');
+
+	is($node->safe_psql('postgres',
+		q{SELECT count(*) FROM pg_stat_activity WHERE query ~ '^select pg_sleep'}),
+		'0', 'all queries were canceled');
+}
+
 # done
 $node->safe_psql('postgres', 'DROP TABLESPACE regress_pgbench_tap_1_ts');
 $node->stop;
-- 
2.25.1

Reply via email to