From 7f241b871310ac37898bd04a43868fd8d4a8e3a4 Mon Sep 17 00:00:00 2001
From: Mark Dilger <mark.dilger@enterprisedb.com>
Date: Thu, 2 Sep 2021 11:55:29 -0700
Subject: [PATCH 2/2] Adding tests of subscription errors

---
 src/test/perl/PostgresNode.pm              |  64 +++++++++++
 src/test/regress/expected/subscription.out |   6 +
 src/test/regress/expected/sysviews.out     |   8 ++
 src/test/regress/sql/subscription.sql      |   4 +
 src/test/regress/sql/sysviews.sql          |   5 +
 src/test/subscription/t/025_errors.pl      | 124 +++++++++++++++++++++
 6 files changed, 211 insertions(+)
 create mode 100644 src/test/subscription/t/025_errors.pl

diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index c59da758c7..d4d8cd8b68 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -2499,6 +2499,70 @@ sub wait_for_slot_catchup
 	return;
 }
 
+
+=item $node->wait_for_subscription($dbname, @subcriptions)
+
+Wait for the named subscriptions to catch up or error.
+
+=cut
+
+sub wait_for_subscriptions
+{
+	my ($self, $dbname, @subscriptions) = @_;
+
+	# Unique-ify the subscriptions passed by the caller
+	my %unique = map { $_ => 1 } @subscriptions;
+	my @unique = sort keys %unique;
+	my $unique_count = scalar(@unique);
+
+	# It makes sense to quietly return immediately for a list of zero
+	# subscriptions, but that is more likely user error than intentional, so we
+	# instead tell the caller about it noisily.
+	croak "subscriptions must be specified" unless $unique_count;
+
+	# Construct a SQL list from the unique subscription names
+	my @escaped = map { s/'/''/g; s/\\/\\\\/g; $_ } @unique;
+	my $quotedlist = join(', ', map { "'$_'" } @escaped);
+
+	# Sanity check that the subscriptions exist.  We don't want to
+	# poll until timeout on a non-existent misspelled subscription name.
+	my $unmatched = $self->safe_psql($dbname, qq(
+		SELECT string_agg(subname, ', ') FROM (
+			SELECT arg.subname
+				FROM (SELECT subname FROM unnest(ARRAY[$quotedlist]::text[]) AS subname) AS arg
+				LEFT JOIN pg_catalog.pg_subscription pg
+				ON arg.subname = pg.subname
+				WHERE pg.subname IS NULL
+				ORDER BY arg.subname
+			) AS ss
+		));
+	croak "no such subscription: $unmatched"
+		if length $unmatched;
+
+	# Ok, the subscriptions exist, so we can poll on them synchronizing or
+	# failing.  There is a race condition between when we checked above and
+	# this query, but we were only trying to detect typos in the tests, not
+	# concurrent subscription drops.
+	my $polling_sql = qq(
+		SELECT COUNT(1) = $unique_count FROM
+			(SELECT s.oid AS subid
+				FROM pg_catalog.pg_subscription s
+				LEFT JOIN pg_catalog.pg_subscription_rel sr
+				ON sr.srsubid = s.oid
+				WHERE (sr IS NULL OR sr.srsubstate IN ('s', 'r'))
+				  AND s.subname IN ($quotedlist)
+			 UNION
+			 SELECT e.subid
+				FROM pg_catalog.pg_stat_subscription_errors e
+				WHERE e.subname IN ($quotedlist)
+			) AS synced_or_errored
+		);
+	$self->poll_query_until($dbname, $polling_sql)
+	  or croak "timed out waiting for subscriptions";
+	print "done\n";
+	return;
+}
+
 =pod
 
 =item $node->query_hash($dbname, $query, @columns)
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 15a1ac6398..d33174849c 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -289,6 +289,12 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+-- no errors should be reported
+SELECT * FROM pg_stat_subscription_errors;
+ datname | subid | subname | relid | command | xid | failure_source | failure_count | last_failure | last_failure_message | stats_reset 
+---------+-------+---------+-------+---------+-----+----------------+---------------+--------------+----------------------+-------------
+(0 rows)
+
 DROP SUBSCRIPTION regress_testsub;
 -- two_phase and streaming are compatible.
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 6e54f3e15e..7cfe54224d 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -150,3 +150,11 @@ select count(distinct utc_offset) >= 24 as ok from pg_timezone_abbrevs;
  t
 (1 row)
 
+-- Test that the subscription errors view exists, and has the right columns.
+-- If we expected any rows to exist, we would need to filter out unstable
+-- columns.  But since there should be no errors, we just select them all.
+select * from pg_stat_subscription_errors;
+ datname | subid | subname | relid | command | xid | failure_source | failure_count | last_failure | last_failure_message | stats_reset 
+---------+-------+---------+-------+---------+-----+----------------+---------------+--------------+----------------------+-------------
+(0 rows)
+
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 7faa935a2a..b2caf86b22 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -218,6 +218,10 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+
+-- no errors should be reported
+SELECT * FROM pg_stat_subscription_errors;
+
 DROP SUBSCRIPTION regress_testsub;
 
 -- two_phase and streaming are compatible.
diff --git a/src/test/regress/sql/sysviews.sql b/src/test/regress/sql/sysviews.sql
index dc8c9a3ac2..3991a246f5 100644
--- a/src/test/regress/sql/sysviews.sql
+++ b/src/test/regress/sql/sysviews.sql
@@ -60,3 +60,8 @@ set timezone_abbreviations = 'Australia';
 select count(distinct utc_offset) >= 24 as ok from pg_timezone_abbrevs;
 set timezone_abbreviations = 'India';
 select count(distinct utc_offset) >= 24 as ok from pg_timezone_abbrevs;
+
+-- Test that the subscription errors view exists, and has the right columns.
+-- If we expected any rows to exist, we would need to filter out unstable
+-- columns.  But since there should be no errors, we just select them all.
+select * from pg_stat_subscription_errors;
diff --git a/src/test/subscription/t/025_errors.pl b/src/test/subscription/t/025_errors.pl
new file mode 100644
index 0000000000..c5bd45ab15
--- /dev/null
+++ b/src/test/subscription/t/025_errors.pl
@@ -0,0 +1,124 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# This test checks behaviour of pg_stat_subscription_errors view when
+# tablesync and apply workers fail
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Create a chain of nodes for logical replication to propogate as:
+#
+#  publisher => middleman => subscriber
+#
+my ($publisher, $middleman, $subscriber);
+
+$publisher = PostgresNode->new('publisher');
+$publisher->init(allows_streaming => 'logical');
+$publisher->start;
+
+$middleman = PostgresNode->new('middleman');
+$middleman->init(allows_streaming => 'logical');
+$middleman->start;
+
+$subscriber = PostgresNode->new('subscriber');
+$subscriber->init;
+$subscriber->start;
+
+my ($node, $schema, $results);
+my @nodes = ($publisher, $middleman, $subscriber);
+
+# Create boilerplate text of the DDL needed to construct schemas and objects on
+# each node
+#
+my @schemas = qw(
+	good conflicts_on_middleman conflicts_on_subscriber);
+my @ddl = map { qq(
+CREATE SCHEMA $_;
+CREATE TABLE $_.tbl (i INTEGER);
+ALTER TABLE $_.tbl REPLICA IDENTITY FULL;
+CREATE INDEX ${_}_idx ON $_.tbl(i);
+) } @schemas;
+
+for my $node (@nodes)
+{
+	$node->safe_psql('postgres', $_) for (@ddl);
+}
+
+# Create non-unique data in all schemas on publisher
+#
+my @dml = map { qq(INSERT INTO $_.tbl (i) VALUES (1), (1), (1)) } @schemas;
+$publisher->safe_psql('postgres', $_) for (@dml);
+
+# Create additional DDL on the middleman and subscriber that will cause
+# replication failures during the initial tablesync.
+#
+$middleman->safe_psql('postgres', qq(
+CREATE UNIQUE INDEX unique_idx
+	ON conflicts_on_middleman.tbl(i)));
+$subscriber->safe_psql('postgres', qq(
+ALTER TABLE conflicts_on_subscriber.tbl
+	ADD CONSTRAINT must_be_three CHECK (i = 3)));
+
+# Insert data to all schemas on the middleman which do not violate the
+# middleman's uniqueness requirements.
+#
+for $node ($publisher, $middleman)
+{
+	$node->safe_psql('postgres', qq(INSERT INTO $_.tbl VALUES (1), (2), (3), (4)))
+		for (@schemas);
+}
+
+# Create publications named after the schemas they publish
+for $node ($publisher, $middleman)
+{
+	$node->safe_psql('postgres', qq(CREATE PUBLICATION $_ FOR TABLE $_.tbl))
+		for (@schemas);
+}
+
+my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
+my $middleman_connstr = $middleman->connstr . ' dbname=postgres';
+
+# Create subscriptions named after the schemas they subscribe
+#
+for $schema (@schemas)
+{
+	$middleman->safe_psql('postgres', qq(
+CREATE SUBSCRIPTION $schema
+	CONNECTION '$publisher_connstr'
+	PUBLICATION $schema));
+}
+
+for $schema (@schemas)
+{
+	$subscriber->safe_psql('postgres', qq(
+CREATE SUBSCRIPTION $schema
+	CONNECTION '$middleman_connstr'
+	PUBLICATION $schema));
+}
+
+# Wait for the subscriptions to finish synchronizing or to error
+#
+$middleman->wait_for_subscriptions('postgres', @schemas);
+
+$subscriber->wait_for_subscriptions('postgres', @schemas);
+
+$results = $publisher->safe_psql('postgres',
+	'select count(*) from pg_catalog.pg_stat_subscription_errors');
+is ($results, 0, "publisher has no subscription errors");
+
+$results = $middleman->safe_psql('postgres',
+	'select datname, subname, failure_source, last_failure_message from pg_catalog.pg_stat_subscription_errors');
+is ($results, 'postgres|conflicts_on_middleman|tablesync|duplicate key value violates unique constraint "unique_idx"',
+	'expected subscription failure on middleman');
+
+$results = $subscriber->safe_psql('postgres',
+	'select datname, subname, failure_source, last_failure_message from pg_catalog.pg_stat_subscription_errors');
+is ($results, 'postgres|conflicts_on_subscriber|tablesync|new row for relation "tbl" violates check constraint "must_be_three"',
+	'expected subscription failure on subscriber');
+
+$subscriber->stop();
+$middleman->stop();
+$publisher->stop();
-- 
2.21.1 (Apple Git-122.3)

