From 31a80bc3ebc7629390bedc0d1ef53c9f03677e1d Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 4 Mar 2022 15:45:40 +0900
Subject: [PATCH] fixup! Optionally disable subscriptions on error

---
 src/backend/catalog/pg_subscription.c    |  39 +++++++
 src/backend/replication/logical/worker.c | 142 +++++++++--------------
 src/include/catalog/pg_subscription.h    |   1 +
 3 files changed, 95 insertions(+), 87 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d2beebcc9d..1fc95ad867 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -157,6 +157,45 @@ FreeSubscription(Subscription *sub)
 	pfree(sub);
 }
 
+/*
+ * Disable the given subscription.
+ */
+void
+DisableSubscription(Oid subid)
+{
+	Relation	rel;
+	bool		nulls[Natts_pg_subscription];
+	bool		replaces[Natts_pg_subscription];
+	Datum		values[Natts_pg_subscription];
+	HeapTuple	tup;
+
+	/* Look up our subscription in the catalog */
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for subscription %u", subid);
+
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
+	/* Form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	/* Set the subscription to disabled. */
+	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
+	replaces[Anum_pg_subscription_subenabled - 1] = true;
+
+	/* Update the catalog */
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+	heap_freetuple(tup);
+
+	table_close(rel, NoLock);
+}
+
 /*
  * get_subscription_oid - given a subscription name, look up the OID
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 71b007719c..eba75bb48c 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -136,7 +136,6 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
-#include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
@@ -304,6 +303,8 @@ static void store_flush_position(XLogRecPtr remote_lsn);
 
 static void maybe_reread_subscription(void);
 
+static void worker_post_error_processing(void);
+
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
@@ -2802,57 +2803,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
 }
 
-/*
- * Disable the current subscription.
- */
-static void
-DisableSubscriptionOnError(void)
-{
-	Relation	rel;
-	bool		nulls[Natts_pg_subscription];
-	bool		replaces[Natts_pg_subscription];
-	Datum		values[Natts_pg_subscription];
-	HeapTuple	tup;
-
-	/* Disable the subscription in a fresh transaction */
-	StartTransactionCommand();
-
-	/* Look up our subscription in the catalog */
-	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
-	tup = SearchSysCacheCopy1(SUBSCRIPTIONOID,
-							  ObjectIdGetDatum(MySubscription->oid));
-
-	if (!HeapTupleIsValid(tup))
-		elog(ERROR, "subscription \"%s\" does not exist",
-			 MySubscription->name);
-
-	LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, AccessExclusiveLock);
-
-	/* Form a new tuple. */
-	memset(values, 0, sizeof(values));
-	memset(nulls, false, sizeof(nulls));
-	memset(replaces, false, sizeof(replaces));
-
-	/* Set the subscription to disabled. */
-	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
-	replaces[Anum_pg_subscription_subenabled - 1] = true;
-
-	/* Update the catalog */
-	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
-							replaces);
-	CatalogTupleUpdate(rel, &tup->t_self, tup);
-	heap_freetuple(tup);
-
-	table_close(rel, RowExclusiveLock);
-
-	CommitTransactionCommand();
-
-	/* Notify the subscription has been disabled */
-	ereport(LOG,
-			errmsg("logical replication subscription \"%s\" has been be disabled due to an error",
-				   MySubscription->name));
-}
-
 /*
  * Send a Standby Status Update message to server.
  *
@@ -3443,25 +3393,16 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
 	PG_CATCH();
 	{
 		/*
-		 * First, ensure that we log the error message so that it won't be
-		 * lost if some other internal error occurs in the following code.
-		 * Then, abort the current transaction and send the stats message of
-		 * the table synchronization failure in an idle state.
+		 * Abort the current transaction with the error, do post-error
+		 * processing.
 		 */
-		HOLD_INTERRUPTS();
-		EmitErrorReport();
-		FlushErrorState();
-		AbortOutOfAnyTransaction();
-		RESUME_INTERRUPTS();
-		pgstat_report_subscription_error(MySubscription->oid, false);
-
-		if (MySubscription->disableonerr)
-		{
-			DisableSubscriptionOnError();
-			proc_exit(0);
-		}
+		worker_post_error_processing();
 
-		PG_RE_THROW();
+		/*
+		 * Similar to apply worker cases, the worker exits without re-throwing
+		 * the error since it's already reported.
+		 */
+		proc_exit(1);
 	}
 	PG_END_TRY();
 
@@ -3484,26 +3425,19 @@ start_apply(XLogRecPtr origin_startpos)
 	PG_CATCH();
 	{
 		/*
-		 * First, ensure that we log the error message so that it won't be
-		 * lost if some other internal error occurs in the following code.
-		 * Then, abort the current transaction and send the stats message of
-		 * the apply failure in an idle state.
+		 * Abort the current transaction with the error, do post-error
+		 * processing and exits.
 		 */
-		HOLD_INTERRUPTS();
-		EmitErrorReport();
-		FlushErrorState();
-		AbortOutOfAnyTransaction();
-		RESUME_INTERRUPTS();
-		pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
-
-		if (MySubscription->disableonerr)
-		{
-			DisableSubscriptionOnError();
-			proc_exit(0);
-		}
+		worker_post_error_processing();
 
-		PG_RE_THROW();
-	}
+		/*
+		 * The worker exits without re-throwing the error since it's already
+		 * reported. So we don't get in further PG_CATCH() blocks, but it's
+		 * okay since this PG_TRY()/PG_CATCH() block is the last one in the
+		 * logical replication workers.
+		 */
+		proc_exit(1);
+}
 	PG_END_TRY();
 }
 
@@ -3726,6 +3660,40 @@ ApplyWorkerMain(Datum main_arg)
 	proc_exit(0);
 }
 
+/*
+ * Abort and cleanup the current transaction, then do post-error processing.
+ * This function must be called in a PG_CATCH() block.
+ */
+static void
+worker_post_error_processing(void)
+{
+	/* Emit the error message, and recover from the error state to an idle state */
+	HOLD_INTERRUPTS();
+
+	EmitErrorReport();
+	FlushErrorState();
+	AbortOutOfAnyTransaction();
+
+	RESUME_INTERRUPTS();
+
+	/* Report the worker failed during table synchronization */
+	pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+									 !am_tablesync_worker());
+
+	/* Disable the subscription if required */
+	if (MySubscription->disableonerr)
+	{
+		StartTransactionCommand();
+		DisableSubscription(MySubscription->oid);
+		CommitTransactionCommand();
+
+		/* Notify the subscription has been disabled */
+		ereport(LOG,
+				errmsg("logical replication subscription \"%s\" has been be disabled due to an error",
+					   MySubscription->name));
+	}
+}
+
 /*
  * Is current process a logical replication worker?
  */
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 90b1eb53aa..e2befaf351 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -117,6 +117,7 @@ typedef struct Subscription
 
 extern Subscription *GetSubscription(Oid subid, bool missing_ok);
 extern void FreeSubscription(Subscription *sub);
+extern void DisableSubscription(Oid subid);
 extern Oid	get_subscription_oid(const char *subname, bool missing_ok);
 extern char *get_subscription_name(Oid subid, bool missing_ok);
 
-- 
2.24.3 (Apple Git-128)

