From f7d28477ea1fe41f1ed52014c59f09e10e5be2ae Mon Sep 17 00:00:00 2001
From: Melih Mutlu <m.melihmutlu@gmail.com>
Date: Thu, 6 Oct 2022 15:58:55 +0300
Subject: [PATCH] Do not apply for tables which not exist on catalog

If a new table is missing on subscriber and ALTER SUBSCRIPTION ...
REFRESH PUBLICATION has not been run yet after the tables was created
on publisher, show WARNING and HINT messages on subscriber logs.

Before this change, operations on such a recently added table were
causing constant logical replication worker failures until the table
was created on subscriber. There is no need to fail in this case.
Regardless of whether the table exists on subscriber, newly added tables
will not be replicated to subscriber until ALTER SUBSCRIPTION ... REFRESH
PUBLICATION is called.

The patch relies on the information from pg_subscription_rel. Apply
worker only applies changes for tables exist on pg_subscription_rel. New
tables wouldn't exist in the catalog until the next REFRESH PUBLICATION.
---
 src/backend/catalog/pg_subscription.c      | 34 ++++++++++++++++++++++
 src/backend/replication/logical/relation.c | 12 ++++++++
 src/backend/replication/logical/worker.c   | 20 ++++++++-----
 src/include/catalog/pg_subscription_rel.h  |  1 +
 4 files changed, 59 insertions(+), 8 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a506fc3ec8..01ae95929c 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -543,3 +543,37 @@ GetSubscriptionRelations(Oid subid, bool not_ready)
 
 	return res;
 }
+
+/*
+ * Check whether the subscription has the given relation.
+ *
+ * Returns true if the relation exists in the subscription, false otherwise.
+ */
+bool
+CheckSubscriptionRelation(Oid subid, Oid relid)
+{
+	HeapTuple	tup;
+	Relation	rel;
+	bool		result = false;
+
+	/*
+	 * This is to avoid the race condition with AlterSubscription which tries
+	 * to remove this relstate.
+	 */
+	rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+	/* Try finding the mapping. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+							  ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(subid));
+
+	if (HeapTupleIsValid(tup))
+	{
+		result = true;
+	}
+
+	/* Cleanup */
+	table_close(rel, AccessShareLock);
+
+	return result;
+}
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index e989047681..994421a2d7 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -384,6 +384,18 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 		relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
 											  remoterel->relname, -1),
 								 lockmode, true);
+
+		/* Check if relation really exists for the subscription. */
+		if (!CheckSubscriptionRelation(MySubscription->oid, relid))
+		{
+			ereport(WARNING,
+					errmsg("Subscription \"%s\" does not have a relation named \"%s.%s\".",
+						   MySubscription->name, remoterel->nspname, remoterel->relname),
+					errhint("Try to run \"ALTER SUBSCRIPTION %s REFRESH PUBLICATION\".",
+							MySubscription->name));
+			return NULL;
+		}
+
 		if (!OidIsValid(relid))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 96772e4d73..e0cde9df37 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1691,13 +1691,14 @@ apply_handle_insert(StringInfo s)
 
 	relid = logicalrep_read_insert(s, &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
-	if (!should_apply_changes_for_rel(rel))
+	if (!rel || !should_apply_changes_for_rel(rel))
 	{
 		/*
 		 * The relation can't become interesting in the middle of the
 		 * transaction so it's safe to unlock it.
 		 */
-		logicalrep_rel_close(rel, RowExclusiveLock);
+		if (rel)
+			logicalrep_rel_close(rel, RowExclusiveLock);
 		end_replication_step();
 		return;
 	}
@@ -1832,13 +1833,14 @@ apply_handle_update(StringInfo s)
 	relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
 								   &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
-	if (!should_apply_changes_for_rel(rel))
+	if (!rel || !should_apply_changes_for_rel(rel))
 	{
 		/*
 		 * The relation can't become interesting in the middle of the
 		 * transaction so it's safe to unlock it.
 		 */
-		logicalrep_rel_close(rel, RowExclusiveLock);
+		if (rel)
+			logicalrep_rel_close(rel, RowExclusiveLock);
 		end_replication_step();
 		return;
 	}
@@ -2001,13 +2003,14 @@ apply_handle_delete(StringInfo s)
 
 	relid = logicalrep_read_delete(s, &oldtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
-	if (!should_apply_changes_for_rel(rel))
+	if (!rel || !should_apply_changes_for_rel(rel))
 	{
 		/*
 		 * The relation can't become interesting in the middle of the
 		 * transaction so it's safe to unlock it.
 		 */
-		logicalrep_rel_close(rel, RowExclusiveLock);
+		if (rel)
+			logicalrep_rel_close(rel, RowExclusiveLock);
 		end_replication_step();
 		return;
 	}
@@ -2421,13 +2424,14 @@ apply_handle_truncate(StringInfo s)
 		LogicalRepRelMapEntry *rel;
 
 		rel = logicalrep_rel_open(relid, lockmode);
-		if (!should_apply_changes_for_rel(rel))
+		if (!rel || !should_apply_changes_for_rel(rel))
 		{
 			/*
 			 * The relation can't become interesting in the middle of the
 			 * transaction so it's safe to unlock it.
 			 */
-			logicalrep_rel_close(rel, lockmode);
+			if (rel)
+				logicalrep_rel_close(rel, lockmode);
 			continue;
 		}
 
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 8e88de7b2b..33cc75b656 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -89,5 +89,6 @@ extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
 extern bool HasSubscriptionRelations(Oid subid);
 extern List *GetSubscriptionRelations(Oid subid, bool not_ready);
+extern bool CheckSubscriptionRelation(Oid subid, Oid relid);
 
 #endif							/* PG_SUBSCRIPTION_REL_H */
-- 
2.25.1

