Hi,

On 2023-01-30 15:06:46 -0500, Tom Lane wrote:
> Andres Freund <and...@anarazel.de> writes:
> > It's annoyingly hard to wait for the result of a query in a generic way with
> > background_psql(), and more generally for psql. background_psql() uses -XAtq,
> > which means that we'll not get "status" output (like "BEGIN" or "(1 row)"),
> > and that queries not returning anything are completely invisible.
>
> Yeah, the empty-query-result problem was giving me fits recently.
> +1 for wrapping this into something more convenient to use.

I've hacked some on this. I first tried to just introduce a few helper
functions in Cluster.pm, but that ended up being awkward. So I bit the bullet
and introduced a new class (in BackgroundPsql.pm), and made background_psql()
and interactive_psql() return an instance of it.

This is just a rough prototype. Several function names don't seem great, it
needs POD documentation, etc.


The main convenience things it has over the old interface:
- $node->background_psql('dbname') is enough
- $psql->query(), which returns the query results as a string, is a lot easier
  to use than having to pump, identify query boundaries via regex etc.
- $psql->query_safe(), which dies if any query fails (detected via stderr)
- $psql->query_until() is a helper that makes it a bit easier to start queries
  that won't finish until a later point
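
To illustrate the idea behind $psql->query(), here is a minimal standalone
sketch. It is not the code from the patch: a POSIX sh stands in for psql,
core IPC::Open2 stands in for IPC::Run, and run_cmd() and the banner text are
made-up names. The point is only that echoing a separator after each command
makes even output-less commands detectable.

```perl
use strict;
use warnings;
use IPC::Open2;

# Assumption: a POSIX "sh" stands in for psql in this sketch.
my $pid = open2(my $from_sh, my $to_sh, 'sh');

# Hypothetical separator, analogous to the banner query() echoes.
my $banner = 'background_sh: QUERY_SEPARATOR';

# Send a command, then ask the shell to echo the separator, and read
# until the separator line shows up.  Returns the output printed before it.
sub run_cmd
{
	my ($cmd) = @_;

	# open2() turns on autoflush for the write handle.
	print $to_sh "$cmd\necho '$banner'\n";

	my $ret = '';
	while (my $line = <$from_sh>)
	{
		last if $line eq "$banner\n";
		$ret .= $line;
	}
	chomp $ret;
	return $ret;
}

my $hello = run_cmd('echo hello');    # command with output
my $empty = run_cmd(':');             # command without any output

# Shut the shell down gracefully.
print $to_sh "exit\n";
close $to_sh;
waitpid($pid, 0);
```

Without the separator, the second call would have nothing to wait for, which
is the empty-query-result problem mentioned above.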


I don't quite like the new interface yet:
- It's somewhat common to want to know if there was a failure, but also get
  the query result. I'm not sure what the best function signature for that is
  in Perl.
- query_until() sounds a bit too much like $node->poll_query_until(). Maybe
  query_wait_until() is better? OTOH, the other function has poll in the name,
  so maybe it's ok.
- right now there's a bit too much logic in background_psql() /
  interactive_psql() for my taste
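
For the first point, one possible shape, purely as an illustration and not
part of the patch, would be a context-sensitive return: list context yields
(ok, stdout, stderr), scalar context behaves like query_safe(). The name
query_result() is hypothetical, and it takes the captured stdout/stderr as
plain arguments so the sketch runs without psql.

```perl
use strict;
use warnings;

# Hypothetical helper.  $stdout and $stderr stand in for what psql produced.
# List context:   returns (ok, stdout, stderr) and never dies.
# Scalar context: returns stdout, dies if anything was written to stderr.
sub query_result
{
	my ($stdout, $stderr) = @_;
	my $ok = $stderr eq '' ? 1 : 0;

	return ($ok, $stdout, $stderr) if wantarray;

	die "query failed: $stderr" unless $ok;
	return $stdout;
}

# Caller wants both the failure status and the result.
my ($ok, $out, $err) = query_result("1", "");

# Caller only wants the result and is fine with dying on error.
my $only_out = query_result("1", "");
```

Whether overloading on context like this is clearer than two separately named
methods is debatable.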


Those points aside, I think it already makes the tests a good bit more
readable. My WIP vacuum_defer_cleanup_age patch shrunk by half with it.

I think with a bit more polish it's easy enough to use that we could avoid a
good number of those one-off psql's that we do all over.


I didn't really know what this, in src/test/subscription/t/015_stream.pl, is
about:

$h->finish;    # errors make the next test fail, so ignore them here

There's no further test?

I'm somewhat surprised it doesn't cause problems in another ->finish later on,
where we then afterwards just use $h again. Apparently IPC::Run just
automagically restarts psql?


Greetings,

Andres Freund
From fb0e9fceead01aaee0bdd05c3ad6c67814f6b820 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Mon, 30 Jan 2023 15:39:08 -0800
Subject: [PATCH v1] WIP: test: Introduce BackgroundPsql class

Discussion: https://postgr.es/m/882122.1675109...@sss.pgh.pa.us
---
 src/bin/psql/t/010_tab_completion.pl          |  27 +--
 contrib/amcheck/t/003_cic_2pc.pl              |  66 +++----
 .../perl/PostgreSQL/Test/BackgroundPsql.pm    | 165 ++++++++++++++++++
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  83 ++-------
 src/test/recovery/t/031_recovery_conflict.pl  | 101 +++--------
 src/test/subscription/t/015_stream.pl         |  52 ++----
 6 files changed, 252 insertions(+), 242 deletions(-)
 create mode 100644 src/test/perl/PostgreSQL/Test/BackgroundPsql.pm

diff --git a/src/bin/psql/t/010_tab_completion.pl b/src/bin/psql/t/010_tab_completion.pl
index 7746c75e0c9..7c382b98f8f 100644
--- a/src/bin/psql/t/010_tab_completion.pl
+++ b/src/bin/psql/t/010_tab_completion.pl
@@ -92,14 +92,7 @@ print $FH "other stuff\n";
 close $FH;
 
 # fire up an interactive psql session
-my $in  = '';
-my $out = '';
-
-my $timer = timer($PostgreSQL::Test::Utils::timeout_default);
-
-my $h = $node->interactive_psql('postgres', \$in, \$out, $timer);
-
-like($out, qr/psql/, "print startup banner");
+my $h = $node->interactive_psql('postgres');
 
 # Simple test case: type something and see if psql responds as expected
 sub check_completion
@@ -109,15 +102,12 @@ sub check_completion
 	# report test failures from caller location
 	local $Test::Builder::Level = $Test::Builder::Level + 1;
 
-	# reset output collector
-	$out = "";
 	# restart per-command timer
-	$timer->start($PostgreSQL::Test::Utils::timeout_default);
-	# send the data to be sent
-	$in .= $send;
-	# wait ...
-	pump $h until ($out =~ $pattern || $timer->is_expired);
-	my $okay = ($out =~ $pattern && !$timer->is_expired);
+	$h->{timeout}->start($PostgreSQL::Test::Utils::timeout_default);
+
+	# send the data to be sent and wait for its result
+	my $out = $h->query_until($pattern, $send);
+	my $okay = ($out =~ $pattern && !$h->{timeout}->is_expired);
 	ok($okay, $annotation);
 	# for debugging, log actual output if it didn't match
 	local $Data::Dumper::Terse = 1;
@@ -443,10 +433,7 @@ check_completion("blarg \t\t", qr//, "check completion failure path");
 clear_query();
 
 # send psql an explicit \q to shut it down, else pty won't close properly
-$timer->start($PostgreSQL::Test::Utils::timeout_default);
-$in .= "\\q\n";
-finish $h or die "psql returned $?";
-$timer->reset;
+$h->quit or die "psql returned $?";
 
 # done
 $node->stop;
diff --git a/contrib/amcheck/t/003_cic_2pc.pl b/contrib/amcheck/t/003_cic_2pc.pl
index eabe6fcf964..5323ed11ae9 100644
--- a/contrib/amcheck/t/003_cic_2pc.pl
+++ b/contrib/amcheck/t/003_cic_2pc.pl
@@ -36,63 +36,46 @@ $node->safe_psql('postgres', q(CREATE TABLE tbl(i int)));
 # statements.
 #
 
-my $main_in    = '';
-my $main_out   = '';
-my $main_timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default);
+my $main_h = $node->background_psql('postgres');
 
-my $main_h =
-  $node->background_psql('postgres', \$main_in, \$main_out,
-	$main_timer, on_error_stop => 1);
-$main_in .= q(
+$main_h->query_safe(q(
 BEGIN;
 INSERT INTO tbl VALUES(0);
-\echo syncpoint1
-);
-pump $main_h until $main_out =~ /syncpoint1/ || $main_timer->is_expired;
+));
 
-my $cic_in    = '';
-my $cic_out   = '';
-my $cic_timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default);
-my $cic_h =
-  $node->background_psql('postgres', \$cic_in, \$cic_out,
-	$cic_timer, on_error_stop => 1);
-$cic_in .= q(
+my $cic_h = $node->background_psql('postgres');
+
+$cic_h->query_until(qr/start/, q(
 \echo start
 CREATE INDEX CONCURRENTLY idx ON tbl(i);
-);
-pump $cic_h until $cic_out =~ /start/ || $cic_timer->is_expired;
+));
 
-$main_in .= q(
+$main_h->query_safe(q(
 PREPARE TRANSACTION 'a';
-);
+));
 
-$main_in .= q(
+$main_h->query_safe(q(
 BEGIN;
 INSERT INTO tbl VALUES(0);
-\echo syncpoint2
-);
-pump $main_h until $main_out =~ /syncpoint2/ || $main_timer->is_expired;
+));
 
 $node->safe_psql('postgres', q(COMMIT PREPARED 'a';));
 
-$main_in .= q(
+$main_h->query_safe(q(
 PREPARE TRANSACTION 'b';
 BEGIN;
 INSERT INTO tbl VALUES(0);
-\echo syncpoint3
-);
-pump $main_h until $main_out =~ /syncpoint3/ || $main_timer->is_expired;
+));
 
 $node->safe_psql('postgres', q(COMMIT PREPARED 'b';));
 
-$main_in .= q(
+$main_h->query_safe(q(
 PREPARE TRANSACTION 'c';
 COMMIT PREPARED 'c';
-);
-$main_h->pump_nb;
+));
 
-$main_h->finish;
-$cic_h->finish;
+$main_h->quit;
+$cic_h->quit;
 
 $result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));
 is($result, '0', 'bt_index_check after overlapping 2PC');
@@ -113,22 +96,15 @@ PREPARE TRANSACTION 'persists_forever';
 ));
 $node->restart;
 
-my $reindex_in  = '';
-my $reindex_out = '';
-my $reindex_timer =
-  IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default);
-my $reindex_h =
-  $node->background_psql('postgres', \$reindex_in, \$reindex_out,
-	$reindex_timer, on_error_stop => 1);
-$reindex_in .= q(
+my $reindex_h = $node->background_psql('postgres');
+$reindex_h->query_until(qr/start/, q(
 \echo start
 DROP INDEX CONCURRENTLY idx;
 CREATE INDEX CONCURRENTLY idx ON tbl(i);
-);
-pump $reindex_h until $reindex_out =~ /start/ || $reindex_timer->is_expired;
+));
 
 $node->safe_psql('postgres', "COMMIT PREPARED 'spans_restart'");
-$reindex_h->finish;
+$reindex_h->quit;
 $result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));
 is($result, '0', 'bt_index_check after 2PC and restart');
 
diff --git a/src/test/perl/PostgreSQL/Test/BackgroundPsql.pm b/src/test/perl/PostgreSQL/Test/BackgroundPsql.pm
new file mode 100644
index 00000000000..9257075224d
--- /dev/null
+++ b/src/test/perl/PostgreSQL/Test/BackgroundPsql.pm
@@ -0,0 +1,165 @@
+
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+package PostgreSQL::Test::BackgroundPsql;
+
+use strict;
+use warnings;
+
+use Carp;
+use Config;
+use IPC::Run;
+use PostgreSQL::Test::Utils qw(pump_until);
+use Test::More;
+
+# Start a new psql background session
+#
+# Parameters:
+# - "interactive" - should a PTY be used
+# - "psql" - psql command, parameters, including connection string
+sub new
+{
+	my $class = shift;
+	my ($interactive, $psql_params) = @_;
+	my $psql = {'stdin' => '', 'stdout' => '', 'stderr' => ''};
+	my $run;
+
+	$psql->{timeout} = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default);
+
+	if ($interactive)
+	{
+		$run = IPC::Run::start $psql_params,
+		  '<pty<', \$psql->{stdin}, '>pty>', \$psql->{stdout}, '2>', \$psql->{stderr},
+		  $psql->{timeout};
+	}
+	else
+	{
+		$run = IPC::Run::start $psql_params,
+		  '<', \$psql->{stdin}, '>', \$psql->{stdout}, '2>', \$psql->{stderr},
+		  $psql->{timeout};
+	}
+
+	$psql->{run} = $run;
+
+	my $self = bless $psql, $class;
+
+	$self->wait_connect();
+
+	return $self;
+}
+
+sub wait_connect
+{
+	my ($self) = @_;
+
+	# Request some output, and pump until we see it.  This means that psql
+	# connection failures are caught here, relieving callers of the need to
+	# handle those.  (Right now, we have no particularly good handling for
+	# errors anyway, but that might be added later.)
+	my $banner = "background_psql: ready";
+	$self->{stdin} .= "\\echo $banner\n";
+	$self->{run}->pump() until $self->{stdout} =~ /$banner/ || $self->{timeout}->is_expired;
+	$self->{stdout} = ''; # clear out banner
+
+	die "psql startup timed out" if $self->{timeout}->is_expired;
+}
+
+sub quit
+{
+	my ($self) = @_;
+
+	$self->{stdin} .= "\\q\n";
+
+	return $self->{run}->finish;
+}
+
+sub reconnect_and_clear
+{
+	my ($self) = @_;
+
+	# If psql isn't dead already, tell it to quit as \q, when already dead,
+	# causes IPC::Run to unhelpfully error out with "ack Broken pipe:".
+	$self->{run}->pump_nb();
+	if ($self->{run}->pumpable())
+	{
+		$self->{stdin} .= "\\q\n";
+	}
+	$self->{run}->finish;
+
+	# restart
+	$self->{run}->run();
+	$self->{stdin}  = '';
+	$self->{stdout} = '';
+
+	$self->wait_connect()
+}
+
+sub query
+{
+	my ($self, $query) = @_;
+	my $ret;
+	local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+	note "issuing query via background psql: $query";
+
+	# feed the query to psql's stdin, followed by \n (so psql processes the
+	# line), by a ; (so that psql issues the query, if it doesn't include a ;
+	# itself), and a separator echoed with \echo, that we can wait on.
+	my $banner = "background_psql: QUERY_SEPARATOR";
+	$self->{stdin} .= "$query\n;\n\\echo $banner\n";
+
+	pump_until($self->{run}, $self->{timeout}, \$self->{stdout}, qr/$banner/);
+
+	die "psql query timed out" if $self->{timeout}->is_expired;
+	$ret = $self->{stdout};
+
+	# remove banner again, our caller doesn't care
+	$ret =~ s/\n$banner$//s;
+
+	# clear out output for the next query
+	$self->{stdout} = '';
+
+	return $ret;
+}
+
+# Like query(), but errors out if the query failed.
+#
+# Query failure is determined by producing output on stderr.
+sub query_safe
+{
+	my ($self, $query) = @_;
+
+	my $ret = $self->query($query);
+
+	if ($self->{stderr} ne "")
+	{
+		die "query failed: $self->{stderr}";
+	}
+
+	return $ret;
+}
+
+# issue query, but only wait for $until, not query completion
+#
+# Note: Query needs newline and semicolon for psql to process the input.
+sub query_until
+{
+	my ($self, $until, $query) = @_;
+	my $ret;
+	local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+	$self->{stdin} .= $query;
+
+	pump_until($self->{run}, $self->{timeout}, \$self->{stdout}, $until);
+
+	die "psql query timed out" if $self->{timeout}->is_expired;
+
+	$ret = $self->{stdout};
+
+	# clear out output for the next query
+	$self->{stdout} = '';
+
+	return $ret;
+}
+
+1;
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 04921ca3a3d..e16ff197cf9 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -113,6 +113,7 @@ use PostgreSQL::Test::RecursiveCopy;
 use Socket;
 use Test::More;
 use PostgreSQL::Test::Utils ();
+use PostgreSQL::Test::BackgroundPsql ();
 use Time::HiRes qw(usleep);
 use Scalar::Util qw(blessed);
 
@@ -1966,18 +1967,12 @@ sub psql
 
 =pod
 
-=item $node->background_psql($dbname, \$stdin, \$stdout, $timer, %params) => harness
+=item $node->background_psql($dbname, %params) => BackgroundPsql instance
 
-Invoke B<psql> on B<$dbname> and return an IPC::Run harness object, which the
-caller may use to send input to B<psql>.  The process's stdin is sourced from
-the $stdin scalar reference, and its stdout and stderr go to the $stdout
-scalar reference.  This allows the caller to act on other parts of the system
-while idling this backend.
+Invoke B<psql> on B<$dbname> and return a BackgroundPsql object.
 
-The specified timer object is attached to the harness, as well.  It's caller's
-responsibility to set the timeout length (usually
-$PostgreSQL::Test::Utils::timeout_default), and to restart the timer after
-each command if the timeout is per-command.
+A default timeout of $PostgreSQL::Test::Utils::timeout_default is set up,
+which can be modified later.
 
 psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc>
 disabled.  That may be overridden by passing extra psql parameters.
@@ -1986,7 +1981,7 @@ Dies on failure to invoke psql, or if psql fails to connect.  Errors occurring
 later are the caller's problem.  psql runs with on_error_stop by default so
 that it will stop running sql and return 3 if passed SQL results in an error.
 
-Be sure to "finish" the harness when done with it.
+Be sure to "quit" the returned object when done with it.
 
 =over
 
@@ -2012,7 +2007,7 @@ If given, it must be an array reference containing additional parameters to B<ps
 
 sub background_psql
 {
-	my ($self, $dbname, $stdin, $stdout, $timer, %params) = @_;
+	my ($self, $dbname, %params) = @_;
 
 	local %ENV = $self->_get_env();
 
@@ -2029,45 +2024,18 @@ sub background_psql
 
 	$params{on_error_stop} = 1 unless defined $params{on_error_stop};
 
-	push @psql_params, '-v', 'ON_ERROR_STOP=1' if $params{on_error_stop};
-	push @psql_params, @{ $params{extra_params} }
-	  if defined $params{extra_params};
-
-	# Ensure there is no data waiting to be sent:
-	$$stdin = "" if ref($stdin);
-	# IPC::Run would otherwise append to existing contents:
-	$$stdout = "" if ref($stdout);
-
-	my $harness = IPC::Run::start \@psql_params,
-	  '<', $stdin, '>', $stdout, $timer;
-
-	# Request some output, and pump until we see it.  This means that psql
-	# connection failures are caught here, relieving callers of the need to
-	# handle those.  (Right now, we have no particularly good handling for
-	# errors anyway, but that might be added later.)
-	my $banner = "background_psql: ready";
-	$$stdin = "\\echo $banner\n";
-	pump $harness until $$stdout =~ /$banner/ || $timer->is_expired;
-
-	die "psql startup timed out" if $timer->is_expired;
-
-	return $harness;
+	return PostgreSQL::Test::BackgroundPsql->new(0, \@psql_params);
 }
 
 =pod
 
-=item $node->interactive_psql($dbname, \$stdin, \$stdout, $timer, %params) => harness
+=item $node->interactive_psql($dbname, %params) => BackgroundPsql instance
 
-Invoke B<psql> on B<$dbname> and return an IPC::Run harness object,
-which the caller may use to send interactive input to B<psql>.
-The process's stdin is sourced from the $stdin scalar reference,
-and its stdout and stderr go to the $stdout scalar reference.
-ptys are used so that psql thinks it's being called interactively.
+Invoke B<psql> on B<$dbname> and return a BackgroundPsql object, which the
+caller may use to send interactive input to B<psql>.
 
-The specified timer object is attached to the harness, as well.  It's caller's
-responsibility to set the timeout length (usually
-$PostgreSQL::Test::Utils::timeout_default), and to restart the timer after
-each command if the timeout is per-command.
+A default timeout of $PostgreSQL::Test::Utils::timeout_default is set up,
+which can be modified later.
 
 psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc>
 disabled.  That may be overridden by passing extra psql parameters.
@@ -2075,7 +2043,7 @@ disabled.  That may be overridden by passing extra psql parameters.
 Dies on failure to invoke psql, or if psql fails to connect.
 Errors occurring later are the caller's problem.
 
-Be sure to "finish" the harness when done with it.
+Be sure to "quit" the returned object when done with it.
 
 The only extra parameter currently accepted is
 
@@ -2093,7 +2061,7 @@ This requires IO::Pty in addition to IPC::Run.
 
 sub interactive_psql
 {
-	my ($self, $dbname, $stdin, $stdout, $timer, %params) = @_;
+	my ($self, $dbname, %params) = @_;
 
 	local %ENV = $self->_get_env();
 
@@ -2104,26 +2072,7 @@ sub interactive_psql
 	push @psql_params, @{ $params{extra_params} }
 	  if defined $params{extra_params};
 
-	# Ensure there is no data waiting to be sent:
-	$$stdin = "" if ref($stdin);
-	# IPC::Run would otherwise append to existing contents:
-	$$stdout = "" if ref($stdout);
-
-	my $harness = IPC::Run::start \@psql_params,
-	  '<pty<', $stdin, '>pty>', $stdout, $timer;
-
-	# Pump until we see psql's help banner.  This ensures that callers
-	# won't write anything to the pty before it's ready, avoiding an
-	# implementation issue in IPC::Run.  Also, it means that psql
-	# connection failures are caught here, relieving callers of
-	# the need to handle those.  (Right now, we have no particularly
-	# good handling for errors anyway, but that might be added later.)
-	pump $harness
-	  until $$stdout =~ /Type "help" for help/ || $timer->is_expired;
-
-	die "psql startup timed out" if $timer->is_expired;
-
-	return $harness;
+	return PostgreSQL::Test::BackgroundPsql->new(1, \@psql_params);
 }
 
 # Common sub of pgbench-invoking interfaces.  Makes any requested script files
diff --git a/src/test/recovery/t/031_recovery_conflict.pl b/src/test/recovery/t/031_recovery_conflict.pl
index 875afb8e3ce..b5a5e634f3a 100644
--- a/src/test/recovery/t/031_recovery_conflict.pl
+++ b/src/test/recovery/t/031_recovery_conflict.pl
@@ -68,14 +68,7 @@ $node_primary->wait_for_catchup($node_standby, 'replay', $primary_lsn);
 
 
 # a longrunning psql that we can use to trigger conflicts
-my $psql_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
-my %psql_standby = ('stdin' => '', 'stdout' => '');
-$psql_standby{run} =
-  $node_standby->background_psql($test_db, \$psql_standby{stdin},
-	\$psql_standby{stdout},
-	$psql_timeout);
-$psql_standby{stdout} = '';
-
+my $psql_standby = $node_standby->background_psql($test_db);
 my $expected_conflicts = 0;
 
 
@@ -104,15 +97,14 @@ my $cursor1 = "test_recovery_conflict_cursor";
 
 # DECLARE and use a cursor on standby, causing buffer with the only block of
 # the relation to be pinned on the standby
-$psql_standby{stdin} .= qq[
-        BEGIN;
-        DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1;
-        FETCH FORWARD FROM $cursor1;
-        ];
+my $res = $psql_standby->query_safe(qq[
+    BEGIN;
+    DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1;
+    FETCH FORWARD FROM $cursor1;
+]);
 # FETCH FORWARD should have returned a 0 since all values of b in the table
 # are 0
-ok(pump_until_standby(qr/^0$/m),
-	"$sect: cursor with conflicting pin established");
+like($res, qr/^0$/m, "$sect: cursor with conflicting pin established");
 
 # to check the log starting now for recovery conflict messages
 my $log_location = -s $node_standby->logfile;
@@ -128,7 +120,7 @@ $primary_lsn = $node_primary->lsn('flush');
 $node_primary->wait_for_catchup($node_standby, 'replay', $primary_lsn);
 
 check_conflict_log("User was holding shared buffer pin for too long");
-reconnect_and_clear();
+$psql_standby->reconnect_and_clear();
 check_conflict_stat("bufferpin");
 
 
@@ -142,15 +134,12 @@ $primary_lsn = $node_primary->lsn('flush');
 $node_primary->wait_for_catchup($node_standby, 'replay', $primary_lsn);
 
 # DECLARE and FETCH from cursor on the standby
-$psql_standby{stdin} .= qq[
+$res = $psql_standby->query_safe(qq[
         BEGIN;
         DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1;
         FETCH FORWARD FROM $cursor1;
-        ];
-ok( pump_until(
-		$psql_standby{run},     $psql_timeout,
-		\$psql_standby{stdout}, qr/^0$/m,),
-	"$sect: cursor with conflicting snapshot established");
+        ]);
+like($res, qr/^0$/m, "$sect: cursor with conflicting snapshot established");
 
 # Do some HOT updates
 $node_primary->safe_psql($test_db,
@@ -165,7 +154,7 @@ $node_primary->wait_for_catchup($node_standby, 'replay', $primary_lsn);
 
 check_conflict_log(
 	"User query might have needed to see row versions that must be removed");
-reconnect_and_clear();
+$psql_standby->reconnect_and_clear();
 check_conflict_stat("snapshot");
 
 
@@ -174,12 +163,12 @@ $sect = "lock conflict";
 $expected_conflicts++;
 
 # acquire lock to conflict with
-$psql_standby{stdin} .= qq[
+$res = $psql_standby->query_safe(qq[
         BEGIN;
         LOCK TABLE $table1 IN ACCESS SHARE MODE;
         SELECT 1;
-        ];
-ok(pump_until_standby(qr/^1$/m), "$sect: conflicting lock acquired");
+        ]);
+like($res, qr/^1$/m, "$sect: conflicting lock acquired");
 
 # DROP TABLE containing block which standby has in a pinned buffer
 $node_primary->safe_psql($test_db, qq[DROP TABLE $table1;]);
@@ -188,7 +177,7 @@ $primary_lsn = $node_primary->lsn('flush');
 $node_primary->wait_for_catchup($node_standby, 'replay', $primary_lsn);
 
 check_conflict_log("User was holding a relation lock for too long");
-reconnect_and_clear();
+$psql_standby->reconnect_and_clear();
 check_conflict_stat("lock");
 
 
@@ -199,14 +188,14 @@ $expected_conflicts++;
 # DECLARE a cursor for a query which, with sufficiently low work_mem, will
 # spill tuples into temp files in the temporary tablespace created during
 # setup.
-$psql_standby{stdin} .= qq[
+$res = $psql_standby->query_safe(qq[
         BEGIN;
         SET work_mem = '64kB';
         DECLARE $cursor1 CURSOR FOR
           SELECT count(*) FROM generate_series(1,6000);
         FETCH FORWARD FROM $cursor1;
-        ];
-ok(pump_until_standby(qr/^6000$/m),
+        ]);
+like($res, qr/^6000$/m,
 	"$sect: cursor with conflicting temp file established");
 
 # Drop the tablespace currently containing spill files for the query on the
@@ -218,7 +207,7 @@ $node_primary->wait_for_catchup($node_standby, 'replay', $primary_lsn);
 
 check_conflict_log(
 	"User was or might have been using tablespace that must be dropped");
-reconnect_and_clear();
+$psql_standby->reconnect_and_clear();
 check_conflict_stat("tablespace");
 
 
@@ -234,7 +223,7 @@ $node_standby->adjust_conf(
 	'max_standby_streaming_delay',
 	"${PostgreSQL::Test::Utils::timeout_default}s");
 $node_standby->restart();
-reconnect_and_clear();
+$psql_standby->reconnect_and_clear();
 
 # Generate a few dead rows, to later be cleaned up by vacuum. Then acquire a
 # lock on another relation in a prepared xact, so it's held continuously by
@@ -258,19 +247,15 @@ SELECT txid_current();
 $primary_lsn = $node_primary->lsn('flush');
 $node_primary->wait_for_catchup($node_standby, 'replay', $primary_lsn);
 
-$psql_standby{stdin} .= qq[
+$res = $psql_standby->query_until(qr/^1$/m, qq[
     BEGIN;
     -- hold pin
     DECLARE $cursor1 CURSOR FOR SELECT a FROM $table1;
     FETCH FORWARD FROM $cursor1;
     -- wait for lock held by prepared transaction
 	SELECT * FROM $table2;
-    ];
-ok( pump_until(
-		$psql_standby{run},     $psql_timeout,
-		\$psql_standby{stdout}, qr/^1$/m,),
-	"$sect: cursor holding conflicting pin, also waiting for lock, established"
-);
+    ]);
+ok( 1, "$sect: cursor holding conflicting pin, also waiting for lock, established");
 
 # just to make sure we're waiting for lock already
 ok( $node_standby->poll_query_until(
@@ -286,7 +271,7 @@ $primary_lsn = $node_primary->lsn('flush');
 $node_primary->wait_for_catchup($node_standby, 'replay', $primary_lsn);
 
 check_conflict_log("User transaction caused buffer deadlock with recovery.");
-reconnect_and_clear();
+$psql_standby->reconnect_and_clear();
 check_conflict_stat("deadlock");
 
 # clean up for next tests
@@ -294,7 +279,7 @@ $node_primary->safe_psql($test_db, qq[ROLLBACK PREPARED 'lock';]);
 $node_standby->adjust_conf('postgresql.conf', 'max_standby_streaming_delay',
 	'50ms');
 $node_standby->restart();
-reconnect_and_clear();
+$psql_standby->reconnect_and_clear();
 
 
 # Check that expected number of conflicts show in pg_stat_database. Needs to
@@ -319,8 +304,7 @@ check_conflict_log("User was connected to a database that must be dropped");
 
 # explicitly shut down psql instances gracefully - to avoid hangs or worse on
 # windows
-$psql_standby{stdin} .= "\\q\n";
-$psql_standby{run}->finish;
+$psql_standby->quit;
 
 $node_standby->stop();
 $node_primary->stop();
@@ -328,37 +312,6 @@ $node_primary->stop();
 
 done_testing();
 
-
-sub pump_until_standby
-{
-	my $match = shift;
-
-	return pump_until($psql_standby{run}, $psql_timeout,
-		\$psql_standby{stdout}, $match);
-}
-
-sub reconnect_and_clear
-{
-	# If psql isn't dead already, tell it to quit as \q, when already dead,
-	# causes IPC::Run to unhelpfully error out with "ack Broken pipe:".
-	$psql_standby{run}->pump_nb();
-	if ($psql_standby{run}->pumpable())
-	{
-		$psql_standby{stdin} .= "\\q\n";
-	}
-	$psql_standby{run}->finish;
-
-	# restart
-	$psql_standby{run}->run();
-	$psql_standby{stdin}  = '';
-	$psql_standby{stdout} = '';
-
-	# Run query to ensure connection has finished re-establishing
-	$psql_standby{stdin} .= qq[SELECT 1;\n];
-	die unless pump_until_standby(qr/^1$/m);
-	$psql_standby{stdout} = '';
-}
-
 sub check_conflict_log
 {
 	my $message          = shift;
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
index 91e8aa8c0a5..7d3c893df23 100644
--- a/src/test/subscription/t/015_stream.pl
+++ b/src/test/subscription/t/015_stream.pl
@@ -28,26 +28,20 @@ sub test_streaming
 	my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;
 
 	# Interleave a pair of transactions, each exceeding the 64kB limit.
-	my $in  = '';
-	my $out = '';
-
 	my $offset = 0;
 
-	my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default);
-
-	my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer,
+	my $h = $node_publisher->background_psql('postgres',
 		on_error_stop => 0);
 
 	# Check the subscriber log from now on.
 	$offset = -s $node_subscriber->logfile;
 
-	$in .= q{
+	$h->query_safe(q{
 	BEGIN;
 	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
 	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
 	DELETE FROM test_tab WHERE mod(a,3) = 0;
-	};
-	$h->pump_nb;
+	});
 
 	$node_publisher->safe_psql(
 		'postgres', q{
@@ -57,11 +51,10 @@ sub test_streaming
 	COMMIT;
 	});
 
-	$in .= q{
-	COMMIT;
-	\q
-	};
-	$h->finish;    # errors make the next test fail, so ignore them here
+	$h->query_safe('COMMIT');
+	$h->quit;
+	# XXX: Not sure what this means
+    # errors make the next test fail, so ignore them here
 
 	$node_publisher->wait_for_catchup($appname);
 
@@ -219,12 +212,7 @@ $node_subscriber->reload;
 $node_subscriber->safe_psql('postgres', q{SELECT 1});
 
 # Interleave a pair of transactions, each exceeding the 64kB limit.
-my $in  = '';
-my $out = '';
-
-my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default);
-
-my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer,
+my $h = $node_publisher->background_psql('postgres',
 	on_error_stop => 0);
 
 # Confirm if a deadlock between the leader apply worker and the parallel apply
@@ -232,11 +220,10 @@ my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer,
 
 my $offset = -s $node_subscriber->logfile;
 
-$in .= q{
+$h->query_safe(q{
 BEGIN;
 INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
-};
-$h->pump_nb;
+});
 
 # Ensure that the parallel apply worker executes the insert command before the
 # leader worker.
@@ -246,11 +233,8 @@ $node_subscriber->wait_for_log(
 
 $node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)");
 
-$in .= q{
-COMMIT;
-\q
-};
-$h->finish;
+$h->query_safe('COMMIT');
+$h->quit;
 
 $node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/,
 	$offset);
@@ -277,11 +261,10 @@ $node_subscriber->safe_psql('postgres',
 # Check the subscriber log from now on.
 $offset = -s $node_subscriber->logfile;
 
-$in .= q{
+$h->query_safe(q{
 BEGIN;
 INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
-};
-$h->pump_nb;
+});
 
 # Ensure that the first parallel apply worker executes the insert command
 # before the second one.
@@ -292,11 +275,8 @@ $node_subscriber->wait_for_log(
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)");
 
-$in .= q{
-COMMIT;
-\q
-};
-$h->finish;
+$h->query_safe('COMMIT');
+$h->quit;
 
 $node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/,
 	$offset);
-- 
2.38.0
