From ae43c41fdfd9025d724b29fc762a6f05bece9a40 Mon Sep 17 00:00:00 2001
From: Osumi Takamichi <osumi.takamichi@fujitsu.com>
Date: Tue, 2 Nov 2021 09:53:53 +0000
Subject: [PATCH v3] Optionally disabling subscriptions on error

Logical replication apply workers for a subscription can easily get
stuck in an infinite loop of attempting to apply a change,
triggering an error (such as a constraint violation), exiting with
an error written to the subscription worker log, and restarting.

To partially remedy the situation, adding a new
subscription_parameter named 'disable_on_error'.  To be consistent
with old behavior, the parameter defaults to false.  When true, the
apply worker catches errors thrown, and for errors that are deemed
not to be transient, disables the subscription in order to break the
loop. The error is still also written to the logs.

Proposed and Written originally by Mark Dilger.
---
 doc/src/sgml/ref/create_subscription.sgml       |  12 ++
 src/backend/catalog/pg_subscription.c           |   1 +
 src/backend/catalog/system_views.sql            |   2 +-
 src/backend/commands/subscriptioncmds.c         |  27 +++-
 src/backend/replication/logical/launcher.c      |   1 +
 src/backend/replication/logical/worker.c        | 167 +++++++++++++++++++--
 src/include/catalog/pg_subscription.h           |   4 +
 src/test/perl/PostgreSQL/Test/Cluster.pm        |  40 +++++
 src/test/subscription/t/027_disable_on_error.pl | 192 ++++++++++++++++++++++++
 9 files changed, 434 insertions(+), 12 deletions(-)
 create mode 100644 src/test/subscription/t/027_disable_on_error.pl

diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 990a41f..ef12277 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -142,6 +142,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
        </varlistentry>
 
        <varlistentry>
+        <term><literal>disable_on_error</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription should be automatically disabled
+          if replicating data from the publisher triggers non-transicent errors
+          such as referential integrity or permissions errors. The default is
+          <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
         <term><literal>enabled</literal> (<type>boolean</type>)</term>
         <listitem>
          <para>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 25021e2..fcace32 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->name = pstrdup(NameStr(subform->subname));
 	sub->owner = subform->subowner;
 	sub->enabled = subform->subenabled;
+	sub->disableonerr = subform->subdisableonerr;
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index a2ee00c..8220fca 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1258,7 +1258,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
+GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subdisableonerr, subbinary,
               substream, subtwophasestate, subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 18962b9..122eed7 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -61,6 +61,7 @@
 #define SUBOPT_BINARY				0x00000080
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
+#define SUBOPT_DISABLE_ON_ERR		0x00000400
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
 	bool		binary;
 	bool		streaming;
 	bool		twophase;
+	bool		disableonerr;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -129,6 +131,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->streaming = false;
 	if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
 		opts->twophase = false;
+	if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
+		opts->disableonerr = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -248,6 +252,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
 			opts->twophase = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
+				 strcmp(defel->defname, "disable_on_error") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
+			opts->disableonerr = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -397,7 +410,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+					  SUBOPT_DISABLE_ON_ERR);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -465,6 +479,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
+	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
 	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
 	values[Anum_pg_subscription_subtwophasestate - 1] =
@@ -871,11 +886,19 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			{
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING);
+								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
 
+				if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
+				{
+					values[Anum_pg_subscription_subdisableonerr - 1]
+						= BoolGetDatum(opts.disableonerr);
+					values[Anum_pg_subscription_subdisableonerr - 1]
+						= true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
 				{
 					/*
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3fb4caa..febfc4d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -132,6 +132,7 @@ get_subscription_list(void)
 		sub->dbid = subform->subdbid;
 		sub->owner = subform->subowner;
 		sub->enabled = subform->subenabled;
+		sub->disableonerr = subform->subdisableonerr;
 		sub->name = pstrdup(NameStr(subform->subname));
 		/* We don't fill fields we are not interested in. */
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 3a40684..c79fad5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -136,6 +136,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
+#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
@@ -334,6 +335,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
 static void apply_error_callback(void *arg);
 static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
 static inline void reset_apply_error_context_info(void);
+static bool IsSubscriptionDisablingError(void);
 
 /*
  * Should this worker apply changes for given relation.
@@ -2752,6 +2754,123 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 }
 
 /*
+ * Errors which are transient, network protocol related, or resource exhaustion
+ * related, should not disable a subscription.  These may clear up without user
+ * intervention in the subscription, schema, or data being replicated.
+ */
+static bool
+IsSubscriptionDisablingError(void)
+{
+	switch (geterrcode())
+	{
+		case ERRCODE_CONNECTION_EXCEPTION:
+		case ERRCODE_CONNECTION_DOES_NOT_EXIST:
+		case ERRCODE_CONNECTION_FAILURE:
+		case ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION:
+		case ERRCODE_SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION:
+		case ERRCODE_TRANSACTION_RESOLUTION_UNKNOWN:
+		case ERRCODE_PROTOCOL_VIOLATION:
+		case ERRCODE_INSUFFICIENT_RESOURCES:
+		case ERRCODE_DISK_FULL:
+		case ERRCODE_OUT_OF_MEMORY:
+		case ERRCODE_TOO_MANY_CONNECTIONS:
+		case ERRCODE_CONFIGURATION_LIMIT_EXCEEDED:
+		case ERRCODE_PROGRAM_LIMIT_EXCEEDED:
+		case ERRCODE_STATEMENT_TOO_COMPLEX:
+		case ERRCODE_TOO_MANY_COLUMNS:
+		case ERRCODE_TOO_MANY_ARGUMENTS:
+		case ERRCODE_OPERATOR_INTERVENTION:
+		case ERRCODE_QUERY_CANCELED:
+		case ERRCODE_ADMIN_SHUTDOWN:
+		case ERRCODE_CRASH_SHUTDOWN:
+		case ERRCODE_CANNOT_CONNECT_NOW:
+		case ERRCODE_DATABASE_DROPPED:
+		case ERRCODE_IDLE_SESSION_TIMEOUT:
+			return false;
+		default:
+			break;
+	}
+
+	return true;
+}
+
+/*
+ * Recover from a possibly aborted transaction state and disable the current
+ * subscription.
+ */
+static ErrorData *
+DisableSubscriptionOnError(MemoryContext mcxt)
+{
+	Relation	rel;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+	HeapTuple	tup;
+	Oid			subid;
+	Form_pg_subscription subform;
+	ErrorData  *edata;
+
+	/*
+	 * Clean up from the error and get a fresh transaction in which to
+	 * disable the subscription.
+	 */
+	MemoryContextSwitchTo(mcxt);
+	edata = CopyErrorData();
+
+	ereport(LOG,
+			(errmsg("logical replication subscription \"%s\" will be disabled due to error: %s",
+					MySubscription->name, edata->message)));
+
+	AbortOutOfAnyTransaction();
+	FlushErrorState();
+
+	StartTransactionCommand();
+
+	/* Look up our subscription in the catalogs */
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
+							  CStringGetDatum(MySubscription->name));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("subscription \"%s\" does not exist",
+						MySubscription->name)));
+
+	subform = (Form_pg_subscription) GETSTRUCT(tup);
+	subid = subform->oid;
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
+	/*
+	 * We would not be here unless this subscription's disableonerr
+	 * field was true when our worker began applying changes, but check
+	 * whether that field has changed in the interim.
+	 */
+	if (!subform->subdisableonerr)
+		ReThrowError(edata);
+
+	/* Form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Set the subscription to disabled, and note the reason. */
+	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
+	replaces[Anum_pg_subscription_subenabled - 1] = true;
+
+	/* Update the catalog */
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	CommitTransactionCommand();
+
+	return edata;
+}
+
+/*
  * Send a Standby Status Update message to server.
  *
  * 'recvpos' is the latest LSN we've received data to, force is set if we need
@@ -3336,6 +3455,9 @@ ApplyWorkerMain(Datum main_arg)
 	char	   *myslotname;
 	WalRcvStreamOptions options;
 	int			server_version;
+	bool		disable_subscription = false;
+	MemoryContext ecxt;
+	ErrorData  *errdata;
 
 	/* Attach to slot */
 	logicalrep_worker_attach(worker_slot);
@@ -3437,8 +3559,8 @@ ApplyWorkerMain(Datum main_arg)
 		}
 		PG_CATCH();
 		{
-			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
-			ErrorData  *errdata = CopyErrorData();
+			ecxt = MemoryContextSwitchTo(cctx);
+			errdata = CopyErrorData();
 
 			/*
 			 * Report the table sync error. There is no corresponding message
@@ -3450,11 +3572,26 @@ ApplyWorkerMain(Datum main_arg)
 										  0,	/* message type */
 										  InvalidTransactionId,
 										  errdata->message);
-			MemoryContextSwitchTo(ecxt);
-			PG_RE_THROW();
+
+			/* Decide whether or not we disable this subscription */
+			if (MySubscription->disableonerr &&
+				IsSubscriptionDisablingError())
+				disable_subscription = true;
+			else
+			{
+				MemoryContextSwitchTo(ecxt);
+				PG_RE_THROW();
+			}
 		}
 		PG_END_TRY();
 
+		/* If we caught an error above, disable the subscription */
+		if (disable_subscription)
+		{
+			ReThrowError(DisableSubscriptionOnError(cctx));
+			MemoryContextSwitchTo(ecxt);
+		}
+
 		/* allocate slot name in long-lived context */
 		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
 
@@ -3580,8 +3717,8 @@ ApplyWorkerMain(Datum main_arg)
 		/* report the apply error */
 		if (apply_error_callback_arg.command != 0)
 		{
-			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
-			ErrorData  *errdata = CopyErrorData();
+			ecxt = MemoryContextSwitchTo(cctx);
+			errdata = CopyErrorData();
 
 			pgstat_report_subworker_error(MyLogicalRepWorker->subid,
 										  MyLogicalRepWorker->relid,
@@ -3591,13 +3728,25 @@ ApplyWorkerMain(Datum main_arg)
 										  apply_error_callback_arg.command,
 										  apply_error_callback_arg.remote_xid,
 										  errdata->message);
-			MemoryContextSwitchTo(ecxt);
-		}
 
-		PG_RE_THROW();
+			if (MySubscription->disableonerr &&
+				IsSubscriptionDisablingError())
+				disable_subscription = true;
+			else
+			{
+				PG_RE_THROW();
+				MemoryContextSwitchTo(ecxt);
+			}
+		}
 	}
 	PG_END_TRY();
 
+	if (disable_subscription)
+	{
+		ReThrowError(DisableSubscriptionOnError(cctx));
+		MemoryContextSwitchTo(ecxt);
+	}
+
 	proc_exit(0);
 }
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 2106149..ff50caa 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -60,6 +60,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
+	bool		subdisableonerr; /* True if apply errors should
+								  * disable the subscription upon error */
+
 	bool		subbinary;		/* True if the subscription wants the
 								 * publisher to send data in binary */
 
@@ -99,6 +102,7 @@ typedef struct Subscription
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
+	bool		disableonerr;	/* Whether errors automatically disable */
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 9467a19..b830be4 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -2586,6 +2586,46 @@ sub wait_for_slot_catchup
 	return;
 }
 
+=pot
+
+=item $node->wait_for_subscriptions($dbname, @subcriptions)
+
+Wait for the named subscriptions to catch up or to be disabled.
+
+=cut
+
+sub wait_for_subscriptions
+{
+	my ($self, $dbname, @subscriptions) = @_;
+
+	# Unique-ify the subscriptions passed by the caller
+	my %unique = map { $_ => 1 } @subscriptions;
+	my @unique = sort keys %unique;
+	my $unique_count = scalar(@unique);
+
+	# Construct a SQL list from the unique subscription names
+	my @escaped = map { s/'/''/g; s/\\/\\\\/g; $_ } @unique;
+	my $sublist = join(', ', map { "'$_'" } @escaped);
+
+	my $polling_sql = qq(
+		SELECT COUNT(1) = $unique_count FROM
+			(SELECT s.oid
+				FROM pg_catalog.pg_subscription s
+				LEFT JOIN pg_catalog.pg_subscription_rel sr
+				ON sr.srsubid = s.oid
+				WHERE (sr IS NULL OR sr.srsubstate IN ('s', 'r'))
+				  AND s.subname IN ($sublist)
+				  AND s.subenabled IS TRUE
+			 UNION
+			 SELECT s.oid
+				FROM pg_catalog.pg_subscription s
+				WHERE s.subname IN ($sublist)
+				  AND s.subenabled IS FALSE
+			) AS synced_or_disabled
+		);
+	return $self->poll_query_until($dbname, $polling_sql);
+}
+
 =pod
 
 =item $node->query_hash($dbname, $query, @columns)
diff --git a/src/test/subscription/t/027_disable_on_error.pl b/src/test/subscription/t/027_disable_on_error.pl
new file mode 100644
index 0000000..e18ce0d
--- /dev/null
+++ b/src/test/subscription/t/027_disable_on_error.pl
@@ -0,0 +1,192 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test of logical replication subscription self-disabling feature
+use strict;
+use warnings;
+# use PostgresNode;
+# use TestLib;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 10;
+
+my @schemas = qw(s1 s2);
+my ($schema, $cmd);
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber =  PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create identical schema, table and index on both the publisher and
+# subscriber
+#
+for $schema (@schemas)
+{
+	$cmd = qq(
+CREATE SCHEMA $schema;
+CREATE TABLE $schema.tbl (i INT);
+ALTER TABLE $schema.tbl REPLICA IDENTITY FULL;
+CREATE INDEX ${schema}_tbl_idx ON $schema.tbl(i));
+	$node_publisher->safe_psql('postgres', $cmd);
+	$node_subscriber->safe_psql('postgres', $cmd);
+}
+
+# Create non-unique data in both schemas on the publisher.
+#
+for $schema (@schemas)
+{
+	$cmd = qq(INSERT INTO $schema.tbl (i) VALUES (1), (1), (1));
+	$node_publisher->safe_psql('postgres', $cmd);
+}
+
+# Create an additional unique index in schema s1 on the subscriber only.  When
+# we create subscriptions, below, this should cause subscription "s1" on the
+# subscriber to fail during initial synchronization and to get automatically
+# disabled.
+#
+$cmd = qq(CREATE UNIQUE INDEX s1_tbl_unique ON s1.tbl (i));
+$node_subscriber->safe_psql('postgres', $cmd);
+
+# Create publications and subscriptions linking the schemas on
+# the publisher with those on the subscriber.  This tests that the
+# uniqueness violations cause subscription "s1" to fail during
+# initial synchronization.
+#
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+for $schema (@schemas)
+{
+	# Create the publication for this table
+	$cmd = qq(
+CREATE PUBLICATION $schema FOR TABLE $schema.tbl);
+	$node_publisher->safe_psql('postgres', $cmd);
+
+	# Create the subscription for this table
+	$cmd = qq(
+CREATE SUBSCRIPTION $schema
+	CONNECTION '$publisher_connstr'
+	PUBLICATION $schema
+	WITH (disable_on_error = true));
+	$node_subscriber->safe_psql('postgres', $cmd);
+}
+
+# Wait for the initial subscription synchronizations to finish or fail.
+#
+$node_subscriber->wait_for_subscriptions('postgres', @schemas)
+	or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscription "s1" should have disabled itself due to error.
+#
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"f", "subscription s1 no longer enabled");
+
+# Subscription "s2" should have copied the initial data without incident.
+#
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's2');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"t", "subscription s2 still enabled");
+$cmd = qq(SELECT i, COUNT(*) FROM s2.tbl GROUP BY i);
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"1|3",
+	"subscription s2 replicated initial data");
+
+# Enter unique data for both schemas on the publisher.  This should succeed on
+# the publisher node, and not cause any additional problems on the subscriber
+# side either, though disabled subscription "s1" should not replicate anything.
+#
+for $schema (@schemas)
+{
+	$cmd = qq(INSERT INTO $schema.tbl (i) VALUES (2));
+	$node_publisher->safe_psql('postgres', $cmd);
+}
+
+# Wait for the data to replicate for the subscriptions.  This tests that the
+# problems encountered by subscription "s1" do not cause subscription "s2" to
+# get stuck.
+$node_subscriber->wait_for_subscriptions('postgres', @schemas)
+	or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscription "s1" should still be disabled and have replicated no data
+#
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"f", "subscription s1 still disabled");
+
+# Subscription "s2" should still be enabled and have replicated all changes
+#
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's2');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"t", "subscription s2 still enabled");
+$cmd = q(SELECT MAX(i), COUNT(*) FROM s2.tbl);
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"2|4", "subscription s2 replicated data");
+
+# Drop the unique index on "s1" which caused the subscription to be disabled
+#
+$cmd = qq(DROP INDEX s1.s1_tbl_unique);
+$node_subscriber->safe_psql('postgres', $cmd);
+
+# Re-enable the subscription "s1"
+#
+$cmd = q(ALTER SUBSCRIPTION s1 ENABLE);
+$node_subscriber->safe_psql('postgres', $cmd);
+
+# Wait for the data to replicate
+#
+$node_subscriber->wait_for_subscriptions('postgres', @schemas)
+	or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check that we have the new data in s1.tbl
+#
+$cmd = q(SELECT MAX(i), COUNT(*) FROM s1.tbl);
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"2|4", "subscription s1 replicated data");
+
+# Delete the data from the subscriber only, and recreate the unique index
+#
+$cmd = q(
+DELETE FROM s1.tbl;
+CREATE UNIQUE INDEX s1_tbl_unique ON s1.tbl (i));
+$node_subscriber->safe_psql('postgres', $cmd);
+
+# Add more non-unique data to the publisher
+for $schema (@schemas)
+{
+	$cmd = qq(INSERT INTO $schema.tbl (i) VALUES (3), (3), (3));
+	$node_publisher->safe_psql('postgres', $cmd);
+}
+
+# Wait for the data to replicate for the subscriptions.  This tests that
+# uniqueness violations encountered during replication cause s1 to be disabled.
+#
+$node_subscriber->wait_for_subscriptions('postgres', @schemas)
+	or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscription "s1" should have disabled itself due to error.
+#
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"f", "subscription s1 no longer enabled");
+
+# Subscription "s2" should have copied the initial data without incident.
+#
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's2');
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"t", "subscription s2 still enabled");
+$cmd = qq(SELECT MAX(i), COUNT(*) FROM s2.tbl);
+is ($node_subscriber->safe_psql('postgres', $cmd),
+	"3|7",
+	"subscription s2 replicated additional data");
+
+$node_subscriber->stop;
+$node_publisher->stop;
-- 
2.2.0

