On 2024-06-17 Mo 10:01 AM, Andrew Dunstan wrote:


I agree with you that falling back on BackgroundPsql is not a terribly
satisfactory solution.
I'm somewhat doubtful we'll just agree on making FFI::Platypus a hard
dependency, but if we agree to do so...



Maybe not. If so your other suggestion of a small XS wrapper might make sense.


Here's the latest version of this patch. It removes all use of background_psql(). Instead it uses libpq's async interface, which seems to me far more robust. There is one remaining use of interactive_psql(), but that's reasonable as it's used for testing psql itself.

I spent yesterday creating an XS wrapper for just the 19 libpq functions used in Session.pm. It's pretty simple. I have it passing a very basic test, but haven't tried plugging it into Session.pm yet.


cheers


andrew



--
Andrew Dunstan
EDB: https://www.enterprisedb.com
diff --git a/contrib/amcheck/t/001_verify_heapam.pl b/contrib/amcheck/t/001_verify_heapam.pl
index 481e4dbe4f..f8217777f9 100644
--- a/contrib/amcheck/t/001_verify_heapam.pl
+++ b/contrib/amcheck/t/001_verify_heapam.pl
@@ -5,6 +5,7 @@ use strict;
 use warnings FATAL => 'all';
 
 use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Session;
 use PostgreSQL::Test::Utils;
 
 use Test::More;
@@ -18,7 +19,9 @@ $node = PostgreSQL::Test::Cluster->new('test');
 $node->init;
 $node->append_conf('postgresql.conf', 'autovacuum=off');
 $node->start;
-$node->safe_psql('postgres', q(CREATE EXTENSION amcheck));
+my $session = PostgreSQL::Test::Session->new(node => $node);
+
+$session->do(q(CREATE EXTENSION amcheck));
 
 #
 # Check a table with data loaded but no corruption, freezing, etc.
@@ -49,7 +52,7 @@ detects_heap_corruption(
 # Check a corrupt table with all-frozen data
 #
 fresh_test_table('test');
-$node->safe_psql('postgres', q(VACUUM (FREEZE, DISABLE_PAGE_SKIPPING) test));
+$session->do(q(VACUUM (FREEZE, DISABLE_PAGE_SKIPPING) test));
 detects_no_corruption("verify_heapam('test')",
 	"all-frozen not corrupted table");
 corrupt_first_page('test');
@@ -81,7 +84,7 @@ sub relation_filepath
 	my ($relname) = @_;
 
 	my $pgdata = $node->data_dir;
-	my $rel = $node->safe_psql('postgres',
+	my $rel = $session->query_oneval(
 		qq(SELECT pg_relation_filepath('$relname')));
 	die "path not found for relation $relname" unless defined $rel;
 	return "$pgdata/$rel";
@@ -92,8 +95,8 @@ sub fresh_test_table
 {
 	my ($relname) = @_;
 
-	return $node->safe_psql(
-		'postgres', qq(
+	return $session->do(
+		qq(
 		DROP TABLE IF EXISTS $relname CASCADE;
 		CREATE TABLE $relname (a integer, b text);
 		ALTER TABLE $relname SET (autovacuum_enabled=false);
@@ -117,8 +120,8 @@ sub fresh_test_sequence
 {
 	my ($seqname) = @_;
 
-	return $node->safe_psql(
-		'postgres', qq(
+	return $session->do(
+		qq(
 		DROP SEQUENCE IF EXISTS $seqname CASCADE;
 		CREATE SEQUENCE $seqname
 			INCREMENT BY 13
@@ -134,8 +137,8 @@ sub advance_test_sequence
 {
 	my ($seqname) = @_;
 
-	return $node->safe_psql(
-		'postgres', qq(
+	return $session->query_oneval(
+		qq(
 		SELECT nextval('$seqname');
 	));
 }
@@ -145,10 +148,7 @@ sub set_test_sequence
 {
 	my ($seqname) = @_;
 
-	return $node->safe_psql(
-		'postgres', qq(
-		SELECT setval('$seqname', 102);
-	));
+	return $session->query_oneval(qq(SELECT setval('$seqname', 102)));
 }
 
 # Call SQL functions to reset the sequence
@@ -156,8 +156,8 @@ sub reset_test_sequence
 {
 	my ($seqname) = @_;
 
-	return $node->safe_psql(
-		'postgres', qq(
+	return $session->do(
+		qq(
 		ALTER SEQUENCE $seqname RESTART WITH 51
 	));
 }
@@ -169,6 +169,7 @@ sub corrupt_first_page
 	my ($relname) = @_;
 	my $relpath = relation_filepath($relname);
 
+	$session->close;
 	$node->stop;
 
 	my $fh;
@@ -191,6 +192,7 @@ sub corrupt_first_page
 	  or BAIL_OUT("close failed: $!");
 
 	$node->start;
+	$session->reconnect;
 }
 
 sub detects_heap_corruption
@@ -216,7 +218,7 @@ sub detects_corruption
 
 	my ($function, $testname, @re) = @_;
 
-	my $result = $node->safe_psql('postgres', qq(SELECT * FROM $function));
+	my $result = $session->query_tuples(qq(SELECT * FROM $function));
 	like($result, $_, $testname) for (@re);
 }
 
@@ -226,7 +228,7 @@ sub detects_no_corruption
 
 	my ($function, $testname) = @_;
 
-	my $result = $node->safe_psql('postgres', qq(SELECT * FROM $function));
+	my $result = $session->query_tuples(qq(SELECT * FROM $function));
 	is($result, '', $testname);
 }
 
diff --git a/contrib/amcheck/t/003_cic_2pc.pl b/contrib/amcheck/t/003_cic_2pc.pl
index fc314b8524..ff345f36ac 100644
--- a/contrib/amcheck/t/003_cic_2pc.pl
+++ b/contrib/amcheck/t/003_cic_2pc.pl
@@ -36,28 +36,29 @@ $node->safe_psql('postgres', q(CREATE TABLE tbl(i int)));
 # statements.
 #
 
-my $main_h = $node->background_psql('postgres');
+my $main_h = PostgreSQL::Test::Session->new(node=>$node);
 
-$main_h->query_safe(
+$main_h->do_async(
 	q(
 BEGIN;
 INSERT INTO tbl VALUES(0);
 ));
 
-my $cic_h = $node->background_psql('postgres');
+my $cic_h = PostgreSQL::Test::Session->new(node=>$node);
 
-$cic_h->query_until(
-	qr/start/, q(
-\echo start
+$cic_h->do_async(
+	q(
 CREATE INDEX CONCURRENTLY idx ON tbl(i);
 ));
 
-$main_h->query_safe(
+$main_h->wait_for_completion;
+$main_h->do_async(
 	q(
 PREPARE TRANSACTION 'a';
 ));
 
-$main_h->query_safe(
+$main_h->wait_for_completion;
+$main_h->do_async(
 	q(
 BEGIN;
 INSERT INTO tbl VALUES(0);
@@ -65,7 +66,8 @@ INSERT INTO tbl VALUES(0);
 
 $node->safe_psql('postgres', q(COMMIT PREPARED 'a';));
 
-$main_h->query_safe(
+$main_h->wait_for_completion;
+$main_h->do_async(
 	q(
 PREPARE TRANSACTION 'b';
 BEGIN;
@@ -74,14 +76,14 @@ INSERT INTO tbl VALUES(0);
 
 $node->safe_psql('postgres', q(COMMIT PREPARED 'b';));
 
-$main_h->query_safe(
-	q(
-PREPARE TRANSACTION 'c';
-COMMIT PREPARED 'c';
-));
+$main_h->wait_for_completion;
+$main_h->do(
+	q(PREPARE TRANSACTION 'c';),
+	q(COMMIT PREPARED 'c';));
 
-$main_h->quit;
-$cic_h->quit;
+$main_h->close;
+$cic_h->wait_for_completion;
+$cic_h->close;
 
 $result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));
 is($result, '0', 'bt_index_check after overlapping 2PC');
@@ -102,16 +104,16 @@ PREPARE TRANSACTION 'persists_forever';
 ));
 $node->restart;
 
-my $reindex_h = $node->background_psql('postgres');
-$reindex_h->query_until(
-	qr/start/, q(
-\echo start
+my $reindex_h = PostgreSQL::Test::Session->new(node => $node);
+$reindex_h->do_async(
+	q(
 DROP INDEX CONCURRENTLY idx;
 CREATE INDEX CONCURRENTLY idx ON tbl(i);
 ));
 
 $node->safe_psql('postgres', "COMMIT PREPARED 'spans_restart'");
-$reindex_h->quit;
+$reindex_h->wait_for_completion;
+$reindex_h->close;
 $result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));
 is($result, '0', 'bt_index_check after 2PC and restart');
 
diff --git a/contrib/bloom/t/001_wal.pl b/contrib/bloom/t/001_wal.pl
index 61f5641d9e..280e95eb2b 100644
--- a/contrib/bloom/t/001_wal.pl
+++ b/contrib/bloom/t/001_wal.pl
@@ -5,11 +5,14 @@
 use strict;
 use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Session;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
 my $node_primary;
 my $node_standby;
+my $session_primary;
+my $session_standby;
 
 # Run few queries on both primary and standby and check their results match.
 sub test_index_replay
@@ -21,20 +24,18 @@ sub test_index_replay
 	# Wait for standby to catch up
 	$node_primary->wait_for_catchup($node_standby);
 
-	my $queries = qq(SET enable_seqscan=off;
-SET enable_bitmapscan=on;
-SET enable_indexscan=on;
-SELECT * FROM tst WHERE i = 0;
-SELECT * FROM tst WHERE i = 3;
-SELECT * FROM tst WHERE t = 'b';
-SELECT * FROM tst WHERE t = 'f';
-SELECT * FROM tst WHERE i = 3 AND t = 'c';
-SELECT * FROM tst WHERE i = 7 AND t = 'e';
-);
+	my @queries = (
+		"SELECT * FROM tst WHERE i = 0",
+		"SELECT * FROM tst WHERE i = 3",
+		"SELECT * FROM tst WHERE t = 'b'",
+		"SELECT * FROM tst WHERE t = 'f'",
+		"SELECT * FROM tst WHERE i = 3 AND t = 'c'",
+		"SELECT * FROM tst WHERE i = 7 AND t = 'e'",
+	   );
 
 	# Run test queries and compare their result
-	my $primary_result = $node_primary->safe_psql("postgres", $queries);
-	my $standby_result = $node_standby->safe_psql("postgres", $queries);
+	my $primary_result = $session_primary->query_tuples(@queries);
+	my $standby_result = $session_standby->query_tuples(@queries);
 
 	is($primary_result, $standby_result, "$test_name: query result matches");
 	return;
@@ -55,13 +56,24 @@ $node_standby->init_from_backup($node_primary, $backup_name,
 	has_streaming => 1);
 $node_standby->start;
 
+# Create and initialize the sessions
+$session_primary = PostgreSQL::Test::Session->new(node => $node_primary);
+$session_standby = PostgreSQL::Test::Session->new(node => $node_standby);
+my $initset = q[
+   SET enable_seqscan=off;
+   SET enable_bitmapscan=on;
+   SET enable_indexscan=on;
+];
+$session_primary->do($initset);
+$session_standby->do($initset);
+
 # Create some bloom index on primary
-$node_primary->safe_psql("postgres", "CREATE EXTENSION bloom;");
-$node_primary->safe_psql("postgres", "CREATE TABLE tst (i int4, t text);");
-$node_primary->safe_psql("postgres",
+$session_primary->do("CREATE EXTENSION bloom;");
+$session_primary->do("CREATE TABLE tst (i int4, t text);");
+$session_primary->do(
 	"INSERT INTO tst SELECT i%10, substr(encode(sha256(i::text::bytea), 'hex'), 1, 1) FROM generate_series(1,10000) i;"
 );
-$node_primary->safe_psql("postgres",
+$session_primary->do(
 	"CREATE INDEX bloomidx ON tst USING bloom (i, t) WITH (col1 = 3);");
 
 # Test that queries give same result
diff --git a/contrib/pg_visibility/t/001_concurrent_transaction.pl b/contrib/pg_visibility/t/001_concurrent_transaction.pl
index c31d041757..582f107a99 100644
--- a/contrib/pg_visibility/t/001_concurrent_transaction.pl
+++ b/contrib/pg_visibility/t/001_concurrent_transaction.pl
@@ -17,10 +17,10 @@ $node->start;
 
 # Setup another database
 $node->safe_psql("postgres", "CREATE DATABASE other_database;\n");
-my $bsession = $node->background_psql('other_database');
+my $bsession = PostgreSQL::Test::Session->new(node => $node, dbname => 'other_database');
 
 # Run a concurrent transaction
-$bsession->query_safe(
+$bsession->do(
 	qq[
 	BEGIN;
 	SELECT txid_current();
@@ -40,8 +40,8 @@ my $result = $node->safe_psql("postgres",
 ok($result eq "", "pg_check_visible() detects no errors");
 
 # Shutdown
-$bsession->query_safe("COMMIT;");
-$bsession->quit;
+$bsession->do("COMMIT;");
+$bsession->close;
 $node->stop;
 
 done_testing();
diff --git a/src/bin/pg_amcheck/t/004_verify_heapam.pl b/src/bin/pg_amcheck/t/004_verify_heapam.pl
index f6d2c5f787..c8036249ae 100644
--- a/src/bin/pg_amcheck/t/004_verify_heapam.pl
+++ b/src/bin/pg_amcheck/t/004_verify_heapam.pl
@@ -5,6 +5,7 @@ use strict;
 use warnings FATAL => 'all';
 
 use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Session;
 use PostgreSQL::Test::Utils;
 
 use Test::More;
@@ -190,16 +191,17 @@ $node->append_conf('postgresql.conf', 'max_prepared_transactions=10');
 $node->start;
 my $port = $node->port;
 my $pgdata = $node->data_dir;
-$node->safe_psql('postgres', "CREATE EXTENSION amcheck");
-$node->safe_psql('postgres', "CREATE EXTENSION pageinspect");
+my $session = PostgreSQL::Test::Session->new(node => $node);
+$session->do("CREATE EXTENSION amcheck");
+$session->do("CREATE EXTENSION pageinspect");
 
 # Get a non-zero datfrozenxid
-$node->safe_psql('postgres', qq(VACUUM FREEZE));
+$session->do(qq(VACUUM FREEZE));
 
 # Create the test table with precisely the schema that our corruption function
 # expects.
-$node->safe_psql(
-	'postgres', qq(
+$session->do(
+	qq(
 		CREATE TABLE public.test (a BIGINT, b TEXT, c TEXT);
 		ALTER TABLE public.test SET (autovacuum_enabled=false);
 		ALTER TABLE public.test ALTER COLUMN c SET STORAGE EXTERNAL;
@@ -209,14 +211,15 @@ $node->safe_psql(
 # We want (0 < datfrozenxid < test.relfrozenxid).  To achieve this, we freeze
 # an otherwise unused table, public.junk, prior to inserting data and freezing
 # public.test
-$node->safe_psql(
-	'postgres', qq(
+$session->do(
+	qq(
 		CREATE TABLE public.junk AS SELECT 'junk'::TEXT AS junk_column;
 		ALTER TABLE public.junk SET (autovacuum_enabled=false);
-		VACUUM FREEZE public.junk
-	));
+	),
+	'VACUUM FREEZE public.junk'
+);
 
-my $rel = $node->safe_psql('postgres',
+my $rel = $session->query_oneval(
 	qq(SELECT pg_relation_filepath('public.test')));
 my $relpath = "$pgdata/$rel";
 
@@ -229,23 +232,24 @@ my $ROWCOUNT_BASIC = 16;
 
 # First insert data needed for tests unrelated to update chain validation.
 # Then freeze the page. These tuples are at offset numbers 1 to 16.
-$node->safe_psql(
-	'postgres', qq(
+$session->do(
+	qq(
 	INSERT INTO public.test (a, b, c)
 		SELECT
 			x'DEADF9F9DEADF9F9'::bigint,
 			'abcdefg',
 			repeat('w', 10000)
 	FROM generate_series(1, $ROWCOUNT_BASIC);
-	VACUUM FREEZE public.test;)
+    ),
+	'VACUUM FREEZE public.test'
 );
 
 # Create some simple HOT update chains for line pointer validation. After
 # the page is HOT pruned, we'll have two redirects line pointers each pointing
 # to a tuple. We'll then change the second redirect to point to the same
 # tuple as the first one and verify that we can detect corruption.
-$node->safe_psql(
-	'postgres', qq(
+$session->do(
+	qq(
 		INSERT INTO public.test (a, b, c)
 			VALUES ( x'DEADF9F9DEADF9F9'::bigint, 'abcdefg',
 					 generate_series(1,2)); -- offset numbers 17 and 18
@@ -254,8 +258,8 @@ $node->safe_psql(
 	));
 
 # Create some more HOT update chains.
-$node->safe_psql(
-	'postgres', qq(
+$session->do(
+	qq(
 		INSERT INTO public.test (a, b, c)
 			VALUES ( x'DEADF9F9DEADF9F9'::bigint, 'abcdefg',
 					 generate_series(3,6)); -- offset numbers 21 through 24
@@ -264,25 +268,30 @@ $node->safe_psql(
 	));
 
 # Negative test case of HOT-pruning with aborted tuple.
-$node->safe_psql(
-	'postgres', qq(
+$session->do(
+	qq(
 		BEGIN;
 		UPDATE public.test SET c = 'a' WHERE c = '5'; -- offset number 27
 		ABORT;
-		VACUUM FREEZE public.test;
-	));
+       ),
+	   'VACUUM FREEZE public.test;',
+	);
 
 # Next update on any tuple will be stored at the same place of tuple inserted
 # by aborted transaction. This should not cause the table to appear corrupt.
-$node->safe_psql(
-	'postgres', qq(
+$session->do(
+	qq(
+        BEGIN;
 		UPDATE public.test SET c = 'a' WHERE c = '6'; -- offset number 27 again
-		VACUUM FREEZE public.test;
-	));
+        COMMIT;
+	),
+	'VACUUM FREEZE public.test;',
+   );
 
 # Data for HOT chain validation, so not calling VACUUM FREEZE.
-$node->safe_psql(
-	'postgres', qq(
+$session->do(
+	qq(
+        BEGIN;
 		INSERT INTO public.test (a, b, c)
 			VALUES ( x'DEADF9F9DEADF9F9'::bigint, 'abcdefg',
 					 generate_series(7,15)); -- offset numbers 28 to 36
@@ -293,11 +302,12 @@ $node->safe_psql(
 		UPDATE public.test SET c = 'a' WHERE c = '13'; -- offset number 41
 		UPDATE public.test SET c = 'a' WHERE c = '14'; -- offset number 42
 		UPDATE public.test SET c = 'a' WHERE c = '15'; -- offset number 43
+        COMMIT;
 	));
 
 # Need one aborted transaction to test corruption in HOT chains.
-$node->safe_psql(
-	'postgres', qq(
+$session->do(
+	qq(
 		BEGIN;
 		UPDATE public.test SET c = 'a' WHERE c = '9'; -- offset number 44
 		ABORT;
@@ -306,19 +316,19 @@ $node->safe_psql(
 # Need one in-progress transaction to test few corruption in HOT chains.
 # We are creating PREPARE TRANSACTION here as these will not be aborted
 # even if we stop the node.
-$node->safe_psql(
-	'postgres', qq(
+$session->do(
+	qq(
 		BEGIN;
 		PREPARE TRANSACTION 'in_progress_tx';
 	));
-my $in_progress_xid = $node->safe_psql(
-	'postgres', qq(
+my $in_progress_xid = $session->query_oneval(
+	qq(
 		SELECT transaction FROM pg_prepared_xacts;
 	));
 
-my $relfrozenxid = $node->safe_psql('postgres',
+my $relfrozenxid = $session->query_oneval(
 	q(select relfrozenxid from pg_class where relname = 'test'));
-my $datfrozenxid = $node->safe_psql('postgres',
+my $datfrozenxid = $session->query_oneval(
 	q(select datfrozenxid from pg_database where datname = 'postgres'));
 
 # Sanity check that our 'test' table has a relfrozenxid newer than the
@@ -326,6 +336,7 @@ my $datfrozenxid = $node->safe_psql('postgres',
 # first normal xid.  We rely on these invariants in some of our tests.
 if ($datfrozenxid <= 3 || $datfrozenxid >= $relfrozenxid)
 {
+	$session->close;
 	$node->clean_node;
 	plan skip_all =>
 	  "Xid thresholds not as expected: got datfrozenxid = $datfrozenxid, relfrozenxid = $relfrozenxid";
@@ -334,17 +345,21 @@ if ($datfrozenxid <= 3 || $datfrozenxid >= $relfrozenxid)
 
 # Find where each of the tuples is located on the page. If a particular
 # line pointer is a redirect rather than a tuple, we record the offset as -1.
-my @lp_off = split '\n', $node->safe_psql(
-	'postgres', qq(
+my $lp_off_res = $session->query(
+	qq(
 	    SELECT CASE WHEN lp_flags = 2 THEN -1 ELSE lp_off END
 	    FROM heap_page_items(get_raw_page('test', 'main', 0))
     )
-);
+   );
+my @lp_off;
+push(@lp_off, $_->[0]) foreach @{$lp_off_res->{rows}};
+
 scalar @lp_off == $ROWCOUNT or BAIL_OUT("row offset counts mismatch");
 
 # Sanity check that our 'test' table on disk layout matches expectations.  If
 # this is not so, we will have to skip the test until somebody updates the test
 # to work on this platform.
+$session->close;
 $node->stop;
 my $file;
 open($file, '+<', $relpath)
@@ -750,17 +765,19 @@ for (my $tupidx = 0; $tupidx < $ROWCOUNT; $tupidx++)
 close($file)
   or BAIL_OUT("close failed: $!");
 $node->start;
+$session->reconnect;
 
 # Run pg_amcheck against the corrupt table with epoch=0, comparing actual
 # corruption messages against the expected messages
 $node->command_checks_all(
 	[ 'pg_amcheck', '--no-dependent-indexes', '-p', $port, 'postgres' ],
 	2, [@expected], [], 'Expected corruption message output');
-$node->safe_psql(
-	'postgres', qq(
+$session->do(
+	qq(
                         COMMIT PREPARED 'in_progress_tx';
         ));
 
+$session->close;
 $node->teardown_node;
 $node->clean_node;
 
diff --git a/src/test/authentication/t/001_password.pl b/src/test/authentication/t/001_password.pl
index 87e180af3d..52c2dfb58d 100644
--- a/src/test/authentication/t/001_password.pl
+++ b/src/test/authentication/t/001_password.pl
@@ -111,36 +111,18 @@ my $res = $node->safe_psql(
 	 WHERE rolname = 'scram_role_iter'");
 is($res, 'SCRAM-SHA-256$1024:', 'scram_iterations in server side ROLE');
 
-# If we don't have IO::Pty, forget it, because IPC::Run depends on that
-# to support pty connections. Also skip if IPC::Run isn't at least 0.98
-# as earlier version cause the session to time out.
-SKIP:
-{
-	skip "IO::Pty and IPC::Run >= 0.98 required", 1
-	  unless eval { require IO::Pty; IPC::Run->VERSION('0.98'); };
+# set password using PQchangePassword
+my $session = PostgreSQL::Test::Session->new (node => $node);
 
-	# Alter the password on the created role using \password in psql to ensure
-	# that clientside password changes use the scram_iterations value when
-	# calculating SCRAM secrets.
-	my $session = $node->interactive_psql('postgres');
-
-	$session->set_query_timer_restart();
-	$session->query("SET password_encryption='scram-sha-256';");
-	$session->query("SET scram_iterations=42;");
-	$session->query_until(qr/Enter new password/,
-		"\\password scram_role_iter\n");
-	$session->query_until(qr/Enter it again/, "pass\n");
-	$session->query_until(qr/postgres=# /, "pass\n");
-	$session->quit;
-
-	$res = $node->safe_psql(
-		'postgres',
+$session->do("SET password_encryption='scram-sha-256';",
+			 "SET scram_iterations=42;");
+$res = $session->set_password("scram_role_iter","pass");
+is($res->{status}, 1, "set password ok");
+$res = $session->query_oneval(
 		"SELECT substr(rolpassword,1,17)
 		 FROM pg_authid
 		 WHERE rolname = 'scram_role_iter'");
-	is($res, 'SCRAM-SHA-256$42:',
-		'scram_iterations in psql \password command');
-}
+is($res, 'SCRAM-SHA-256$42:', 'scram_iterations correct');
 
 # Create a database to test regular expression.
 $node->safe_psql('postgres', "CREATE database regex_testdb;");
diff --git a/src/test/modules/test_misc/t/005_timeouts.pl b/src/test/modules/test_misc/t/005_timeouts.pl
index 9e1ff9e5c1..721721f776 100644
--- a/src/test/modules/test_misc/t/005_timeouts.pl
+++ b/src/test/modules/test_misc/t/005_timeouts.pl
@@ -33,24 +33,16 @@ $node->safe_psql('postgres', 'CREATE EXTENSION injection_points;');
 $node->safe_psql('postgres',
 	"SELECT injection_points_attach('transaction-timeout', 'wait');");
 
-my $psql_session = $node->background_psql('postgres');
+my $psql_session = PostgreSQL::Test::Session->new(node => $node);
 
-# The following query will generate a stream of SELECT 1 queries. This is done
-# so to exercise transaction timeout in the presence of short queries.
-# Note: the interval value is parsed with locale-aware strtod()
-$psql_session->query_until(
-	qr/starting_bg_psql/,
-	sprintf(
-		q(\echo starting_bg_psql
-		SET transaction_timeout to '10ms';
-		BEGIN;
-		SELECT 1 \watch %g
-		\q
-), 0.001));
+$psql_session->do("SET transaction_timeout to '10ms';");
+
+$psql_session->do_async("BEGIN; DO ' begin loop PERFORM pg_sleep(0.001); end loop; end ';");
 
 # Wait until the backend enters the timeout injection point. Will get an error
 # here if anything goes wrong.
 $node->wait_for_event('client backend', 'transaction-timeout');
+pass("got transaction timeout event");
 
 my $log_offset = -s $node->logfile;
 
@@ -61,11 +53,9 @@ $node->safe_psql('postgres',
 # Check that the timeout was logged.
 $node->wait_for_log('terminating connection due to transaction timeout',
 	$log_offset);
+pass("got transaction timeout log");
 
-# If we send \q with $psql_session->quit the command can be sent to the session
-# already closed. So \q is in initial script, here we only finish IPC::Run.
-$psql_session->{run}->finish;
-
+$psql_session->close;
 
 #
 # 2. Test of the idle in transaction timeout
@@ -76,10 +66,8 @@ $node->safe_psql('postgres',
 );
 
 # We begin a transaction and the hand on the line
-$psql_session = $node->background_psql('postgres');
-$psql_session->query_until(
-	qr/starting_bg_psql/, q(
-   \echo starting_bg_psql
+$psql_session->reconnect;
+$psql_session->do(q(
    SET idle_in_transaction_session_timeout to '10ms';
    BEGIN;
 ));
@@ -87,6 +75,7 @@ $psql_session->query_until(
 # Wait until the backend enters the timeout injection point.
 $node->wait_for_event('client backend',
 	'idle-in-transaction-session-timeout');
+pass("got idle in transaction timeout event");
 
 $log_offset = -s $node->logfile;
 
@@ -97,8 +86,9 @@ $node->safe_psql('postgres',
 # Check that the timeout was logged.
 $node->wait_for_log(
 	'terminating connection due to idle-in-transaction timeout', $log_offset);
+pass("got idle in transaction timeout log");
 
-ok($psql_session->quit);
+$psql_session->close;
 
 
 #
@@ -108,15 +98,14 @@ $node->safe_psql('postgres',
 	"SELECT injection_points_attach('idle-session-timeout', 'wait');");
 
 # We just initialize the GUC and wait. No transaction is required.
-$psql_session = $node->background_psql('postgres');
-$psql_session->query_until(
-	qr/starting_bg_psql/, q(
-   \echo starting_bg_psql
+$psql_session->reconnect;
+$psql_session->do(q(
    SET idle_session_timeout to '10ms';
 ));
 
 # Wait until the backend enters the timeout injection point.
 $node->wait_for_event('client backend', 'idle-session-timeout');
+pass("got idle session timeout event");
 
 $log_offset = -s $node->logfile;
 
@@ -127,7 +116,8 @@ $node->safe_psql('postgres',
 # Check that the timeout was logged.
 $node->wait_for_log('terminating connection due to idle-session timeout',
 	$log_offset);
+pass("got idle sesion tiemout log");
 
-ok($psql_session->quit);
+$psql_session->close;
 
 done_testing();
diff --git a/src/test/modules/xid_wraparound/t/001_emergency_vacuum.pl b/src/test/modules/xid_wraparound/t/001_emergency_vacuum.pl
index 37550b67a4..dc53a83367 100644
--- a/src/test/modules/xid_wraparound/t/001_emergency_vacuum.pl
+++ b/src/test/modules/xid_wraparound/t/001_emergency_vacuum.pl
@@ -46,17 +46,10 @@ CREATE TABLE autovacuum_disabled(id serial primary key, data text) WITH (autovac
 INSERT INTO autovacuum_disabled(data) SELECT generate_series(1,1000);
 ]);
 
-# Bump the query timeout to avoid false negatives on slow test systems.
-my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;
-
 # Start a background session, which holds a transaction open, preventing
 # autovacuum from advancing relfrozenxid and datfrozenxid.
-my $background_psql = $node->background_psql(
-	'postgres',
-	on_error_stop => 0,
-	timeout => $psql_timeout_secs);
-$background_psql->set_query_timer_restart();
-$background_psql->query_safe(
+my $background_psql = PostgreSQL::Test::Session->new(node => $node);
+$background_psql->do(
 	qq[
 	BEGIN;
 	DELETE FROM large WHERE id % 2 = 0;
@@ -89,8 +82,8 @@ my $log_offset = -s $node->logfile;
 
 # Finish the old transaction, to allow vacuum freezing to advance
 # relfrozenxid and datfrozenxid again.
-$background_psql->query_safe(qq[COMMIT]);
-$background_psql->quit;
+$background_psql->do(qq[COMMIT;]);
+$background_psql->close;
 
 # Wait until autovacuum processed all tables and advanced the
 # system-wide oldest-XID.
diff --git a/src/test/modules/xid_wraparound/t/002_limits.pl b/src/test/modules/xid_wraparound/t/002_limits.pl
index c02c287167..1aeb7a644c 100644
--- a/src/test/modules/xid_wraparound/t/002_limits.pl
+++ b/src/test/modules/xid_wraparound/t/002_limits.pl
@@ -30,6 +30,8 @@ $node->append_conf(
 autovacuum = off # run autovacuum only to prevent wraparound
 autovacuum_naptime = 1s
 log_autovacuum_min_duration = 0
+log_connections = on
+log_statement = 'all'
 ]);
 $node->start;
 $node->safe_psql('postgres', 'CREATE EXTENSION xid_wraparound');
@@ -41,16 +43,10 @@ CREATE TABLE wraparoundtest(t text);
 INSERT INTO wraparoundtest VALUES ('start');
 ]);
 
-# Bump the query timeout to avoid false negatives on slow test systems.
-my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;
-
 # Start a background session, which holds a transaction open, preventing
 # autovacuum from advancing relfrozenxid and datfrozenxid.
-my $background_psql = $node->background_psql(
-	'postgres',
-	on_error_stop => 0,
-	timeout => $psql_timeout_secs);
-$background_psql->query_safe(
+my $background_psql = PostgreSQL::Test::Session->new(node => $node);
+$background_psql->do(
 	qq[
 	BEGIN;
 	INSERT INTO wraparoundtest VALUES ('oldxact');
@@ -108,8 +104,8 @@ like(
 
 # Finish the old transaction, to allow vacuum freezing to advance
 # relfrozenxid and datfrozenxid again.
-$background_psql->query_safe(qq[COMMIT]);
-$background_psql->quit;
+$background_psql->do(qq[COMMIT;]);
+$background_psql->close;
 
 # VACUUM, to freeze the tables and advance datfrozenxid.
 #
diff --git a/src/test/perl/PostgreSQL/PqFFI.pm b/src/test/perl/PostgreSQL/PqFFI.pm
new file mode 100644
index 0000000000..f9beb6558b
--- /dev/null
+++ b/src/test/perl/PostgreSQL/PqFFI.pm
@@ -0,0 +1,588 @@
+
+############################################
+#
+# FFI wrapper for libpq
+#
+############################################
+package PostgreSQL::PqFFI;
+
+use strict;
+use warnings FATAL => qw(all);
+
+use FFI::Platypus;
+use FFI::CheckLib;
+
+use Exporter qw(import);
+
+our @EXPORT = qw (
+
+  CONNECTION_OK
+  CONNECTION_BAD
+  CONNECTION_STARTED
+  CONNECTION_MADE
+  CONNECTION_AWAITING_RESPONSE
+  CONNECTION_AUTH_OK
+  CONNECTION_SETENV
+  CONNECTION_SSL_STARTUP
+  CONNECTION_NEEDED
+  CONNECTION_CHECK_WRITABLE
+  CONNECTION_CONSUME
+  CONNECTION_GSS_STARTUP
+  CONNECTION_CHECK_TARGET
+  CONNECTION_CHECK_STANDBY
+
+  PGRES_EMPTY_QUERY
+  PGRES_COMMAND_OK
+  PGRES_TUPLES_OK
+  PGRES_COPY_OUT
+  PGRES_COPY_IN
+  PGRES_BAD_RESPONSE
+  PGRES_NONFATAL_ERROR
+  PGRES_FATAL_ERROR
+  PGRES_COPY_BOTH
+  PGRES_SINGLE_TUPLE
+  PGRES_PIPELINE_SYNC
+  PGRES_PIPELINE_ABORTED
+
+  PQPING_OK
+  PQPING_REJECT
+  PQPING_NO_RESPONSE
+  PQPING_NO_ATTEMPT
+
+  PQTRANS_IDLE
+  PQTRANS_ACTIVE
+  PQTRANS_INTRANS
+  PQTRANS_INERROR
+  PQTRANS_UNKNOWN
+
+  BOOLOID
+  BYTEAOID
+  CHAROID
+  NAMEOID
+  INT8OID
+  INT2OID
+  INT2VECTOROID
+  INT4OID
+  TEXTOID
+  OIDOID
+  TIDOID
+  XIDOID
+  CIDOID
+  OIDVECTOROID
+  JSONOID
+  XMLOID
+  XID8OID
+  POINTOID
+  LSEGOID
+  PATHOID
+  BOXOID
+  POLYGONOID
+  LINEOID
+  FLOAT4OID
+  FLOAT8OID
+  UNKNOWNOID
+  CIRCLEOID
+  MONEYOID
+  MACADDROID
+  INETOID
+  CIDROID
+  MACADDR8OID
+  ACLITEMOID
+  BPCHAROID
+  VARCHAROID
+  DATEOID
+  TIMEOID
+  TIMESTAMPOID
+  TIMESTAMPTZOID
+  INTERVALOID
+  TIMETZOID
+  BITOID
+  VARBITOID
+  NUMERICOID
+  REFCURSOROID
+  UUIDOID
+  TSVECTOROID
+  GTSVECTOROID
+  TSQUERYOID
+  JSONBOID
+  JSONPATHOID
+  TXID_SNAPSHOTOID
+  INT4RANGEOID
+  NUMRANGEOID
+  TSRANGEOID
+  TSTZRANGEOID
+  DATERANGEOID
+  INT8RANGEOID
+  INT4MULTIRANGEOID
+  NUMMULTIRANGEOID
+  TSMULTIRANGEOID
+  TSTZMULTIRANGEOID
+  DATEMULTIRANGEOID
+  INT8MULTIRANGEOID
+  RECORDOID
+  RECORDARRAYOID
+  CSTRINGOID
+  VOIDOID
+  TRIGGEROID
+  EVENT_TRIGGEROID
+  BOOLARRAYOID
+  BYTEAARRAYOID
+  CHARARRAYOID
+  NAMEARRAYOID
+  INT8ARRAYOID
+  INT2ARRAYOID
+  INT2VECTORARRAYOID
+  INT4ARRAYOID
+  TEXTARRAYOID
+  OIDARRAYOID
+  TIDARRAYOID
+  XIDARRAYOID
+  CIDARRAYOID
+  OIDVECTORARRAYOID
+  JSONARRAYOID
+  XMLARRAYOID
+  XID8ARRAYOID
+  POINTARRAYOID
+  LSEGARRAYOID
+  PATHARRAYOID
+  BOXARRAYOID
+  POLYGONARRAYOID
+  LINEARRAYOID
+  FLOAT4ARRAYOID
+  FLOAT8ARRAYOID
+  CIRCLEARRAYOID
+  MONEYARRAYOID
+  MACADDRARRAYOID
+  INETARRAYOID
+  CIDRARRAYOID
+  MACADDR8ARRAYOID
+  ACLITEMARRAYOID
+  BPCHARARRAYOID
+  VARCHARARRAYOID
+  DATEARRAYOID
+  TIMEARRAYOID
+  TIMESTAMPARRAYOID
+  TIMESTAMPTZARRAYOID
+  INTERVALARRAYOID
+  TIMETZARRAYOID
+  BITARRAYOID
+  VARBITARRAYOID
+  NUMERICARRAYOID
+  REFCURSORARRAYOID
+  UUIDARRAYOID
+  TSVECTORARRAYOID
+  GTSVECTORARRAYOID
+  TSQUERYARRAYOID
+  JSONBARRAYOID
+  JSONPATHARRAYOID
+  TXID_SNAPSHOTARRAYOID
+  INT4RANGEARRAYOID
+  NUMRANGEARRAYOID
+  TSRANGEARRAYOID
+  TSTZRANGEARRAYOID
+  DATERANGEARRAYOID
+  INT8RANGEARRAYOID
+  INT4MULTIRANGEARRAYOID
+  NUMMULTIRANGEARRAYOID
+  TSMULTIRANGEARRAYOID
+  TSTZMULTIRANGEARRAYOID
+  DATEMULTIRANGEARRAYOID
+  INT8MULTIRANGEARRAYOID
+  CSTRINGARRAYOID
+
+);
+
+# connection status
+
+use constant {
+	CONNECTION_OK => 0,
+	CONNECTION_BAD => 1,
+	# Non-blocking mode only below here
+
+	CONNECTION_STARTED => 2,
+	CONNECTION_MADE => 3,
+	CONNECTION_AWAITING_RESPONSE => 4,
+	CONNECTION_AUTH_OK => 5,
+	CONNECTION_SETENV => 6,
+	CONNECTION_SSL_STARTUP => 7,
+	CONNECTION_NEEDED => 8,
+	CONNECTION_CHECK_WRITABLE => 9,
+	CONNECTION_CONSUME => 10,
+	CONNECTION_GSS_STARTUP => 11,
+	CONNECTION_CHECK_TARGET => 12,
+	CONNECTION_CHECK_STANDBY => 13,
+};
+
+# exec status
+
+use constant {
+	PGRES_EMPTY_QUERY => 0,
+	PGRES_COMMAND_OK => 1,
+	PGRES_TUPLES_OK => 2,
+	PGRES_COPY_OUT => 3,
+	PGRES_COPY_IN => 4,
+	PGRES_BAD_RESPONSE => 5,
+	PGRES_NONFATAL_ERROR => 6,
+	PGRES_FATAL_ERROR => 7,
+	PGRES_COPY_BOTH => 8,
+	PGRES_SINGLE_TUPLE => 9,
+	PGRES_PIPELINE_SYNC => 10,
+	PGRES_PIPELINE_ABORTED => 11,
+};
+
+# ping status
+
+use constant {
+	PQPING_OK => 0,
+	PQPING_REJECT => 1,
+	PQPING_NO_RESPONSE => 2,
+	PQPING_NO_ATTEMPT => 3,
+};
+
+# txn status
+use constant {
+	PQTRANS_IDLE => 0,
+	PQTRANS_ACTIVE => 1,
+	PQTRANS_INTRANS => 2,
+	PQTRANS_INERROR => 3,
+	PQTRANS_UNKNOWN => 4,
+};
+
+# type oids
+use constant {
+	BOOLOID => 16,
+	BYTEAOID => 17,
+	CHAROID => 18,
+	NAMEOID => 19,
+	INT8OID => 20,
+	INT2OID => 21,
+	INT2VECTOROID => 22,
+	INT4OID => 23,
+	TEXTOID => 25,
+	OIDOID => 26,
+	TIDOID => 27,
+	XIDOID => 28,
+	CIDOID => 29,
+	OIDVECTOROID => 30,
+	JSONOID => 114,
+	XMLOID => 142,
+	XID8OID => 5069,
+	POINTOID => 600,
+	LSEGOID => 601,
+	PATHOID => 602,
+	BOXOID => 603,
+	POLYGONOID => 604,
+	LINEOID => 628,
+	FLOAT4OID => 700,
+	FLOAT8OID => 701,
+	UNKNOWNOID => 705,
+	CIRCLEOID => 718,
+	MONEYOID => 790,
+	MACADDROID => 829,
+	INETOID => 869,
+	CIDROID => 650,
+	MACADDR8OID => 774,
+	ACLITEMOID => 1033,
+	BPCHAROID => 1042,
+	VARCHAROID => 1043,
+	DATEOID => 1082,
+	TIMEOID => 1083,
+	TIMESTAMPOID => 1114,
+	TIMESTAMPTZOID => 1184,
+	INTERVALOID => 1186,
+	TIMETZOID => 1266,
+	BITOID => 1560,
+	VARBITOID => 1562,
+	NUMERICOID => 1700,
+	REFCURSOROID => 1790,
+	UUIDOID => 2950,
+	TSVECTOROID => 3614,
+	GTSVECTOROID => 3642,
+	TSQUERYOID => 3615,
+	JSONBOID => 3802,
+	JSONPATHOID => 4072,
+	TXID_SNAPSHOTOID => 2970,
+	INT4RANGEOID => 3904,
+	NUMRANGEOID => 3906,
+	TSRANGEOID => 3908,
+	TSTZRANGEOID => 3910,
+	DATERANGEOID => 3912,
+	INT8RANGEOID => 3926,
+	INT4MULTIRANGEOID => 4451,
+	NUMMULTIRANGEOID => 4532,
+	TSMULTIRANGEOID => 4533,
+	TSTZMULTIRANGEOID => 4534,
+	DATEMULTIRANGEOID => 4535,
+	INT8MULTIRANGEOID => 4536,
+	RECORDOID => 2249,
+	RECORDARRAYOID => 2287,
+	CSTRINGOID => 2275,
+	VOIDOID => 2278,
+	TRIGGEROID => 2279,
+	EVENT_TRIGGEROID => 3838,
+	BOOLARRAYOID => 1000,
+	BYTEAARRAYOID => 1001,
+	CHARARRAYOID => 1002,
+	NAMEARRAYOID => 1003,
+	INT8ARRAYOID => 1016,
+	INT2ARRAYOID => 1005,
+	INT2VECTORARRAYOID => 1006,
+	INT4ARRAYOID => 1007,
+	TEXTARRAYOID => 1009,
+	OIDARRAYOID => 1028,
+	TIDARRAYOID => 1010,
+	XIDARRAYOID => 1011,
+	CIDARRAYOID => 1012,
+	OIDVECTORARRAYOID => 1013,
+	JSONARRAYOID => 199,
+	XMLARRAYOID => 143,
+	XID8ARRAYOID => 271,
+	POINTARRAYOID => 1017,
+	LSEGARRAYOID => 1018,
+	PATHARRAYOID => 1019,
+	BOXARRAYOID => 1020,
+	POLYGONARRAYOID => 1027,
+	LINEARRAYOID => 629,
+	FLOAT4ARRAYOID => 1021,
+	FLOAT8ARRAYOID => 1022,
+	CIRCLEARRAYOID => 719,
+	MONEYARRAYOID => 791,
+	MACADDRARRAYOID => 1040,
+	INETARRAYOID => 1041,
+	CIDRARRAYOID => 651,
+	MACADDR8ARRAYOID => 775,
+	ACLITEMARRAYOID => 1034,
+	BPCHARARRAYOID => 1014,
+	VARCHARARRAYOID => 1015,
+	DATEARRAYOID => 1182,
+	TIMEARRAYOID => 1183,
+	TIMESTAMPARRAYOID => 1115,
+	TIMESTAMPTZARRAYOID => 1185,
+	INTERVALARRAYOID => 1187,
+	TIMETZARRAYOID => 1270,
+	BITARRAYOID => 1561,
+	VARBITARRAYOID => 1563,
+	NUMERICARRAYOID => 1231,
+	REFCURSORARRAYOID => 2201,
+	UUIDARRAYOID => 2951,
+	TSVECTORARRAYOID => 3643,
+	GTSVECTORARRAYOID => 3644,
+	TSQUERYARRAYOID => 3645,
+	JSONBARRAYOID => 3807,
+	JSONPATHARRAYOID => 4073,
+	TXID_SNAPSHOTARRAYOID => 2949,
+	INT4RANGEARRAYOID => 3905,
+	NUMRANGEARRAYOID => 3907,
+	TSRANGEARRAYOID => 3909,
+	TSTZRANGEARRAYOID => 3911,
+	DATERANGEARRAYOID => 3913,
+	INT8RANGEARRAYOID => 3927,
+	INT4MULTIRANGEARRAYOID => 6150,
+	NUMMULTIRANGEARRAYOID => 6151,
+	TSMULTIRANGEARRAYOID => 6152,
+	TSTZMULTIRANGEARRAYOID => 6153,
+	DATEMULTIRANGEARRAYOID => 6155,
+	INT8MULTIRANGEARRAYOID => 6157,
+	CSTRINGARRAYOID => 1263,
+};
+
+
+
+my @procs = qw(
+
+  PQconnectdb
+  PQconnectdbParams
+  PQsetdbLogin
+  PQfinish
+  PQreset
+  PQdb
+  PQuser
+  PQpass
+  PQhost
+  PQhostaddr
+  PQport
+  PQtty
+  PQoptions
+  PQstatus
+  PQtransactionStatus
+  PQparameterStatus
+  PQping
+  PQpingParams
+
+  PQexec
+  PQexecParams
+  PQprepare
+  PQexecPrepared
+
+  PQdescribePrepared
+  PQdescribePortal
+
+  PQclosePrepared
+  PQclosePortal
+  PQclear
+
+  PQsendQuery
+  PQgetResult
+  PQisBusy
+  PQconsumeInput
+
+  PQprotocolVersion
+  PQserverVersion
+  PQerrorMessage
+  PQsocket
+  PQbackendPID
+  PQconnectionNeedsPassword
+  PQconnectionUsedPassword
+  PQconnectionUsedGSSAPI
+  PQclientEncoding
+  PQsetClientEncoding
+
+  PQresultStatus
+  PQresStatus
+  PQresultErrorMessage
+  PQresultErrorField
+  PQntuples
+  PQnfields
+  PQbinaryTuples
+  PQfname
+  PQfnumber
+  PQftable
+  PQftablecol
+  PQfformat
+  PQftype
+  PQfsize
+  PQfmod
+  PQcmdStatus
+  PQoidValue
+  PQcmdTuples
+  PQgetvalue
+  PQgetlength
+  PQgetisnull
+  PQnparams
+  PQparamtype
+  PQchangePassword
+);
+
+push(@EXPORT, @procs);
+
+sub setup
+{
+	my $libdir = shift;
+
+	my $ffi = FFI::Platypus->new(api => 1);
+
+	$ffi->type('opaque' => 'PGconn');
+	$ffi->type('opaque' => 'PGresult');
+	$ffi->type('uint32' => 'Oid');
+	$ffi->type('int' => 'ExecStatusType');
+
+	my $lib = find_lib_or_die(
+		lib => 'pq',
+		libpath => [$libdir],
+		# systempath => [],
+	   );
+	$ffi->lib($lib);
+
+	$ffi->attach('PQconnectdb' => ['string'] => 'PGconn');
+	$ffi->attach(
+		'PQconnectdbParams' => [ 'string[]', 'string[]', 'int' ] => 'PGconn');
+	$ffi->attach(
+		'PQsetdbLogin' => [
+			'string', 'string', 'string', 'string',
+			'string', 'string', 'string',
+		] => 'PGconn');
+	$ffi->attach('PQfinish' => ['PGconn'] => 'void');
+	$ffi->attach('PQreset' => ['PGconn'] => 'void');
+	$ffi->attach('PQdb' => ['PGconn'] => 'string');
+	$ffi->attach('PQuser' => ['PGconn'] => 'string');
+	$ffi->attach('PQpass' => ['PGconn'] => 'string');
+	$ffi->attach('PQhost' => ['PGconn'] => 'string');
+	$ffi->attach('PQhostaddr' => ['PGconn'] => 'string');
+	$ffi->attach('PQport' => ['PGconn'] => 'string');
+	$ffi->attach('PQtty' => ['PGconn'] => 'string');
+	$ffi->attach('PQoptions' => ['PGconn'] => 'string');
+	$ffi->attach('PQstatus' => ['PGconn'] => 'int');
+	$ffi->attach('PQtransactionStatus' => ['PGconn'] => 'int');
+	$ffi->attach('PQparameterStatus' => [ 'PGconn', 'string' ] => 'string');
+	$ffi->attach('PQping' => ['string'] => 'int');
+	$ffi->attach(
+		'PQpingParams' => [ 'string[]', 'string[]', 'int' ] => 'int');
+
+	$ffi->attach('PQprotocolVersion' => ['PGconn'] => 'int');
+	$ffi->attach('PQserverVersion' => ['PGconn'] => 'int');
+	$ffi->attach('PQerrorMessage' => ['PGconn'] => 'string');
+	$ffi->attach('PQsocket' => ['PGconn'] => 'int');
+	$ffi->attach('PQbackendPID' => ['PGconn'] => 'int');
+	$ffi->attach('PQconnectionNeedsPassword' => ['PGconn'] => 'int');
+	$ffi->attach('PQconnectionUsedPassword' => ['PGconn'] => 'int');
+	$ffi->attach('PQconnectionUsedGSSAPI' => ['PGconn'] => 'int');
+	$ffi->attach('PQclientEncoding' => ['PGconn'] => 'int');
+	$ffi->attach('PQsetClientEncoding' => [ 'PGconn', 'string' ] => 'int');
+
+	$ffi->attach('PQexec' => [ 'PGconn', 'string' ] => 'PGresult');
+	$ffi->attach(
+		'PQexecParams' => [
+			'PGconn', 'string', 'int', 'int[]',
+			'string[]', 'int[]', 'int[]', 'int'
+		] => 'PGresult');
+	$ffi->attach(
+		'PQprepare' => [ 'PGconn', 'string', 'string', 'int', 'int[]' ] =>
+		  'PGresult');
+	$ffi->attach(
+		'PQexecPrepared' => [ 'PGconn', 'string', 'int',
+			'string[]', 'int[]', 'int[]', 'int' ] => 'PGresult');
+
+	$ffi->attach('PQresultStatus' => ['PGresult'] => 'ExecStatusType');
+	$ffi->attach('PQresStatus' => ['ExecStatusType'] => 'string');
+	$ffi->attach('PQresultErrorMessage' => ['PGresult'] => 'string');
+	$ffi->attach('PQresultErrorField' => [ 'PGresult', 'int' ] => 'string');
+	$ffi->attach('PQntuples' => ['PGresult'] => 'int');
+	$ffi->attach('PQnfields' => ['PGresult'] => 'int');
+	$ffi->attach('PQbinaryTuples' => ['PGresult'] => 'int');
+	$ffi->attach('PQfname' => [ 'PGresult', 'int' ] => 'string');
+	$ffi->attach('PQfnumber' => [ 'PGresult', 'string' ] => 'int');
+	$ffi->attach('PQftable' => [ 'PGresult', 'int' ] => 'Oid');
+	$ffi->attach('PQftablecol' => [ 'PGresult', 'int' ] => 'int');
+	$ffi->attach('PQfformat' => [ 'PGresult', 'int' ] => 'int');
+	$ffi->attach('PQftype' => [ 'PGresult', 'int' ] => 'Oid');
+	$ffi->attach('PQfsize' => [ 'PGresult', 'int' ] => 'int');
+	$ffi->attach('PQfmod' => [ 'PGresult', 'int' ] => 'int');
+	$ffi->attach('PQcmdStatus' => ['PGresult'] => 'string');
+	$ffi->attach('PQoidValue' => ['PGresult'] => 'Oid');
+	$ffi->attach('PQcmdTuples' => ['PGresult'] => 'string');
+	$ffi->attach('PQgetvalue' => [ 'PGresult', 'int', 'int' ] => 'string');
+	$ffi->attach('PQgetlength' => [ 'PGresult', 'int', 'int' ] => 'int');
+	$ffi->attach('PQgetisnull' => [ 'PGresult', 'int', 'int' ] => 'int');
+	$ffi->attach('PQnparams' => ['PGresult'] => 'int');
+	$ffi->attach('PQparamtype' => [ 'PGresult', 'int' ] => 'Oid');
+
+
+	$ffi->attach(
+		'PQdescribePrepared' => [ 'PGconn', 'string' ] => 'PGresult');
+	$ffi->attach('PQdescribePortal' => [ 'PGconn', 'string' ] => 'PGresult');
+
+	$ffi->attach('PQclosePrepared' => [ 'PGconn', 'string' ] => 'PGresult');
+	$ffi->attach('PQclosePortal' => [ 'PGconn', 'string' ] => 'PGresult');
+	$ffi->attach('PQclear' => ['PGresult'] => 'void');
+
+	$ffi->attach('PQconnectStart' => [ 'string' ] => 'PGconn');
+	$ffi->attach(
+		'PQconnectStartParams' => [ 'string[]', 'string[]', 'int' ] => 'PGconn');
+	$ffi->attach('PQconnectPoll' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQresetStart' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQresetPoll' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQsendQuery' => [ 'PGconn',  'string' ] => 'int');
+	$ffi->attach('PQsendQueryParams' => [
+		'PGconn', 'string', 'int', 'Oid[]', 'string[]',
+		'int[]', 'int[]', 'int' ] => 'int');
+	$ffi->attach('PQsendPrepare' => [ 'PGconn', 'string', 'string', 'int', 'Oid[]' ] => 'int');
+	$ffi->attach('PQgetResult' => [ 'PGconn' ] => 'PGresult');
+
+	$ffi->attach('PQisBusy' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQconsumeInput' => [ 'PGconn' ] => 'int');
+	$ffi->attach('PQchangePassword' => [ 'PGconn', 'string', 'string' ] => 'PGresult');
+	
+}
+
+
+1;
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 32ee98aebc..1c929735bb 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -111,6 +111,7 @@ use Socket;
 use Test::More;
 use PostgreSQL::Test::Utils          ();
 use PostgreSQL::Test::BackgroundPsql ();
+use PostgreSQL::Test::Session;
 use Text::ParseWords                 qw(shellwords);
 use Time::HiRes                      qw(usleep);
 use Scalar::Util                     qw(blessed);
@@ -1887,6 +1888,27 @@ sub safe_psql
 
 	my ($stdout, $stderr);
 
+=comment
+
+	my $session = PostgreSQL::Test::Session->new(node=> $self, dbname => $dbname);
+
+	if ($sql =~ /^\s*SELECT/i)
+	{
+		my $res = $session->query($sql);
+		$stdout = $res->{psqlout};
+		$stderr = $res->{error_message} || "";
+	}
+	else
+	{
+		my $res = $session->do($sql);
+		$stdout = "";
+		$stderr = $res != 1 ? "error" : "";
+	}
+
+=cut
+
+	diag "safe_psql call has params" if scalar(keys(%params));
+
 	my $ret = $self->psql(
 		$dbname, $sql,
 		%params,
@@ -2004,6 +2026,9 @@ sub psql
 
 	local %ENV = $self->_get_env();
 
+	# uncomment to get a count of calls to psql
+	# note("counting psql");
+
 	my $stdout = $params{stdout};
 	my $stderr = $params{stderr};
 	my $replication = $params{replication};
@@ -2513,26 +2538,26 @@ sub poll_query_until
 
 	$expected = 't' unless defined($expected);    # default value
 
-	my $cmd = [
-		$self->installed_command('psql'), '-XAt',
-		'-d', $self->connstr($dbname)
-	];
-	my ($stdout, $stderr);
+	my $session = PostgreSQL::Test::Session->new(node => $self,
+												 dbname => $dbname);
 	my $max_attempts = 10 * $PostgreSQL::Test::Utils::timeout_default;
 	my $attempts = 0;
 
+	my $query_value;
+
 	while ($attempts < $max_attempts)
 	{
-		my $result = IPC::Run::run $cmd, '<', \$query,
-		  '>', \$stdout, '2>', \$stderr;
-
-		chomp($stdout);
-		chomp($stderr);
-
-		if ($stdout eq $expected && $stderr eq '')
+		if ($query =~ /\bselect\b/i)
 		{
-			return 1;
+			$query_value = $session->query_tuples($query);
 		}
+		else
+		{
+			$query_value = $session->do($query);
+			$expected = 1; # COMMAND_OK
+		}
+
+		return 1 if ($query_value  // 'no value returned') eq $expected;
 
 		# Wait 0.1 second before retrying.
 		usleep(100_000);
@@ -2547,9 +2572,42 @@ $query
 expecting this output:
 $expected
 last actual query output:
-$stdout
-with stderr:
-$stderr);
+$query_value
+);
+	return 0;
+}
+
+=pod
+
+=item $node->poll_until_connection($dbname)
+
+Try to connect repeatedly, until it we succeed.
+Times out after $PostgreSQL::Test::Utils::timeout_default seconds.
+Returns 1 if successful, 0 if timed out.
+
+=cut
+
+sub poll_until_connection
+{
+	my ($self, $dbname) = @_;
+
+	local %ENV = $self->_get_env();
+
+	my $max_attempts = 10 * $PostgreSQL::Test::Utils::timeout_default;
+	my $attempts = 0;
+
+	while ($attempts < $max_attempts)
+	{
+		my $session = PostgreSQL::Test::Session->new(node => $self,
+													 dbname => $dbname);
+		return 1 if $session;
+
+		# Wait 0.1 second before retrying.
+		usleep(100_000);
+
+		$attempts++;
+	}
+
 	return 0;
 }
 
@@ -3084,13 +3142,15 @@ sub wait_for_log
 
 	my $max_attempts = 10 * $PostgreSQL::Test::Utils::timeout_default;
 	my $attempts = 0;
-
+	my $length = 0;
 	while ($attempts < $max_attempts)
 	{
 		my $log =
 		  PostgreSQL::Test::Utils::slurp_file($self->logfile, $offset);
 
-		return $offset + length($log) if ($log =~ m/$regexp/);
+		$length = length($log);
+
+		return $offset + $length if ($log =~ m/$regexp/);
 
 		# Wait 0.1 second before retrying.
 		usleep(100_000);
@@ -3098,7 +3158,7 @@ sub wait_for_log
 		$attempts++;
 	}
 
-	croak "timed out waiting for match: $regexp";
+	croak "timed out waiting for match: $regexp, offset = $offset, length = $length";
 }
 
 =pod
diff --git a/src/test/perl/PostgreSQL/Test/Session.pm b/src/test/perl/PostgreSQL/Test/Session.pm
new file mode 100644
index 0000000000..dfaebfe31d
--- /dev/null
+++ b/src/test/perl/PostgreSQL/Test/Session.pm
@@ -0,0 +1,270 @@
+package PostgreSQL::Test::Session;
+
+use strict;
+use warnings FATAL => 'all';
+
+use Carp;
+use Time::HiRes qw(usleep);
+
+use PostgreSQL::PqFFI;
+
+my $setup_ok;
+
+sub setup
+{
+	return if $setup_ok;
+	my $libdir = shift;
+	PostgreSQL::PqFFI::setup($libdir);
+	$setup_ok = 1;
+}
+
+# can pass either a PostgreSQL::Test::Cluster instance or an explicit
+# directory location for libpq.{so, dll, whatever} plus a connstr
+sub new
+{
+	my $class = shift;
+	my $self = {};
+	bless $self, $class;
+	my %args = @_;
+	my $node = $args{node};
+	my $dbname = $args{dbname} || 'postgres';
+	my $libdir = $args{libdir};
+	my $connstr = $args{connstr};
+	unless ($setup_ok)
+	{
+		unless ($libdir)
+		{
+			croak "bad node" unless $node->isa("PostgreSQL::Test::Cluster");
+			$libdir = $node->config_data('--libdir');
+		}
+		setup($libdir);
+	}
+	unless ($connstr)
+	{
+		croak "bad node" unless $node->isa("PostgreSQL::Test::Cluster");
+		$connstr = $node->connstr($dbname);
+	}
+	$self->{connstr} = $connstr;
+	$self->{conn} = PQconnectdb($connstr);
+	# The destructor will clean up for us even if we fail
+	return (PQstatus($self->{conn}) == CONNECTION_OK) ? $self : undef;
+}
+
+sub close
+{
+	my $self = shift;
+	PQfinish($self->{conn});
+	delete $self->{conn};
+}
+
+sub DESTROY
+{
+	my $self = shift;
+	$self->close if exists $self->{conn};
+}
+
+sub reconnect
+{
+	my $self = shift;
+	$self->close if exists $self->{conn};
+	$self->{conn} = PQconnectdb($self->{connstr});
+	return PQstatus($self->{conn});
+}
+
+sub conn_status
+{
+	my $self = shift;
+	return exists $self->{conn} ? PQstatus($self->{conn}) : undef;
+}
+
+# run some sql which doesn't return tuples
+
+sub do
+{
+	my $self = shift;
+	my $conn = $self->{conn};
+	my $status;
+	foreach my $sql (@_)
+	{
+		my $result = PQexec($conn, $sql);
+		$status = PQresultStatus($result);
+		PQclear($result);
+		return $status unless $status == PGRES_COMMAND_OK;
+	}
+	return $status;
+}
+
+sub do_async
+{
+	my $self = shift;
+	my $conn = $self->{conn};
+	my $sql = shift;
+	my $result = PQsendQuery($conn, $sql);
+	return $result; # 1 or 0
+}
+
+# set password for user
+sub set_password
+{
+	my $self = shift;
+	my $user = shift;
+	my $password = shift;
+	my $conn = $self->{conn};
+	my $result = PQchangePassword($conn, $user, $password);
+	my $ret = _get_result_data($result);
+	PQclear($result);
+	return $ret;
+}
+
+# get the next resultset from some aync commands
+# wait if necessary
+# c.f. libpqsrv_get_result
+sub _get_result
+{
+	my $conn = shift;
+	while (PQisBusy($conn))
+	{
+		usleep(100_000);
+		last if PQconsumeInput($conn) == 0;
+	}
+	return PQgetResult($conn);
+}
+
+# wait for all the resultsets and clear them
+# c.f. libpqsrv_get_result_last
+sub wait_for_completion
+{
+	my $self = shift;
+	my $conn = $self->{conn};
+	while (my $res = _get_result($conn))
+	{
+		PQclear($res);
+	}
+}
+
+# Run some sql that does return tuples
+# Returns a hash with status, names, types and rows fields. names and types
+# are arrays, rows is an array of arrays. If there is an error processing
+# the query then result will also contain an error_message field, and names,
+# types and rows will be empty.
+
+sub _get_result_data
+{
+	my $result = shift;
+	my $conn = shift;
+	my $status = PQresultStatus($result);
+	my $res = {	status => $status, names => [], types => [], rows => [],
+			psqlout => ""};
+	unless ($status == PGRES_TUPLES_OK || $status == PGRES_COMMAND_OK)
+	{
+		$res->{error_message} = PQerrorMessage($conn);
+		return $res;
+	}
+	if ($status == PGRES_COMMAND_OK)
+	{
+		return $res;
+	}
+	my $ntuples = PQntuples($result);
+	my $nfields = PQnfields($result);
+	# assuming here that the strings returned by PQfname and PQgetvalue
+	# are mapped into perl space using setsvpv or similar and thus won't
+	# be affect by us calling PQclear on the result object.
+	foreach my $field (0 .. $nfields-1)
+	{
+		push(@{$res->{names}}, PQfname($result, $field));
+		push(@{$res->{types}}, PQftype($result, $field));
+	}
+	my @textrows;
+	foreach my $nrow (0 .. $ntuples - 1)
+	{
+		my $row = [];
+		foreach my $field ( 0 .. $nfields - 1)
+		{
+			my $val = PQgetvalue($result, $nrow, $field);
+			if (($val // "") eq "")
+			{
+				$val = undef if PQgetisnull($result, $nrow, $field);
+			}
+			push(@$row, $val);
+		}
+		push(@{$res->{rows}}, $row);
+		no warnings qw(uninitialized);
+		push(@textrows, join('|', @$row));
+	}
+	$res->{psqlout} = join("\n",@textrows) if $ntuples;
+	return $res;
+}
+
+sub query
+{
+	my $self = shift;
+	my $sql = shift;
+	my $conn = $self->{conn};
+	my $result = PQexec($conn, $sql);
+	my $res = _get_result_data($result, $conn);
+	PQclear($result);
+	return $res;
+}
+
+# Return a single value for a query. The query must return exactly one columns
+# and exactly one row unless missing_ok is set, in which case it can also
+# return zero rows. Any other case results in an error.
+# If the result is NULL, or if missing_ok is set and there are zero rows,
+# undef is returned. Otherwise the value from the query is returned.
+
+sub query_oneval
+{
+	my $self = shift;
+	my $sql = shift;
+	my $missing_ok = shift; # default is not ok
+	my $conn = $self->{conn};
+	my $result = PQexec($conn, $sql);
+	my $status = PQresultStatus($result);
+	unless  ($status == PGRES_TUPLES_OK)
+	{
+		PQclear($result) if $result;
+		croak PQerrorMessage($conn);
+	}
+	my $ntuples = PQntuples($result);
+	return undef if ($missing_ok && !$ntuples);
+	my $nfields = PQnfields($result);
+	croak "$ntuples tuples != 1 or $nfields fields != 1"
+	  if $ntuples != 1 || $nfields != 1;
+	my $val = PQgetvalue($result, 0, 0);
+	if ($val eq "")
+	{
+		$val = undef if PQgetisnull($result, 0, 0);
+	}
+	PQclear($result);
+	return $val;
+}
+
+# return tuples like psql's -A -t mode.
+# An empty resultset is represented by nothing, because that's the way psql does
+# it, and putting out a line with '--empty' breaks at least one test.
+
+sub query_tuples
+{
+	my $self = shift;
+	my @results;
+	foreach my $sql (@_)
+	{
+		my $res = $self->query($sql);
+		croak $res->{error_message}
+		  unless $res->{status} == PGRES_TUPLES_OK;
+		my $rows = $res->{rows};
+		unless (@$rows)
+		{
+			# push(@results,"-- empty");
+			next;
+		}
+		# join will render undef as an empty string here
+		no warnings qw(uninitialized);
+		my @tuples = map { join('|', @$_); } @$rows;
+		push(@results, join("\n",@tuples));
+	}
+	return join("\n",@results);
+}
+
+
+1;
diff --git a/src/test/recovery/t/013_crash_restart.pl b/src/test/recovery/t/013_crash_restart.pl
index d5d24e31d9..21ee528399 100644
--- a/src/test/recovery/t/013_crash_restart.pl
+++ b/src/test/recovery/t/013_crash_restart.pl
@@ -134,7 +134,7 @@ ok( pump_until(
 $monitor->finish;
 
 # Wait till server restarts
-is($node->poll_query_until('postgres', undef, ''),
+is($node->poll_until_connection('postgres'),
 	"1", "reconnected after SIGQUIT");
 
 
@@ -216,7 +216,7 @@ ok( pump_until(
 $monitor->finish;
 
 # Wait till server restarts
-is($node->poll_query_until('postgres', undef, ''),
+is($node->poll_until_connection('postgres'),
 	"1", "reconnected after SIGKILL");
 
 # Make sure the committed rows survived, in-progress ones not
diff --git a/src/test/recovery/t/022_crash_temp_files.pl b/src/test/recovery/t/022_crash_temp_files.pl
index 769b6a6627..73df2c4789 100644
--- a/src/test/recovery/t/022_crash_temp_files.pl
+++ b/src/test/recovery/t/022_crash_temp_files.pl
@@ -148,7 +148,7 @@ ok( pump_until(
 $killme2->finish;
 
 # Wait till server finishes restarting
-$node->poll_query_until('postgres', undef, '');
+$node->poll_until_connection('postgres');
 
 # Check for temporary files
 is( $node->safe_psql(
@@ -255,7 +255,7 @@ ok( pump_until(
 $killme2->finish;
 
 # Wait till server finishes restarting
-$node->poll_query_until('postgres', undef, '');
+$node->poll_until_connection('postgres');
 
 # Check for temporary files -- should be there
 is( $node->safe_psql(
diff --git a/src/test/recovery/t/031_recovery_conflict.pl b/src/test/recovery/t/031_recovery_conflict.pl
index d87efa823f..489db81276 100644
--- a/src/test/recovery/t/031_recovery_conflict.pl
+++ b/src/test/recovery/t/031_recovery_conflict.pl
@@ -67,8 +67,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 
 
 # a longrunning psql that we can use to trigger conflicts
-my $psql_standby =
-  $node_standby->background_psql($test_db, on_error_stop => 0);
+my $psql_standby = PostgreSQL::Test::Session->new(node => $node_standby, dbname => $test_db);
 my $expected_conflicts = 0;
 
 
@@ -96,7 +95,7 @@ my $cursor1 = "test_recovery_conflict_cursor";
 
 # DECLARE and use a cursor on standby, causing buffer with the only block of
 # the relation to be pinned on the standby
-my $res = $psql_standby->query_safe(
+my $res = $psql_standby->query_oneval(
 	qq[
     BEGIN;
     DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1;
@@ -119,7 +118,7 @@ $node_primary->safe_psql($test_db, qq[VACUUM FREEZE $table1;]);
 $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log("User was holding shared buffer pin for too long");
-$psql_standby->reconnect_and_clear();
+$psql_standby->reconnect();
 check_conflict_stat("bufferpin");
 
 
@@ -132,7 +131,7 @@ $node_primary->safe_psql($test_db,
 $node_primary->wait_for_replay_catchup($node_standby);
 
 # DECLARE and FETCH from cursor on the standby
-$res = $psql_standby->query_safe(
+$res = $psql_standby->query_oneval(
 	qq[
         BEGIN;
         DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1;
@@ -152,7 +151,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log(
 	"User query might have needed to see row versions that must be removed");
-$psql_standby->reconnect_and_clear();
+$psql_standby->reconnect();
 check_conflict_stat("snapshot");
 
 
@@ -161,7 +160,7 @@ $sect = "lock conflict";
 $expected_conflicts++;
 
 # acquire lock to conflict with
-$res = $psql_standby->query_safe(
+$res = $psql_standby->query_oneval(
 	qq[
         BEGIN;
         LOCK TABLE $table1 IN ACCESS SHARE MODE;
@@ -175,7 +174,7 @@ $node_primary->safe_psql($test_db, qq[DROP TABLE $table1;]);
 $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log("User was holding a relation lock for too long");
-$psql_standby->reconnect_and_clear();
+$psql_standby->reconnect();
 check_conflict_stat("lock");
 
 
@@ -186,7 +185,7 @@ $expected_conflicts++;
 # DECLARE a cursor for a query which, with sufficiently low work_mem, will
 # spill tuples into temp files in the temporary tablespace created during
 # setup.
-$res = $psql_standby->query_safe(
+$res = $psql_standby->query_oneval(
 	qq[
         BEGIN;
         SET work_mem = '64kB';
@@ -205,7 +204,7 @@ $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log(
 	"User was or might have been using tablespace that must be dropped");
-$psql_standby->reconnect_and_clear();
+$psql_standby->reconnect();
 check_conflict_stat("tablespace");
 
 
@@ -220,8 +219,9 @@ $node_standby->adjust_conf(
 	'postgresql.conf',
 	'max_standby_streaming_delay',
 	"${PostgreSQL::Test::Utils::timeout_default}s");
+$psql_standby->close;
 $node_standby->restart();
-$psql_standby->reconnect_and_clear();
+$psql_standby->reconnect();
 
 # Generate a few dead rows, to later be cleaned up by vacuum. Then acquire a
 # lock on another relation in a prepared xact, so it's held continuously by
@@ -244,12 +244,15 @@ SELECT txid_current();
 
 $node_primary->wait_for_replay_catchup($node_standby);
 
-$res = $psql_standby->query_until(
-	qr/^1$/m, qq[
+$res = $psql_standby->query_oneval(
+	qq[
     BEGIN;
     -- hold pin
     DECLARE $cursor1 CURSOR FOR SELECT a FROM $table1;
     FETCH FORWARD FROM $cursor1;
+]);
+is ($res, 1, "pin held");
+$psql_standby->do_async(qq[
     -- wait for lock held by prepared transaction
 	SELECT * FROM $table2;
     ]);
@@ -270,15 +273,16 @@ $node_primary->safe_psql($test_db, qq[VACUUM FREEZE $table1;]);
 $node_primary->wait_for_replay_catchup($node_standby);
 
 check_conflict_log("User transaction caused buffer deadlock with recovery.");
-$psql_standby->reconnect_and_clear();
+$psql_standby->reconnect();
 check_conflict_stat("deadlock");
 
 # clean up for next tests
 $node_primary->safe_psql($test_db, qq[ROLLBACK PREPARED 'lock';]);
 $node_standby->adjust_conf('postgresql.conf', 'max_standby_streaming_delay',
-	'50ms');
+						   '50ms');
+$psql_standby->close;
 $node_standby->restart();
-$psql_standby->reconnect_and_clear();
+$psql_standby->reconnect();
 
 
 # Check that expected number of conflicts show in pg_stat_database. Needs to
@@ -302,7 +306,7 @@ check_conflict_log("User was connected to a database that must be dropped");
 
 # explicitly shut down psql instances gracefully - to avoid hangs or worse on
 # windows
-$psql_standby->quit;
+$psql_standby->close;
 
 $node_standby->stop();
 $node_primary->stop();
diff --git a/src/test/recovery/t/037_invalid_database.pl b/src/test/recovery/t/037_invalid_database.pl
index 47f524be4c..d34cc91af7 100644
--- a/src/test/recovery/t/037_invalid_database.pl
+++ b/src/test/recovery/t/037_invalid_database.pl
@@ -89,20 +89,20 @@ is($node->psql('postgres', 'DROP DATABASE regression_invalid'),
 # interruption happens at the appropriate moment, we lock pg_tablespace. DROP
 # DATABASE scans pg_tablespace once it has reached the "irreversible" part of
 # dropping the database, making it a suitable point to wait.
-my $bgpsql = $node->background_psql('postgres', on_error_stop => 0);
-my $pid = $bgpsql->query('SELECT pg_backend_pid()');
+my $bgpsql = PostgreSQL::Test::Session->new(node=>$node);
+my $pid = $bgpsql->query_oneval('SELECT pg_backend_pid()');
 
 # create the database, prevent drop database via lock held by a 2PC transaction
-ok( $bgpsql->query_safe(
+is (1,  $bgpsql->do(
 		qq(
-  CREATE DATABASE regression_invalid_interrupt;
-  BEGIN;
+  CREATE DATABASE regression_invalid_interrupt;),
+  qq(BEGIN;
   LOCK pg_tablespace;
   PREPARE TRANSACTION 'lock_tblspc';)),
 	"blocked DROP DATABASE completion");
 
 # Try to drop. This will wait due to the still held lock.
-$bgpsql->query_until(qr//, "DROP DATABASE regression_invalid_interrupt;\n");
+$bgpsql->do_async("DROP DATABASE regression_invalid_interrupt;");
 
 # Ensure we're waiting for the lock
 $node->poll_query_until('postgres',
@@ -113,12 +113,9 @@ $node->poll_query_until('postgres',
 ok($node->safe_psql('postgres', "SELECT pg_cancel_backend($pid)"),
 	"canceling DROP DATABASE");
 
+$bgpsql->wait_for_completion;
 # wait for cancellation to be processed
-ok( pump_until(
-		$bgpsql->{run}, $bgpsql->{timeout},
-		\$bgpsql->{stderr}, qr/canceling statement due to user request/),
-	"cancel processed");
-$bgpsql->{stderr} = '';
+pass("cancel processed");
 
 # verify that connection to the database aren't allowed
 is($node->psql('regression_invalid_interrupt', ''),
@@ -126,12 +123,12 @@ is($node->psql('regression_invalid_interrupt', ''),
 
 # To properly drop the database, we need to release the lock previously preventing
 # doing so.
-ok($bgpsql->query_safe(qq(ROLLBACK PREPARED 'lock_tblspc')),
+ok($bgpsql->do(qq(ROLLBACK PREPARED 'lock_tblspc')),
 	"unblock DROP DATABASE");
 
 ok($bgpsql->query(qq(DROP DATABASE regression_invalid_interrupt)),
 	"DROP DATABASE invalid_interrupt");
 
-$bgpsql->quit();
+$bgpsql->close();
 
 done_testing();
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 2c51cfc3c8..ea49b47ded 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -740,17 +740,13 @@ $primary->safe_psql('postgres',
 	"SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, false, true);"
 );
 
-my $back_q = $primary->background_psql(
-	'postgres',
-	on_error_stop => 0,
-	timeout => $PostgreSQL::Test::Utils::timeout_default);
+my $back_q = PostgreSQL::Test::Session->new(node=>$primary);
 
 # pg_logical_slot_get_changes will be blocked until the standby catches up,
 # hence it needs to be executed in a background session.
 $offset = -s $primary->logfile;
-$back_q->query_until(
-	qr/logical_slot_get_changes/, q(
-   \echo logical_slot_get_changes
+$back_q->do_async(
+	q(
    SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);
 ));
 
@@ -768,7 +764,8 @@ $primary->reload;
 # Since there are no slots in synchronized_standby_slots, the function
 # pg_logical_slot_get_changes should now return, and the session can be
 # stopped.
-$back_q->quit;
+$back_q->wait_for_completion;
+$back_q->close;
 
 $primary->safe_psql('postgres',
 	"SELECT pg_drop_replication_slot('test_slot');");
diff --git a/src/test/recovery/t/041_checkpoint_at_promote.pl b/src/test/recovery/t/041_checkpoint_at_promote.pl
index 5aa05b456c..31cd9b27cb 100644
--- a/src/test/recovery/t/041_checkpoint_at_promote.pl
+++ b/src/test/recovery/t/041_checkpoint_at_promote.pl
@@ -62,11 +62,9 @@ $node_standby->safe_psql('postgres',
 # Execute a restart point on the standby, that we will now be waiting on.
 # This needs to be in the background.
 my $logstart = -s $node_standby->logfile;
-my $psql_session =
-  $node_standby->background_psql('postgres', on_error_stop => 0);
-$psql_session->query_until(
-	qr/starting_checkpoint/, q(
-   \echo starting_checkpoint
+my $psql_session = PostgreSQL::Test::Session->new(node=> $node_standby);
+$psql_session->do_async(
+	q(
    CHECKPOINT;
 ));
 
@@ -152,7 +150,7 @@ ok( pump_until(
 $killme->finish;
 
 # Wait till server finishes restarting.
-$node_standby->poll_query_until('postgres', undef, '');
+$node_standby->poll_until_connection('postgres');
 
 # After recovery, the server should be able to start.
 my $stdout;
diff --git a/src/test/recovery/t/042_low_level_backup.pl b/src/test/recovery/t/042_low_level_backup.pl
index 61d23187e0..14ab229b9d 100644
--- a/src/test/recovery/t/042_low_level_backup.pl
+++ b/src/test/recovery/t/042_low_level_backup.pl
@@ -20,11 +20,10 @@ $node_primary->start;
 
 # Start backup.
 my $backup_name = 'backup1';
-my $psql = $node_primary->background_psql('postgres');
+my $psql = PostgreSQL::Test::Session->new(node => $node_primary);
 
-$psql->query_safe("SET client_min_messages TO WARNING");
-$psql->set_query_timer_restart;
-$psql->query_safe("select pg_backup_start('test label')");
+$psql->do("SET client_min_messages TO WARNING");
+$psql->query("select pg_backup_start('test label')");
 
 # Copy files.
 my $backup_dir = $node_primary->backup_dir . '/' . $backup_name;
@@ -81,9 +80,9 @@ my $stop_segment_name = $node_primary->safe_psql('postgres',
 
 # Stop backup and get backup_label, the last segment is archived.
 my $backup_label =
-  $psql->query_safe("select labelfile from pg_backup_stop()");
+  $psql->query_oneval("select labelfile from pg_backup_stop()");
 
-$psql->quit;
+$psql->close;
 
 # Rather than writing out backup_label, try to recover the backup without
 # backup_label to demonstrate that recovery will not work correctly without it,
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
index 5b9956681d..ef9ede62fc 100644
--- a/src/test/subscription/t/015_stream.pl
+++ b/src/test/subscription/t/015_stream.pl
@@ -5,6 +5,7 @@
 use strict;
 use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Session;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
@@ -30,18 +31,17 @@ sub test_streaming
 	# Interleave a pair of transactions, each exceeding the 64kB limit.
 	my $offset = 0;
 
-	my $h = $node_publisher->background_psql('postgres', on_error_stop => 0);
+	my $h = PostgreSQL::Test::Session->new(node=>$node_publisher);
 
 	# Check the subscriber log from now on.
 	$offset = -s $node_subscriber->logfile;
 
-	$h->query_safe(
-		q{
-	BEGIN;
-	INSERT INTO test_tab SELECT i, sha256(i::text::bytea) FROM generate_series(3, 5000) s(i);
-	UPDATE test_tab SET b = sha256(b) WHERE mod(a,2) = 0;
-	DELETE FROM test_tab WHERE mod(a,3) = 0;
-	});
+	$h->do(
+		'BEGIN',
+		'INSERT INTO test_tab SELECT i, sha256(i::text::bytea) FROM generate_series(3, 5000) s(i)',
+		'UPDATE test_tab SET b = sha256(b) WHERE mod(a,2) = 0',
+		'DELETE FROM test_tab WHERE mod(a,3) = 0',
+	);
 
 	$node_publisher->safe_psql(
 		'postgres', q{
@@ -51,9 +51,9 @@ sub test_streaming
 	COMMIT;
 	});
 
-	$h->query_safe('COMMIT');
+	$h->do('COMMIT');
 	# errors make the next test fail, so ignore them here
-	$h->quit;
+	$h->close;
 
 	$node_publisher->wait_for_catchup($appname);
 
@@ -211,14 +211,14 @@ $node_subscriber->reload;
 $node_subscriber->safe_psql('postgres', q{SELECT 1});
 
 # Interleave a pair of transactions, each exceeding the 64kB limit.
-my $h = $node_publisher->background_psql('postgres', on_error_stop => 0);
+my $h = PostgreSQL::Test::Session->new(node => $node_publisher);
 
 # Confirm if a deadlock between the leader apply worker and the parallel apply
 # worker can be detected.
 
 my $offset = -s $node_subscriber->logfile;
 
-$h->query_safe(
+$h->do(
 	q{
 BEGIN;
 INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
@@ -232,8 +232,8 @@ $node_subscriber->wait_for_log(
 
 $node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)");
 
-$h->query_safe('COMMIT');
-$h->quit;
+$h->do('COMMIT');
+$h->close;
 
 $node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/,
 	$offset);
@@ -260,7 +260,8 @@ $node_subscriber->safe_psql('postgres',
 # Check the subscriber log from now on.
 $offset = -s $node_subscriber->logfile;
 
-$h->query_safe(
+$h->reconnect;
+$h->do(
 	q{
 BEGIN;
 INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
@@ -275,8 +276,8 @@ $node_subscriber->wait_for_log(
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)");
 
-$h->query_safe('COMMIT');
-$h->quit;
+$h->do('COMMIT');
+$h->close;
 
 $node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/,
 	$offset);

Reply via email to