From 813a27ed2a96b0096d64950ff749513302f67148 Mon Sep 17 00:00:00 2001
From: amit <amitlangote09@gmail.com>
Date: Thu, 7 Nov 2019 18:19:33 +0900
Subject: [PATCH 1/2] Some refactoring of publication and subscription code

---
 src/backend/catalog/pg_publication.c        |  5 +-
 src/backend/commands/subscriptioncmds.c     | 79 ++++++++++++++++++++---------
 src/backend/commands/tablecmds.c            |  2 +-
 src/backend/replication/pgoutput/pgoutput.c | 11 ++--
 src/backend/utils/cache/relcache.c          |  2 +-
 src/include/catalog/pg_publication.h        |  2 +-
 6 files changed, 67 insertions(+), 34 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index fd5da7d5f7..80b98e2c3c 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -224,11 +224,12 @@ publication_add_relation(Oid pubid, Relation targetrel,
 
 
 /*
- * Gets list of publication oids for a relation oid.
+ * Finds all publications associated with the relation.
  */
 List *
-GetRelationPublications(Oid relid)
+GetRelationPublications(Relation rel)
 {
+	Oid			relid = RelationGetRelid(rel);
 	List	   *result = NIL;
 	CatCList   *pubrellist;
 	int			i;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 1419195766..11c0f305ff 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -52,7 +52,19 @@
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 
+/*
+ * Structure for fetch_table_list() to store the information about
+ * a given published table.
+ */
+typedef struct PublicationTable
+{
+	char	   *nspname;
+	char	   *relname;
+	char		relkind;
+} PublicationTable;
+
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static Oid ValidateSubscriptionRel(PublicationTable *pt);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -464,15 +476,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 			tables = fetch_table_list(wrconn, publications);
 			foreach(lc, tables)
 			{
-				RangeVar   *rv = (RangeVar *) lfirst(lc);
+				PublicationTable *pt = lfirst(lc);
 				Oid			relid;
 
-				relid = RangeVarGetRelid(rv, AccessShareLock, false);
-
-				/* Check for supported relkind. */
-				CheckSubscriptionRelkind(get_rel_relkind(relid),
-										 rv->schemaname, rv->relname);
-
+				relid = ValidateSubscriptionRel(pt);
 				AddSubscriptionRelState(subid, relid, table_state,
 										InvalidXLogRecPtr);
 			}
@@ -573,14 +580,11 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 
 	foreach(lc, pubrel_names)
 	{
-		RangeVar   *rv = (RangeVar *) lfirst(lc);
+		PublicationTable *pt = lfirst(lc);
 		Oid			relid;
 
-		relid = RangeVarGetRelid(rv, AccessShareLock, false);
-
-		/* Check for supported relkind. */
-		CheckSubscriptionRelkind(get_rel_relkind(relid),
-								 rv->schemaname, rv->relname);
+		/* Check that there's an appropriate relation present locally. */
+		relid = ValidateSubscriptionRel(pt);
 
 		pubrel_local_oids[off++] = relid;
 
@@ -592,7 +596,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 									InvalidXLogRecPtr);
 			ereport(DEBUG1,
 					(errmsg("table \"%s.%s\" added to subscription \"%s\"",
-							rv->schemaname, rv->relname, sub->name)));
+							pt->nspname, pt->relname, sub->name)));
 		}
 	}
 
@@ -1137,7 +1141,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	Oid			tableRow[3] = {TEXTOID, TEXTOID, CHAROID};
 	ListCell   *lc;
 	bool		first;
 	List	   *tablelist = NIL;
@@ -1145,9 +1149,12 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	Assert(list_length(publications) > 0);
 
 	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
+	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, c.relkind\n"
 						   "  FROM pg_catalog.pg_publication_tables t\n"
+						   "  JOIN pg_class c ON t.schemaname = c.relnamespace::regnamespace::name\n"
+						   "  AND t.tablename = c.relname\n"
 						   " WHERE t.pubname IN (");
+
 	first = true;
 	foreach(lc, publications)
 	{
@@ -1162,7 +1169,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	}
 	appendStringInfoChar(&cmd, ')');
 
-	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+	res = walrcv_exec(wrconn, cmd.data, 3, tableRow);
 	pfree(cmd.data);
 
 	if (res->status != WALRCV_OK_TUPLES)
@@ -1174,18 +1181,17 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
-		char	   *nspname;
-		char	   *relname;
+		PublicationTable *pt = palloc0(sizeof(PublicationTable));
 		bool		isnull;
-		RangeVar   *rv;
 
-		nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		pt->nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
 		Assert(!isnull);
-		relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		pt->relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+		pt->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
 		Assert(!isnull);
 
-		rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
-		tablelist = lappend(tablelist, rv);
+		tablelist = lappend(tablelist, pt);
 
 		ExecClearTuple(slot);
 	}
@@ -1195,3 +1201,28 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 
 	return tablelist;
 }
+
+/*
+ * Looks up a local relation matching the given publication table and
+ * checks that it's appropriate to use as replication target, erroring
+ * out if not.
+ *
+ * Oid of the successfully validated local relation is returned.
+ */
+static Oid
+ValidateSubscriptionRel(PublicationTable *pt)
+{
+	Oid		relid;
+	RangeVar *rv;
+	char	local_relkind;
+
+	rv = makeRangeVar(pstrdup(pt->nspname), pstrdup(pt->relname), -1);
+	relid = RangeVarGetRelid(rv, AccessShareLock, false);
+	Assert(OidIsValid(relid));
+
+	/* Check for supported relkind. */
+	local_relkind = get_rel_relkind(relid);
+	CheckSubscriptionRelkind(local_relkind, rv->schemaname, rv->relname);
+
+	return relid;
+}
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 5597be6e3d..270e76ad73 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -14157,7 +14157,7 @@ ATPrepChangePersistence(Relation rel, bool toLogged)
 	 * UNLOGGED as UNLOGGED tables can't be published.
 	 */
 	if (!toLogged &&
-		list_length(GetRelationPublications(RelationGetRelid(rel))) > 0)
+		list_length(GetRelationPublications(rel)) > 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("cannot change table \"%s\" to unlogged because it is part of a publication",
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9c08757fca..20856fa33c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -66,7 +66,7 @@ typedef struct RelationSyncEntry
 static HTAB *RelationSyncCache = NULL;
 
 static void init_rel_sync_cache(MemoryContext decoding_context);
-static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
+static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation rel);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
@@ -314,7 +314,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (!is_publishable_relation(relation))
 		return;
 
-	relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
+	relentry = get_rel_sync_entry(data, relation);
 
 	/* First check the table filter */
 	switch (change->action)
@@ -404,7 +404,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		if (!is_publishable_relation(relation))
 			continue;
 
-		relentry = get_rel_sync_entry(data, relid);
+		relentry = get_rel_sync_entry(data, relation);
 
 		if (!relentry->pubactions.pubtruncate)
 			continue;
@@ -529,8 +529,9 @@ init_rel_sync_cache(MemoryContext cachectx)
  * Find or create entry in the relation schema cache.
  */
 static RelationSyncEntry *
-get_rel_sync_entry(PGOutputData *data, Oid relid)
+get_rel_sync_entry(PGOutputData *data, Relation rel)
 {
+	Oid			relid = RelationGetRelid(rel);
 	RelationSyncEntry *entry;
 	bool		found;
 	MemoryContext oldctx;
@@ -548,7 +549,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	/* Not found means schema wasn't sent */
 	if (!found || !entry->replicate_valid)
 	{
-		List	   *pubids = GetRelationPublications(relid);
+		List	   *pubids = GetRelationPublications(rel);
 		ListCell   *lc;
 
 		/* Reload publications if needed before use. */
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 585dcee5db..161fe95fe6 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -5105,7 +5105,7 @@ GetRelationPublicationActions(Relation relation)
 					  sizeof(PublicationActions));
 
 	/* Fetch the publication membership info. */
-	puboids = GetRelationPublications(RelationGetRelid(relation));
+	puboids = GetRelationPublications(relation);
 	puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
 
 	foreach(lc, puboids)
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 20a2f0ac1b..2981f61db1 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -80,7 +80,7 @@ typedef struct Publication
 
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
-extern List *GetRelationPublications(Oid relid);
+extern List *GetRelationPublications(Relation rel);
 extern List *GetPublicationRelations(Oid pubid);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(void);
-- 
2.11.0

