From 8bdc99120d584afe86620ab141acc6ef1aebf8a8 Mon Sep 17 00:00:00 2001
From: Takamichi Osumi <osumi.takamichi@fujitsu.com>
Date: Mon, 7 Mar 2022 09:06:23 +0000
Subject: [PATCH v29] Optionally disable subscriptions on error

Logical replication apply workers for a subscription can easily get
stuck in an infinite loop of attempting to apply a change,
triggering an error (such as a constraint violation), exiting with
the error written to the subscription worker log, and restarting.

To partially remedy the situation, this patch adds a new
subscription_parameter named 'disable_on_error'. To be consistent
with old behavior, the parameter defaults to false. When true, both
the tablesync worker and apply worker catch any errors thrown and
disable the subscription in order to break the loop. The error is
still also written to the logs.

A catalog version bump is required.

Proposed and written originally by Mark Dilger
Taken over by Osumi Takamichi, Greg Nancarrow, Peter Smith, Masahiko Sawada
Reviewed by Greg Nancarrow, Vignesh C, Amit Kapila, Wang wei, Tang Haiying,
            Peter Smith, Masahiko Sawada, Shi Yu
Discussion : https://www.postgresql.org/message-id/DB35438F-9356-4841-89A0-412709EBD3AB%40enterprisedb.com
---
 doc/src/sgml/catalogs.sgml                      |  10 ++
 doc/src/sgml/ref/alter_subscription.sgml        |   4 +-
 doc/src/sgml/ref/create_subscription.sgml       |  12 ++
 src/backend/catalog/pg_subscription.c           |  40 ++++++
 src/backend/catalog/system_views.sql            |   3 +-
 src/backend/commands/subscriptioncmds.c         |  27 +++-
 src/backend/replication/logical/worker.c        | 159 +++++++++++++++++-------
 src/bin/pg_dump/pg_dump.c                       |  17 ++-
 src/bin/pg_dump/pg_dump.h                       |   1 +
 src/bin/psql/describe.c                         |  10 +-
 src/bin/psql/tab-complete.c                     |   4 +-
 src/include/catalog/pg_subscription.h           |   7 ++
 src/test/regress/expected/subscription.out      | 119 +++++++++++-------
 src/test/regress/sql/subscription.sql           |  15 +++
 src/test/subscription/t/029_disable_on_error.pl | 100 +++++++++++++++
 15 files changed, 421 insertions(+), 107 deletions(-)
 create mode 100644 src/test/subscription/t/029_disable_on_error.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 83987a9..7777d60 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7771,6 +7771,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subdisableonerr</structfield> <type>bool</type>
+      </para>
+      <para>
+       If true, the subscription will be disabled if one of its workers
+       detects an error
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
       </para>
       <para>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 0d6f064..58b78a9 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -204,8 +204,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       information.  The parameters that can be altered
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
-      <literal>binary</literal>, and
-      <literal>streaming</literal>.
+      <literal>binary</literal>, <literal>streaming</literal>, and
+      <literal>disable_on_error</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index e80a261..b701752 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -290,6 +290,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
 
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>disable_on_error</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription should be automatically disabled
+          if any errors are detected by subscription workers during data
+          replication from the publisher. The default is
+          <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
      </para>
 
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index ca65a8b..1fc95ad 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
+	sub->disableonerr = subform->subdisableonerr;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
@@ -157,6 +158,45 @@ FreeSubscription(Subscription *sub)
 }
 
 /*
+ * Disable the given subscription.
+ */
+void
+DisableSubscription(Oid subid)
+{
+	Relation	rel;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+	HeapTuple	tup;
+
+	/* Look up our subscription in the catalog */
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for subscription %u", subid);
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
+	/* Form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Set the subscription to disabled. */
+	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
+	replaces[Anum_pg_subscription_subenabled - 1] = true;
+
+	/* Update the catalog */
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+	heap_freetuple(tup);
+
+	table_close(rel, NoLock);
+}
+
+/*
  * get_subscription_oid - given a subscription name, look up the OID
  *
  * If missing_ok is false, throw an error if name not found.  If true, just
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 40b7bca..bb1ac30 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1261,7 +1261,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
-              substream, subtwophasestate, subslotname, subsynccommit, subpublications)
+              substream, subtwophasestate, subdisableonerr, subslotname,
+              subsynccommit, subpublications)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3ef6607..3922658 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -61,6 +61,7 @@
 #define SUBOPT_BINARY				0x00000080
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
+#define SUBOPT_DISABLE_ON_ERR		0x00000400
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
 	bool		binary;
 	bool		streaming;
 	bool		twophase;
+	bool		disableonerr;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->streaming = false;
 	if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
 		opts->twophase = false;
+	if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
+		opts->disableonerr = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -249,6 +253,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
 			opts->twophase = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
+				 strcmp(defel->defname, "disable_on_error") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
+			opts->disableonerr = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+					  SUBOPT_DISABLE_ON_ERR);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -464,6 +478,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
 					 LOGICALREP_TWOPHASE_STATE_DISABLED);
+	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (opts.slot_name)
@@ -864,7 +879,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			{
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING);
+								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -913,6 +928,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_substream - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
+				{
+					values[Anum_pg_subscription_subdisableonerr - 1]
+						= BoolGetDatum(opts.disableonerr);
+					replaces[Anum_pg_subscription_subdisableonerr - 1]
+						= true;
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 92aa794..aaaaea8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -301,6 +301,8 @@ static void store_flush_position(XLogRecPtr remote_lsn);
 
 static void maybe_reread_subscription(void);
 
+static void worker_post_error_processing(void);
+
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
@@ -3370,6 +3372,79 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
 	snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
 }
 
+/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if it's required.
+ */
+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
+{
+	char	   *syncslotname;
+
+	Assert(am_tablesync_worker());
+
+	PG_TRY();
+	{
+		/* Call initial sync. */
+		syncslotname = LogicalRepSyncTableStart(origin_startpos);
+	}
+	PG_CATCH();
+	{
+		if (MySubscription->disableonerr)
+			worker_post_error_processing();
+		else
+		{
+			/*
+			 * Abort the current transaction so that we send the stats message
+			 * in an idle state.
+			 */
+			AbortOutOfAnyTransaction();
+
+			/* Report the worker failed during table synchronization */
+			pgstat_report_subscription_error(MySubscription->oid, false);
+
+			PG_RE_THROW();
+		}
+	}
+	PG_END_TRY();
+
+	/* allocate slot name in long-lived context */
+	*myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
+	pfree(syncslotname);
+}
+
+/*
+ * Run the apply loop with error handling. Disable the subscription,
+ * if necessary.
+ */
+static void
+start_apply(XLogRecPtr origin_startpos)
+{
+	PG_TRY();
+	{
+		LogicalRepApplyLoop(origin_startpos);
+	}
+	PG_CATCH();
+	{
+		if (MySubscription->disableonerr)
+			worker_post_error_processing();
+		else
+		{
+			/*
+			 * Abort the current transaction so that we send the stats message
+			 * in an idle state.
+			 */
+			AbortOutOfAnyTransaction();
+
+			/* Report the worker failed while applying changes */
+			pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+
+			PG_RE_THROW();
+		}
+	}
+	PG_END_TRY();
+}
+
 /* Logical Replication Apply worker entry point */
 void
 ApplyWorkerMain(Datum main_arg)
@@ -3377,8 +3452,8 @@ ApplyWorkerMain(Datum main_arg)
 	int			worker_slot = DatumGetInt32(main_arg);
 	MemoryContext oldctx;
 	char		originname[NAMEDATALEN];
-	XLogRecPtr	origin_startpos;
-	char	   *myslotname;
+	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
+	char	   *myslotname = NULL;
 	WalRcvStreamOptions options;
 	int			server_version;
 
@@ -3472,34 +3547,7 @@ ApplyWorkerMain(Datum main_arg)
 		 MySubscription->conninfo);
 
 	if (am_tablesync_worker())
-	{
-		char	   *syncslotname;
-
-		PG_TRY();
-		{
-			/* This is table synchronization worker, call initial sync. */
-			syncslotname = LogicalRepSyncTableStart(&origin_startpos);
-		}
-		PG_CATCH();
-		{
-			/*
-			 * Abort the current transaction so that we send the stats message
-			 * in an idle state.
-			 */
-			AbortOutOfAnyTransaction();
-
-			/* Report the worker failed during table synchronization */
-			pgstat_report_subscription_error(MySubscription->oid, false);
-
-			PG_RE_THROW();
-		}
-		PG_END_TRY();
-
-		/* allocate slot name in long-lived context */
-		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
-
-		pfree(syncslotname);
-	}
+		start_table_sync(&origin_startpos, &myslotname);
 	else
 	{
 		/* This is main apply worker */
@@ -3611,24 +3659,43 @@ ApplyWorkerMain(Datum main_arg)
 	}
 
 	/* Run the main loop. */
-	PG_TRY();
-	{
-		LogicalRepApplyLoop(origin_startpos);
-	}
-	PG_CATCH();
-	{
-		/*
-		 * Abort the current transaction so that we send the stats message in
-		 * an idle state.
-		 */
-		AbortOutOfAnyTransaction();
+	start_apply(origin_startpos);
 
-		/* Report the worker failed while applying changes */
-		pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+	proc_exit(0);
+}
 
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
+/*
+ * Abort and cleanup the current transaction, then do post-error processing.
+ * This function must be called in a PG_CATCH() block.
+ */
+static void
+worker_post_error_processing(void)
+{
+	/*
+	 * Emit the error message, and recover from the error state to an idle
+	 * state
+	 */
+	HOLD_INTERRUPTS();
+
+	EmitErrorReport();
+	AbortOutOfAnyTransaction();
+	FlushErrorState();
+
+	RESUME_INTERRUPTS();
+
+	/* Report the worker failed during either table synchronization or apply */
+	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+									 !am_tablesync_worker());
+
+	/* Disable the subscription */
+	StartTransactionCommand();
+	DisableSubscription(MySubscription->oid);
+	CommitTransactionCommand();
+
+	/* Notify the subscription has been disabled */
+	ereport(LOG,
+			errmsg("logical replication subscription \"%s\" has been be disabled due to an error",
+				   MySubscription->name));
 
 	proc_exit(0);
 }
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index e69dcf8..7c9ab0c 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4293,6 +4293,7 @@ getSubscriptions(Archive *fout)
 	int			i_subowner;
 	int			i_substream;
 	int			i_subtwophasestate;
+	int			i_subdisableonerr;
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subsynccommit;
@@ -4340,12 +4341,18 @@ getSubscriptions(Archive *fout)
 		appendPQExpBufferStr(query, " false AS substream,\n");
 
 	if (fout->remoteVersion >= 150000)
-		appendPQExpBufferStr(query, " s.subtwophasestate\n");
+		appendPQExpBufferStr(query, " s.subtwophasestate,\n");
 	else
 		appendPQExpBuffer(query,
-						  " '%c' AS subtwophasestate\n",
+						  " '%c' AS subtwophasestate,\n",
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBuffer(query, " s.subdisableonerr\n");
+	else
+		appendPQExpBuffer(query,
+						  " false AS subdisableonerr\n");
+
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
 						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
@@ -4366,6 +4373,7 @@ getSubscriptions(Archive *fout)
 	i_subbinary = PQfnumber(res, "subbinary");
 	i_substream = PQfnumber(res, "substream");
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
+	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4393,6 +4401,8 @@ getSubscriptions(Archive *fout)
 			pg_strdup(PQgetvalue(res, i, i_substream));
 		subinfo[i].subtwophasestate =
 			pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
+		subinfo[i].subdisableonerr =
+			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4463,6 +4473,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
 		appendPQExpBufferStr(query, ", two_phase = on");
 
+	if (strcmp(subinfo->subdisableonerr, "t") == 0)
+		appendPQExpBufferStr(query, ", disable_on_error = true");
+
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 997a3b6..772dc0c 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -657,6 +657,7 @@ typedef struct _SubscriptionInfo
 	char	   *subbinary;
 	char	   *substream;
 	char	   *subtwophasestate;
+	char	   *subdisableonerr;
 	char	   *subsynccommit;
 	char	   *subpublications;
 } SubscriptionInfo;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index e338293..9229eac 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6084,7 +6084,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false};
+	false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6118,11 +6118,13 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Binary"),
 							  gettext_noop("Streaming"));
 
-		/* Two_phase is only supported in v15 and higher */
+		/* Two_phase and disable_on_error are only supported in v15 and higher */
 		if (pset.sversion >= 150000)
 			appendPQExpBuffer(&buf,
-							  ", subtwophasestate AS \"%s\"\n",
-							  gettext_noop("Two phase commit"));
+							  ", subtwophasestate AS \"%s\"\n"
+							  ", subdisableonerr AS \"%s\"\n",
+							  gettext_noop("Two phase commit"),
+							  gettext_noop("Disable on error"));
 
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 6957567..e630acc 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1834,7 +1834,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit");
+		COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error");
 	/* ALTER SUBSCRIPTION <name> SET PUBLICATION */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION"))
 	{
@@ -3104,7 +3104,7 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "enabled", "slot_name", "streaming",
-					  "synchronous_commit", "two_phase");
+					  "synchronous_commit", "two_phase", "disable_on_error");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 18c2912..e2befaf 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -67,6 +67,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
+	bool		subdisableonerr;	/* True if a worker error should cause the
+									 * subscription to be disabled */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -103,6 +106,9 @@ typedef struct Subscription
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
+	bool		disableonerr;	/* Indicates if the subscription should be
+								 * automatically disabled if a worker error
+								 * occurs */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
@@ -111,6 +117,7 @@ typedef struct Subscription
 
 extern Subscription *GetSubscription(Oid subid, bool missing_ok);
 extern void FreeSubscription(Subscription *sub);
+extern void DisableSubscription(Oid subid);
 extern Oid	get_subscription_oid(const char *subname, bool missing_ok);
 extern char *get_subscription_name(Oid subid, bool missing_ok);
 
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 80aae83..ad8003f 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -94,10 +94,10 @@ ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
 \dRs+
-                                                                          List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | off                | dbname=regress_doesnotexist2
+                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2
 (1 row)
 
 BEGIN;
@@ -129,10 +129,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                            List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | local              | dbname=regress_doesnotexist2
+                                                                                     List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | local              | dbname=regress_doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
@@ -165,19 +165,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +188,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication already exists
@@ -215,10 +215,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                             List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 -- fail - publication used more then once
@@ -233,10 +233,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +270,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -282,10 +282,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +294,33 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | off                | dbname=regress_doesnotexist
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - disable_on_error must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = foo);
+ERROR:  disable_on_error requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
+\dRs+
+                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index bd0f4af..a7c15b1 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -228,6 +228,21 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail - disable_on_error must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/029_disable_on_error.pl b/src/test/subscription/t/029_disable_on_error.pl
new file mode 100644
index 0000000..80d4e54
--- /dev/null
+++ b/src/test/subscription/t/029_disable_on_error.pl
@@ -0,0 +1,100 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test of logical replication subscription self-disabling feature.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Create the two nodes for the publisher and the subscriber.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create identical table on both nodes.
+$node_publisher->safe_psql('postgres', qq(CREATE TABLE tbl (i INT)));
+$node_subscriber->safe_psql('postgres', qq(CREATE TABLE tbl (i INT)));
+
+# Insert duplicate values on the publisher.
+$node_publisher->safe_psql('postgres',
+	qq(INSERT INTO tbl (i) VALUES (1), (1), (1)));
+
+# Create an additional unique index on the subscriber.
+$node_subscriber->safe_psql('postgres',
+	qq(CREATE UNIQUE INDEX tbl_unique ON tbl (i)));
+
+# Create one pair of publication and subscription.
+# This tests the uniqueness violation will cause the subscription
+# to fail during initial synchronization and make it disabled.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	qq(CREATE PUBLICATION pub FOR TABLE tbl));
+$node_subscriber->safe_psql(
+	'postgres', qq(
+CREATE SUBSCRIPTION sub
+CONNECTION '$publisher_connstr'
+PUBLICATION pub WITH (disable_on_error = true)));
+
+# Initial synchronization failure causes the subscription
+# to be disabled.
+$node_subscriber->poll_query_until(
+	'postgres', qq(
+SELECT subenabled = false FROM pg_catalog.pg_subscription
+WHERE subname = 'sub')
+) or die "Timed out while waiting for subscriber to be disabled";
+
+# Drop the unique index on the sub which caused the subscription
+# to be disabled.
+$node_subscriber->safe_psql('postgres', qq(DROP INDEX tbl_unique));
+
+# Re-enable the subscription "sub".
+$node_subscriber->safe_psql('postgres', qq(ALTER SUBSCRIPTION sub ENABLE));
+
+# Wait for the data to replicate.
+$node_subscriber->poll_query_until(
+	'postgres', qq(
+SELECT COUNT(1) = 1 FROM pg_catalog.pg_subscription_rel sr
+WHERE sr.srsubstate IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass));
+
+# Confirm that we have finished the table sync.
+my $result = $node_subscriber->safe_psql('postgres',
+	qq(SELECT MAX(i), COUNT(*) FROM tbl));
+is($result, "1|3", "subscription sub replicated data");
+
+# Delete the data from the subscriber and recreate the unique index.
+$node_subscriber->safe_psql(
+	'postgres', q(
+DELETE FROM tbl;
+CREATE UNIQUE INDEX tbl_unique ON tbl (i)));
+
+# Add more non-unique data to the publisher.
+$node_publisher->safe_psql('postgres',
+	qq(INSERT INTO tbl (i) VALUES (3), (3), (3)));
+
+# Apply failure causes the subscription to be disabled.
+$node_subscriber->poll_query_until(
+	'postgres', qq(
+SELECT subenabled = false FROM pg_catalog.pg_subscription
+WHERE subname = 'sub')
+) or die "Timed out while waiting for subscription sub to be disabled";
+
+# Drop the unique index on the sub and re-enabled the subscription.
+# Then, confirm that the previously failing insert was applied OK.
+$node_subscriber->safe_psql('postgres', qq(DROP INDEX tbl_unique));
+$node_subscriber->safe_psql('postgres', qq(ALTER SUBSCRIPTION sub ENABLE));
+
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+	qq(SELECT COUNT(*) FROM tbl WHERE i = 3));
+is($result, qq(3), 'check the result of apply');
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();
-- 
1.8.3.1

