From 4788d7222f9eefb62f27ffd330455127bea4a0ca Mon Sep 17 00:00:00 2001
From: Takamichi Osumi <osumi.takamichi@fujitsu.com>
Date: Wed, 1 Dec 2021 11:55:40 +0000
Subject: [PATCH v9] Optionally disable subscriptions on error

Logical replication apply workers for a subscription can easily get
stuck in an infinite loop of attempting to apply a change,
triggering an error (such as a constraint violation), exiting with
an error written to the subscription worker log, and restarting.

To partially remedy the situation, adding a new
subscription_parameter named 'disable_on_error'. To be consistent
with old behavior, the parameter defaults to false. When true, the
apply worker catches errors thrown, and for errors that are deemed
not to be transient, disables the subscription in order to break the
loop. The error is still also written to the logs.

Proposed and written originally by Mark Dilger
Taken over by Osumi Takamichi, Greg Nancarrow
Reviewed by Greg Nancarrow, Vignesh C
Discussion : https://www.postgresql.org/message-id/DB35438F-9356-4841-89A0-412709EBD3AB%40enterprisedb.com
---
 doc/src/sgml/catalogs.sgml                      |  12 ++
 doc/src/sgml/ref/alter_subscription.sgml        |   4 +-
 doc/src/sgml/ref/create_subscription.sgml       |  12 ++
 src/backend/catalog/pg_subscription.c           |   1 +
 src/backend/catalog/system_views.sql            |   2 +-
 src/backend/commands/subscriptioncmds.c         |  27 +++-
 src/backend/replication/logical/launcher.c      |   1 +
 src/backend/replication/logical/worker.c        | 154 +++++++++++++++++-
 src/bin/pg_dump/pg_dump.c                       |  17 +-
 src/bin/pg_dump/pg_dump.h                       |   1 +
 src/bin/psql/describe.c                         |  10 +-
 src/bin/psql/tab-complete.c                     |   4 +-
 src/include/catalog/pg_subscription.h           |   4 +
 src/test/regress/expected/subscription.out      | 119 ++++++++------
 src/test/regress/sql/subscription.sql           |  14 ++
 src/test/subscription/t/027_disable_on_error.pl | 203 ++++++++++++++++++++++++
 16 files changed, 517 insertions(+), 68 deletions(-)
 create mode 100644 src/test/subscription/t/027_disable_on_error.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1d11be..9041a75 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7713,6 +7713,18 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subdisableonerr</structfield> <type>bool</type>
+      </para>
+      <para>
+       If true, the subscription will be disabled when subscription
+       worker detects non-transient errors (e.g. duplication error)
+       that require user intervention in the subscription, schema,
+       or data
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
       </para>
       <para>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 0b027cc..3109ee9 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -201,8 +201,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       information.  The parameters that can be altered
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
-      <literal>binary</literal>, and
-      <literal>streaming</literal>.
+      <literal>binary</literal>,<literal>streaming</literal>, and
+      <literal>disable_on_error</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 990a41f..471352a 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -142,6 +142,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
        </varlistentry>
 
        <varlistentry>
+        <term><literal>disable_on_error</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription should be automatically disabled
+          if replicating data from the publisher triggers non-transient errors
+          such as referential integrity or permission errors. The default is
+          <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
+       <varlistentry>
         <term><literal>enabled</literal> (<type>boolean</type>)</term>
         <listitem>
          <para>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 25021e2..9b416dd 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
+	sub->disableonerr = subform->subdisableonerr;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 61b515c..41f61bf 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1259,7 +1259,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subtwophasestate, subslotname, subsynccommit, subpublications)
+              substream, subtwophasestate, subdisableonerr, subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_workers AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9427e86..5cb6ca7 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -61,6 +61,7 @@
 #define SUBOPT_BINARY				0x00000080
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
+#define SUBOPT_DISABLE_ON_ERR		0x00000400
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
 	bool		binary;
 	bool		streaming;
 	bool		twophase;
+	bool		disableonerr;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -129,6 +131,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->streaming = false;
 	if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
 		opts->twophase = false;
+	if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
+		opts->disableonerr = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -248,6 +252,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
 			opts->twophase = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
+				 strcmp(defel->defname, "disable_on_error") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
+			opts->disableonerr = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -397,7 +410,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+					  SUBOPT_DISABLE_ON_ERR);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -471,6 +485,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
+	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -871,7 +886,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			{
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING);
+								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -920,6 +935,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_substream - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
+				{
+					values[Anum_pg_subscription_subdisableonerr - 1]
+						= BoolGetDatum(opts.disableonerr);
+					replaces[Anum_pg_subscription_subdisableonerr - 1]
+						= true;
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3fb4caa..febfc4d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -132,6 +132,7 @@ get_subscription_list(void)
 		sub->dbid = subform->subdbid;
 		sub->owner = subform->subowner;
 		sub->enabled = subform->subenabled;
+		sub->disableonerr = subform->subdisableonerr;
 		sub->name = pstrdup(NameStr(subform->subname));
 		/* We don't fill fields we are not interested in. */
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 2e79302..f3a63cd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -136,6 +136,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
+#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
@@ -2755,6 +2756,114 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 }
 
 /*
+ * Check if the error is transient, network protocol related, or resource exhaustion
+ * related. These may clear up without user intervention in the subscription, schema,
+ * or data being replicated.
+ */
+static bool
+IsTransientError(ErrorData *edata)
+{
+	switch (edata->sqlerrcode)
+	{
+		case ERRCODE_CONNECTION_EXCEPTION:
+		case ERRCODE_CONNECTION_DOES_NOT_EXIST:
+		case ERRCODE_CONNECTION_FAILURE:
+		case ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION:
+		case ERRCODE_SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION:
+		case ERRCODE_TRANSACTION_RESOLUTION_UNKNOWN:
+		case ERRCODE_PROTOCOL_VIOLATION:
+		case ERRCODE_INSUFFICIENT_RESOURCES:
+		case ERRCODE_DISK_FULL:
+		case ERRCODE_OUT_OF_MEMORY:
+		case ERRCODE_TOO_MANY_CONNECTIONS:
+		case ERRCODE_CONFIGURATION_LIMIT_EXCEEDED:
+		case ERRCODE_PROGRAM_LIMIT_EXCEEDED:
+		case ERRCODE_STATEMENT_TOO_COMPLEX:
+		case ERRCODE_TOO_MANY_COLUMNS:
+		case ERRCODE_TOO_MANY_ARGUMENTS:
+		case ERRCODE_OPERATOR_INTERVENTION:
+		case ERRCODE_QUERY_CANCELED:
+		case ERRCODE_ADMIN_SHUTDOWN:
+		case ERRCODE_CRASH_SHUTDOWN:
+		case ERRCODE_CANNOT_CONNECT_NOW:
+		case ERRCODE_DATABASE_DROPPED:
+		case ERRCODE_IDLE_SESSION_TIMEOUT:
+			return true;
+		default:
+			break;
+	}
+
+	return false;
+}
+
+/*
+ * Recover from a possibly aborted transaction state and disable the current
+ * subscription.
+ */
+static ErrorData *
+DisableSubscriptionOnError(ErrorData *edata)
+{
+	Relation	rel;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+	HeapTuple	tup;
+	Form_pg_subscription subform;
+
+	/* Disable the subscription in a fresh transaction */
+	ereport(LOG,
+			errmsg("logical replication subscription \"%s\" will be disabled due to error: %s",
+				   MySubscription->name, edata->message));
+
+	AbortOutOfAnyTransaction();
+	FlushErrorState();
+
+	StartTransactionCommand();
+
+	/* Look up our subscription in the catalogs */
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
+							  CStringGetDatum(MySubscription->name));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg("subscription \"%s\" does not exist",
+					   MySubscription->name));
+
+	subform = (Form_pg_subscription) GETSTRUCT(tup);
+	LockSharedObject(SubscriptionRelationId, subform->oid, 0, AccessExclusiveLock);
+
+	/*
+	 * We would not be here unless this subscription's disableonerr field was
+	 * true when our worker began applying changes, but check whether that
+	 * field has changed in the interim.
+	 */
+	if (!subform->subdisableonerr)
+		ReThrowError(edata);
+
+	/* Form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Set the subscription to disabled, and note the reason. */
+	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
+	replaces[Anum_pg_subscription_subenabled - 1] = true;
+
+	/* Update the catalog */
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+	heap_freetuple(tup);
+
+	table_close(rel, RowExclusiveLock);
+
+	CommitTransactionCommand();
+
+	return edata;
+}
+
+/*
  * Send a Standby Status Update message to server.
  *
  * 'recvpos' is the latest LSN we've received data to, force is set if we need
@@ -3339,6 +3448,8 @@ ApplyWorkerMain(Datum main_arg)
 	char	   *myslotname;
 	WalRcvStreamOptions options;
 	int			server_version;
+	bool		disable_subscription = false;
+	ErrorData  *errdata = NULL;
 
 	/* Attach to slot */
 	logicalrep_worker_attach(worker_slot);
@@ -3441,7 +3552,8 @@ ApplyWorkerMain(Datum main_arg)
 		PG_CATCH();
 		{
 			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
-			ErrorData  *errdata = CopyErrorData();
+
+			errdata = CopyErrorData();
 
 			/*
 			 * Report the table sync error. There is no corresponding message
@@ -3453,11 +3565,23 @@ ApplyWorkerMain(Datum main_arg)
 										  0,	/* message type */
 										  InvalidTransactionId,
 										  errdata->message);
-			MemoryContextSwitchTo(ecxt);
-			PG_RE_THROW();
+
+			/* Decide whether or not we disable this subscription */
+			if (MySubscription->disableonerr &&
+				!IsTransientError(errdata))
+				disable_subscription = true;
+			else
+			{
+				MemoryContextSwitchTo(ecxt);
+				PG_RE_THROW();
+			}
 		}
 		PG_END_TRY();
 
+		/* If we caught an error above, disable the subscription */
+		if (disable_subscription)
+			ReThrowError(DisableSubscriptionOnError(errdata));
+
 		/* allocate slot name in long-lived context */
 		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
 
@@ -3584,7 +3708,8 @@ ApplyWorkerMain(Datum main_arg)
 		if (apply_error_callback_arg.command != 0)
 		{
 			MemoryContext ecxt = MemoryContextSwitchTo(cctx);
-			ErrorData  *errdata = CopyErrorData();
+
+			errdata = CopyErrorData();
 
 			pgstat_report_subworker_error(MyLogicalRepWorker->subid,
 										  MyLogicalRepWorker->relid,
@@ -3594,13 +3719,28 @@ ApplyWorkerMain(Datum main_arg)
 										  apply_error_callback_arg.command,
 										  apply_error_callback_arg.remote_xid,
 										  errdata->message);
-			MemoryContextSwitchTo(ecxt);
-		}
 
-		PG_RE_THROW();
+			if (MySubscription->disableonerr &&
+				!IsTransientError(errdata))
+				disable_subscription = true;
+			else
+			{
+				/*
+				 * Some work in error recovery work is done. Switch to the old
+				 * memory context.
+				 */
+				MemoryContextSwitchTo(ecxt);
+				PG_RE_THROW();
+			}
+		}
+		else
+			PG_RE_THROW();		/* No need to change the memory context */
 	}
 	PG_END_TRY();
 
+	if (disable_subscription)
+		ReThrowError(DisableSubscriptionOnError(errdata));
+
 	proc_exit(0);
 }
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 5a2094d..3a67a8d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4451,6 +4451,7 @@ getSubscriptions(Archive *fout)
 	int			i_rolname;
 	int			i_substream;
 	int			i_subtwophasestate;
+	int			i_subdisableonerr;
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subsynccommit;
@@ -4499,12 +4500,18 @@ getSubscriptions(Archive *fout)
 		appendPQExpBufferStr(query, " false AS substream,\n");
 
 	if (fout->remoteVersion >= 150000)
-		appendPQExpBufferStr(query, " s.subtwophasestate\n");
+		appendPQExpBufferStr(query, " s.subtwophasestate,\n");
 	else
 		appendPQExpBuffer(query,
-						  " '%c' AS subtwophasestate\n",
+						  " '%c' AS subtwophasestate,\n",
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBuffer(query, " s.subdisableonerr\n");
+	else
+		appendPQExpBuffer(query,
+						  " false AS subdisableonerr\n");
+
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
 						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
@@ -4525,6 +4532,7 @@ getSubscriptions(Archive *fout)
 	i_subbinary = PQfnumber(res, "subbinary");
 	i_substream = PQfnumber(res, "substream");
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
+	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4552,6 +4560,8 @@ getSubscriptions(Archive *fout)
 			pg_strdup(PQgetvalue(res, i, i_substream));
 		subinfo[i].subtwophasestate =
 			pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
+		subinfo[i].subdisableonerr =
+			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 
 		if (strlen(subinfo[i].rolname) == 0)
 			pg_log_warning("owner of subscription \"%s\" appears to be invalid",
@@ -4624,6 +4634,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
 		appendPQExpBufferStr(query, ", two_phase = on");
 
+	if (strcmp(subinfo->subdisableonerr, "f") != 0)
+		appendPQExpBufferStr(query, ", disable_on_error = on");
+
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index d1d8608..0512111 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -655,6 +655,7 @@ typedef struct _SubscriptionInfo
 	char	   *subbinary;
 	char	   *substream;
 	char	   *subtwophasestate;
+	char	   *subdisableonerr;
 	char	   *subsynccommit;
 	char	   *subpublications;
 } SubscriptionInfo;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index ea721d9..e2521a2 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6508,7 +6508,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false};
+	false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6542,11 +6542,13 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Binary"),
 							  gettext_noop("Streaming"));
 
-		/* Two_phase is only supported in v15 and higher */
+		/* Two_phase and disable_on_error is only supported in v15 and higher */
 		if (pset.sversion >= 150000)
 			appendPQExpBuffer(&buf,
-							  ", subtwophasestate AS \"%s\"\n",
-							  gettext_noop("Two phase commit"));
+							  ", subtwophasestate AS \"%s\"\n"
+							  ", subdisableonerr AS \"%s\"\n",
+							  gettext_noop("Two phase commit"),
+							  gettext_noop("Disable On Error"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 2f412ca..5497290 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1681,7 +1681,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit");
+		COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error");
 	/* ALTER SUBSCRIPTION <name> SET PUBLICATION */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION"))
 	{
@@ -2939,7 +2939,7 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "enabled", "slot_name", "streaming",
-					  "synchronous_commit", "two_phase");
+					  "synchronous_commit", "two_phase", "disable_on_error");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 2106149..284a90d 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -67,6 +67,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
+	bool		subdisableonerr;	/* True if apply errors should disable the
+									 * subscription upon error */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -103,6 +106,7 @@ typedef struct Subscription
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
+	bool		disableonerr;	/* Whether errors automatically disable */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 15a1ac6..db0bd19 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -94,10 +94,10 @@ ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
 \dRs+
-                                                                          List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | off                | dbname=regress_doesnotexist2
+                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |           Conninfo           
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2
 (1 row)
 
 BEGIN;
@@ -129,10 +129,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                            List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | local              | dbname=regress_doesnotexist2
+                                                                                     List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |           Conninfo           
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | local              | dbname=regress_doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
@@ -165,19 +165,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +188,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication already exists
@@ -215,10 +215,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication used more then once
@@ -233,10 +233,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +270,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -282,10 +282,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +294,33 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - disable_on_error must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = foo);
+ERROR:  disable_on_error requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
+\dRs+
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable On Error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 7faa935..ee4a39b 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -228,6 +228,20 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail - disable_on_error must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
+
+\dRs+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/027_disable_on_error.pl b/src/test/subscription/t/027_disable_on_error.pl
new file mode 100644
index 0000000..1104a50
--- /dev/null
+++ b/src/test/subscription/t/027_disable_on_error.pl
@@ -0,0 +1,203 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test of logical replication subscription self-disabling feature
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 10;
+
+# Wait for the named subscriptions to catch up or to be disabled.
+sub wait_for_subscriptions
+{
+	my ($node_name, $dbname, @subscriptions) = @_;
+
+	# Unique-ify the subscriptions passed by the caller
+	my %unique       = map { $_ => 1 } @subscriptions;
+	my @unique       = sort keys %unique;
+	my $unique_count = scalar(@unique);
+
+	# Construct a SQL list from the unique subscription names
+	my @escaped = map { s/'/''/g; s/\\/\\\\/g; $_ } @unique;
+	my $sublist = join(', ', map { "'$_'" } @escaped);
+
+	my $polling_sql = qq(
+		SELECT COUNT(1) = $unique_count FROM
+			(SELECT s.oid
+				FROM pg_catalog.pg_subscription s
+				LEFT JOIN pg_catalog.pg_subscription_rel sr
+				ON sr.srsubid = s.oid
+				WHERE (sr IS NULL OR sr.srsubstate IN ('s', 'r'))
+				  AND s.subname IN ($sublist)
+				  AND s.subenabled IS TRUE
+			 UNION
+			 SELECT s.oid
+				FROM pg_catalog.pg_subscription s
+				WHERE s.subname IN ($sublist)
+				  AND s.subenabled IS FALSE
+			) AS synced_or_disabled
+		);
+	return $node_name->poll_query_until($dbname, $polling_sql);
+}
+
+my @schemas = qw(s1 s2);
+my ($schema, $cmd);
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create identical schema, table and index on both the publisher and
+# subscriber
+for $schema (@schemas)
+{
+	$cmd = qq(
+CREATE SCHEMA $schema;
+CREATE TABLE $schema.tbl (i INT);
+ALTER TABLE $schema.tbl REPLICA IDENTITY FULL;
+CREATE INDEX ${schema}_tbl_idx ON $schema.tbl(i));
+	$node_publisher->safe_psql('postgres', $cmd);
+	$node_subscriber->safe_psql('postgres', $cmd);
+}
+
+# Create non-unique data in both schemas on the publisher.
+for $schema (@schemas)
+{
+	$cmd = qq(INSERT INTO $schema.tbl (i) VALUES (1), (1), (1));
+	$node_publisher->safe_psql('postgres', $cmd);
+}
+
+# Create an additional unique index in schema s1 on the subscriber only.  When
+# we create subscriptions, below, this should cause subscription "s1" on the
+# subscriber to fail during initial synchronization and to get automatically
+# disabled.
+$cmd = qq(CREATE UNIQUE INDEX s1_tbl_unique ON s1.tbl (i));
+$node_subscriber->safe_psql('postgres', $cmd);
+
+# Create publications and subscriptions linking the schemas on
+# the publisher with those on the subscriber.  This tests that the
+# uniqueness violations cause subscription "s1" to fail during
+# initial synchronization.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+for $schema (@schemas)
+{
+	# Create the publication for this table
+	$cmd = qq(
+CREATE PUBLICATION $schema FOR TABLE $schema.tbl);
+	$node_publisher->safe_psql('postgres', $cmd);
+
+	# Create the subscription for this table
+	$cmd = qq(
+CREATE SUBSCRIPTION $schema
+	CONNECTION '$publisher_connstr'
+	PUBLICATION $schema
+	WITH (disable_on_error = true));
+	$node_subscriber->safe_psql('postgres', $cmd);
+}
+
+# Wait for the initial subscription synchronizations to finish or fail.
+wait_for_subscriptions($node_subscriber, 'postgres', @schemas)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscription "s1" should have disabled itself due to error.
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is($node_subscriber->safe_psql('postgres', $cmd),
+	"f", "subscription s1 no longer enabled");
+
+# Subscription "s2" should have copied the initial data without incident.
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's2');
+is($node_subscriber->safe_psql('postgres', $cmd),
+	"t", "subscription s2 still enabled");
+$cmd = qq(SELECT i, COUNT(*) FROM s2.tbl GROUP BY i);
+is($node_subscriber->safe_psql('postgres', $cmd),
+	"1|3", "subscription s2 replicated initial data");
+
+# Enter unique data for both schemas on the publisher.  This should succeed on
+# the publisher node, and not cause any additional problems on the subscriber
+# side either, though disabled subscription "s1" should not replicate anything.
+for $schema (@schemas)
+{
+	$cmd = qq(INSERT INTO $schema.tbl (i) VALUES (2));
+	$node_publisher->safe_psql('postgres', $cmd);
+}
+
+# Wait for the data to replicate for the subscriptions.  This tests that the
+# problems encountered by subscription "s1" do not cause subscription "s2" to
+# get stuck.
+wait_for_subscriptions($node_subscriber, 'postgres', @schemas)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscription "s1" should still be disabled and have replicated no data
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is($node_subscriber->safe_psql('postgres', $cmd),
+	"f", "subscription s1 still disabled");
+
+# Subscription "s2" should still be enabled and have replicated all changes
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's2');
+is($node_subscriber->safe_psql('postgres', $cmd),
+	"t", "subscription s2 still enabled");
+$cmd = q(SELECT MAX(i), COUNT(*) FROM s2.tbl);
+is($node_subscriber->safe_psql('postgres', $cmd),
+	"2|4", "subscription s2 replicated data");
+
+# Drop the unique index on "s1" which caused the subscription to be disabled
+$cmd = qq(DROP INDEX s1.s1_tbl_unique);
+$node_subscriber->safe_psql('postgres', $cmd);
+
+# Re-enable the subscription "s1"
+$cmd = q(ALTER SUBSCRIPTION s1 ENABLE);
+$node_subscriber->safe_psql('postgres', $cmd);
+
+# Wait for the data to replicate
+wait_for_subscriptions($node_subscriber, 'postgres', @schemas)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check that we have the new data in s1.tbl
+$cmd = q(SELECT MAX(i), COUNT(*) FROM s1.tbl);
+is($node_subscriber->safe_psql('postgres', $cmd),
+	"2|4", "subscription s1 replicated data");
+
+# Delete the data from the subscriber only, and recreate the unique index
+$cmd = q(
+DELETE FROM s1.tbl;
+CREATE UNIQUE INDEX s1_tbl_unique ON s1.tbl (i));
+$node_subscriber->safe_psql('postgres', $cmd);
+
+# Add more non-unique data to the publisher
+for $schema (@schemas)
+{
+	$cmd = qq(INSERT INTO $schema.tbl (i) VALUES (3), (3), (3));
+	$node_publisher->safe_psql('postgres', $cmd);
+}
+
+# Wait for the data to replicate for the subscriptions.  This tests that
+# uniqueness violations encountered during replication cause s1 to be disabled.
+wait_for_subscriptions($node_subscriber, 'postgres', @schemas)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscription "s1" should have disabled itself due to error.
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's1');
+is($node_subscriber->safe_psql('postgres', $cmd),
+	"f", "subscription s1 no longer enabled");
+
+# Subscription "s2" should have copied the initial data without incident.
+$cmd = qq(
+SELECT subenabled FROM pg_catalog.pg_subscription WHERE subname = 's2');
+is($node_subscriber->safe_psql('postgres', $cmd),
+	"t", "subscription s2 still enabled");
+$cmd = qq(SELECT MAX(i), COUNT(*) FROM s2.tbl);
+is($node_subscriber->safe_psql('postgres', $cmd),
+	"3|7", "subscription s2 replicated additional data");
+
+$node_subscriber->stop;
+$node_publisher->stop;
-- 
1.8.3.1

