From d5524bb4287dd6c9d1e69404561dffc2b8c7c3e7 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Fri, 1 Oct 2021 17:20:29 +0530
Subject: [PATCH v1] Changes by Amit.

---
 src/backend/catalog/pg_publication.c        | 146 ++++++++++----------
 src/backend/commands/publicationcmds.c      |  34 +++--
 src/backend/nodes/copyfuncs.c               |  26 ++--
 src/backend/nodes/equalfuncs.c              |  22 +--
 src/backend/parser/gram.y                   |  29 ++--
 src/backend/replication/pgoutput/pgoutput.c |   2 +-
 src/backend/utils/cache/syscache.c          |   2 +-
 src/include/catalog/pg_publication.h        |  10 +-
 src/include/nodes/nodes.h                   |   2 +-
 src/include/nodes/parsenodes.h              |   4 +-
 10 files changed, 147 insertions(+), 130 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 4e8ccdabc6..644eb17680 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -303,8 +303,6 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
 	ObjectAddress myself,
 				referenced;
 
-	check_publication_add_schema(schemaid);
-
 	rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
 
 	/*
@@ -327,6 +325,8 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
 						get_namespace_name(schemaid), pub->name)));
 	}
 
+	check_publication_add_schema(schemaid);
+
 	/* Form a tuple */
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
@@ -358,7 +358,10 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
 	/* Close the table */
 	table_close(rel, RowExclusiveLock);
 
-	/* Invalidate relcache so that publication info is rebuilt. */
+	/*
+	 * Invalidate relcache so that publication info is rebuilt. See
+	 * publication_add_relation for why we need to consider all the partitions.
+	 */
 	schemaRels = GetSchemaPublicationRelations(schemaid, PUBLICATION_PART_ALL);
 	InvalidatePublicationRels(schemaRels);
 
@@ -431,73 +434,6 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 	return result;
 }
 
-/*
- * Gets the list of schema oids for a publication.
- *
- * This should only be used FOR ALL TABLES IN SCHEMA publications.
- */
-List *
-GetPublicationSchemas(Oid pubid)
-{
-	List	   *result = NIL;
-	Relation	pubschsrel;
-	ScanKeyData scankey;
-	SysScanDesc scan;
-	HeapTuple	tup;
-
-	/* Find all publications associated with the schema */
-	pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
-
-	ScanKeyInit(&scankey,
-				Anum_pg_publication_namespace_pnpubid,
-				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(pubid));
-
-	scan = systable_beginscan(pubschsrel,
-							  PublicationNamespacePnnspidPnpubidIndexId,
-							  true, NULL, 1, &scankey);
-	while (HeapTupleIsValid(tup = systable_getnext(scan)))
-	{
-		Form_pg_publication_namespace pubsch;
-
-		pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
-
-		result = lappend_oid(result, pubsch->pnnspid);
-	}
-
-	systable_endscan(scan);
-	table_close(pubschsrel, AccessShareLock);
-
-	return result;
-}
-
-
-/*
- * Gets the list of publication oids associated with a specified schema.
- */
-List *
-GetSchemaPublications(Oid schemaid)
-{
-	List	   *result = NIL;
-	CatCList   *pubschlist;
-	int			i;
-
-	/* Find all publications associated with the schema */
-	pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
-									 ObjectIdGetDatum(schemaid));
-	for (i = 0; i < pubschlist->n_members; i++)
-	{
-		HeapTuple	tup = &pubschlist->members[i]->tuple;
-		Oid			pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
-
-		result = lappend_oid(result, pubid);
-	}
-
-	ReleaseSysCacheList(pubschlist);
-
-	return result;
-}
-
 /*
  * Gets list of publication oids for publications marked as FOR ALL TABLES.
  */
@@ -536,7 +472,7 @@ GetAllTablesPublications(void)
 }
 
 /*
- * Gets the list of relations published.
+ * Gets the list of relation published by FOR ALL TABLES publication(s).
  *
  * If the publication publishes partition changes via their respective root
  * partitioned tables, we must exclude partitions in favor of including the
@@ -598,6 +534,72 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 	return result;
 }
 
+/*
+ * Gets the list of schema oids for a publication.
+ *
+ * This should only be used FOR ALL TABLES IN SCHEMA publications.
+ */
+List*
+GetPublicationSchemas(Oid pubid)
+{
+	List* result = NIL;
+	Relation	pubschsrel;
+	ScanKeyData scankey;
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	/* Find all publications associated with the schema */
+	pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
+
+	ScanKeyInit(&scankey,
+				Anum_pg_publication_namespace_pnpubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(pubid));
+
+	scan = systable_beginscan(pubschsrel,
+							  PublicationNamespacePnnspidPnpubidIndexId,
+							  true, NULL, 1, &scankey);
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_publication_namespace pubsch;
+
+		pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
+
+		result = lappend_oid(result, pubsch->pnnspid);
+	}
+
+	systable_endscan(scan);
+	table_close(pubschsrel, AccessShareLock);
+
+	return result;
+}
+
+/*
+ * Gets the list of publication oids associated with a specified schema.
+ */
+List*
+GetSchemaPublications(Oid schemaid)
+{
+	List* result = NIL;
+	CatCList* pubschlist;
+	int			i;
+
+	/* Find all publications associated with the schema */
+	pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
+									 ObjectIdGetDatum(schemaid));
+	for (i = 0; i < pubschlist->n_members; i++)
+	{
+		HeapTuple	tup = &pubschlist->members[i]->tuple;
+		Oid			pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
+
+		result = lappend_oid(result, pubid);
+	}
+
+	ReleaseSysCacheList(pubschlist);
+
+	return result;
+}
+
 /*
  * Get the list of publishable relation oids for a specified schema.
  */
@@ -820,7 +822,9 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		 * need those.
 		 */
 		if (publication->alltables)
+		{
 			tables = GetAllTablesPublicationRelations(publication->pubviaroot);
+		}
 		else
 		{
 			List	   *relids,
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 2f4d0b1544..73d46612c4 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -176,7 +176,6 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
 				*schemas = list_append_unique_oid(*schemas, schemaid);
 				break;
 			case PUBLICATIONOBJ_CURRSCHEMA:
-
 				search_path = fetch_search_path(false);
 				if (search_path == NIL) /* nothing valid in search_path? */
 					ereport(ERROR,
@@ -190,7 +189,9 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
 				*schemas = list_append_unique_oid(*schemas, schemaid);
 				break;
 			default:
-				Assert(0);
+				/* shouldn't happen */
+				elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
+				break;
 		}
 	}
 }
@@ -320,6 +321,7 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 	/* Make the changes visible. */
 	CommandCounterIncrement();
 
+	/* Associate objects with the publication. */
 	if (stmt->for_all_tables)
 	{
 		/* Invalidate relcache so that publication info is rebuilt. */
@@ -350,9 +352,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 						errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication"));
 
 			/*
-			 * The schemas specified in the schema list are locked in
-			 * AccessShareLock mode in order to prevent concurrent schema
-			 * deletion.
+			 * Schema lock is held until the publication is created to prevent
+			 * concurrent schema deletion.
 			 */
 			LockSchemaList(schemaidlist);
 			PublicationAddSchemas(puboid, schemaidlist, true, NULL);
@@ -496,6 +497,10 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 	Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
 	Oid			pubid = pubform->oid;
 
+	/*
+	 * It is quite possible that for the SET case user has not specified any
+	 * tables in which case we need to remove all the existing tables.
+	 */
 	if (!tables && stmt->action != DEFELEM_SET)
 		return;
 
@@ -571,7 +576,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 /*
  * Alter the publication schemas.
  *
- * Add/Remove/Set all tables from schemas to/from publication.
+ * Add or remove schemas to/from publication.
  */
 static void
 AlterPublicationSchemas(AlterPublicationStmt *stmt,
@@ -583,8 +588,8 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
 		return;
 
 	/*
-	 * The schemas specified in the schema list are locked in AccessShareLock
-	 * mode in order to prevent concurrent schema deletion.
+	 * Schema lock is held until the publication is altered to prevent
+	 * concurrent schema deletion.
 	 */
 	LockSchemaList(schemaidlist);
 	if (stmt->action == DEFELEM_ADD)
@@ -612,9 +617,8 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
 		delschemas = list_difference_oid(oldschemaids, schemaidlist);
 
 		/*
-		 * The schemas specified in the schema list are locked in
-		 * AccessShareLock mode in order to prevent concurrent schema
-		 * deletion.
+		 * Schema lock is held until the publication is altered to prevent
+		 * concurrent schema deletion.
 		 */
 		LockSchemaList(delschemas);
 
@@ -820,7 +824,10 @@ RemovePublicationSchemaById(Oid psoid)
 
 	pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
 
-	/* Invalidate relcache so that publication info is rebuilt. */
+	/*
+	 * Invalidate relcache so that publication info is rebuilt. See
+	 * RemovePublicationRelById for why we need to consider all the partitions.
+	 */
 	schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
 											   PUBLICATION_PART_ALL);
 	InvalidatePublicationRels(schemaRels);
@@ -834,8 +841,7 @@ RemovePublicationSchemaById(Oid psoid)
 
 /*
  * The schemas specified in the schema list are locked in AccessShareLock mode
- * in order to prevent concurrent schema deletion. No need to unlock the
- * schemas, the locks will be released at the end of the command.
+ * in order to prevent concurrent schema deletion.
  */
 static void
 LockSchemaList(List *schemalist)
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 553cd834e6..dfa5d8d705 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4810,6 +4810,19 @@ _copyPartitionCmd(const PartitionCmd *from)
 	return newnode;
 }
 
+static PublicationObjSpec*
+_copyPublicationObject(const PublicationObjSpec *from)
+{
+	PublicationObjSpec *newnode = makeNode(PublicationObjSpec);
+
+	COPY_SCALAR_FIELD(pubobjtype);
+	COPY_STRING_FIELD(name);
+	COPY_NODE_FIELD(rangevar);
+	COPY_LOCATION_FIELD(location);
+
+	return newnode;
+}
+
 static CreatePublicationStmt *
 _copyCreatePublicationStmt(const CreatePublicationStmt *from)
 {
@@ -4958,19 +4971,6 @@ _copyForeignKeyCacheInfo(const ForeignKeyCacheInfo *from)
 	return newnode;
 }
 
-static PublicationObjSpec *
-_copyPublicationObject(const PublicationObjSpec *from)
-{
-	PublicationObjSpec *newnode = makeNode(PublicationObjSpec);
-
-	COPY_SCALAR_FIELD(pubobjtype);
-	COPY_STRING_FIELD(name);
-	COPY_NODE_FIELD(rangevar);
-	COPY_LOCATION_FIELD(location);
-
-	return newnode;
-}
-
 /*
  * copyObjectImpl -- implementation of copyObject(); see nodes/nodes.h
  *
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 054b2d94e5..0532bb20ee 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -3038,6 +3038,18 @@ _equalPartitionCmd(const PartitionCmd *a, const PartitionCmd *b)
 	return true;
 }
 
+static bool
+_equalPublicationObject(const PublicationObjSpec* a,
+						const PublicationObjSpec* b)
+{
+	COMPARE_SCALAR_FIELD(pubobjtype);
+	COMPARE_STRING_FIELD(name);
+	COMPARE_NODE_FIELD(rangevar);
+	COMPARE_LOCATION_FIELD(location);
+
+	return true;
+}
+
 /*
  * Stuff from pg_list.h
  */
@@ -3133,16 +3145,6 @@ _equalBitString(const BitString *a, const BitString *b)
 	return true;
 }
 
-static bool
-_equalPublicationObject(const PublicationObjSpec *a,
-						const PublicationObjSpec *b)
-{
-	COMPARE_STRING_FIELD(name);
-	COMPARE_NODE_FIELD(rangevar);
-
-	return true;
-}
-
 /*
  * equal
  *	  returns whether two nodes are equal
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 4f11ef5cbc..e7b33de27f 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -559,8 +559,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <str>		createdb_opt_name plassign_target
 %type <node>	var_value zone_value
 %type <rolespec> auth_ident RoleSpec opt_granted_by
-
 %type <publicationobjectspec> PublicationObjSpec
+
 %type <keyword> unreserved_keyword type_func_name_keyword
 %type <keyword> col_name_keyword reserved_keyword
 %type <keyword> bare_label_keyword
@@ -17108,9 +17108,11 @@ static RangeVar *
 makeRangeVarFromQualifiedName(char *name, List *namelist, int location,
 							  core_yyscan_t yyscanner)
 {
-	RangeVar *r = makeRangeVar(NULL, NULL, location);
+	RangeVar *r;
 
 	check_qualified_name(namelist, yyscanner);
+	r = makeRangeVar(NULL, NULL, location);
+
 	switch (list_length(namelist))
 	{
 		case 1:
@@ -17315,10 +17317,12 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 
 		if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE)
 		{
-			/* relation name was specified as CURRENT_SCHEMA */
+			/* relation name or rangevar must be set for this type of object */
 			if (!pubobj->name && !pubobj->rangevar)
-				pubobj->rangevar = makeRangeVar(NULL, "CURRENT_SCHEMA",
-												pubobj->location);
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("invalid table name at or near"),
+						parser_errposition(pubobj->location));
 			else if (pubobj->name)
 			{
 				/* convert it to rangevar */
@@ -17327,14 +17331,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 				pubobj->name = NULL;
 			}
 		}
-		else if (pubobj->pubobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA)
+		else if (pubobj->pubobjtype == PUBLICATIONOBJ_REL_IN_SCHEMA ||
+				 pubobj->pubobjtype == PUBLICATIONOBJ_CURRSCHEMA)
 		{
 			/*
-			 * Schema name was specified as CURRENT_SCHEMA, set pubobjtype as
-			 * PUBLICATIONOBJ_CURRSCHEMA to indicate the schema name should be
-			 * set with the first schema in search_path.
+			 * We can distinguish between the different type of schema
+			 * objects based on whether name and rangevar is set.
 			 */
-			if (!pubobj->name && !pubobj->rangevar)
+			if (pubobj->name)
+				pubobj->pubobjtype = PUBLICATIONOBJ_REL_IN_SCHEMA;
+			else if (!pubobj->name && !pubobj->rangevar)
 				pubobj->pubobjtype = PUBLICATIONOBJ_CURRSCHEMA;
 			else if (!pubobj->name)
 				ereport(ERROR,
@@ -17343,8 +17349,7 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 						parser_errposition(pubobj->location));
 		}
 
-		prevobjtype = (pubobj->pubobjtype == PUBLICATIONOBJ_CURRSCHEMA) ?
-					  PUBLICATIONOBJ_REL_IN_SCHEMA : pubobj->pubobjtype;
+		prevobjtype = pubobj->pubobjtype;
 	}
 }
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e902ed73da..6f6a203dea 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1358,7 +1358,7 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 }
 
 /*
- * Publication relation map syscache invalidation callback
+ * Publication relation/schema map syscache invalidation callback
  */
 static void
 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index d6c656edc8..56870b46e4 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -618,7 +618,7 @@ static const struct cachedesc cacheinfo[] = {
 		},
 		8
 	},
-	{PublicationNamespaceRelationId,	/* PUBLICATIONNAMESPCE */
+	{PublicationNamespaceRelationId,	/* PUBLICATIONNAMESPACE */
 		PublicationNamespaceObjectIndexId,
 		1,
 		{
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 5911824d09..a4c894ec9d 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -106,17 +106,17 @@ typedef enum PublicationPartOpt
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(bool pubviaroot);
-extern List *GetPubPartitionOptionRelations(List *result,
-											PublicationPartOpt pub_partopt,
-											Oid relid);
 extern List *GetPublicationSchemas(Oid pubid);
 extern List *GetSchemaPublications(Oid schemaid);
-extern List *GetAllSchemaPublicationRelations(Oid puboid,
-											  PublicationPartOpt pub_partopt);
 extern List *GetSchemaPublicationRelations(Oid schemaid,
 										   PublicationPartOpt pub_partopt);
+extern List *GetAllSchemaPublicationRelations(Oid puboid,
+											  PublicationPartOpt pub_partopt);
 
 extern bool is_publishable_relation(Relation rel);
+extern List *GetPubPartitionOptionRelations(List* result,
+											PublicationPartOpt pub_partopt,
+											Oid relid);
 extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
 											  bool if_not_exists);
 extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 8220c72469..d34b4ac8e5 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -479,7 +479,6 @@ typedef enum NodeTag
 	T_CTESearchClause,
 	T_CTECycleClause,
 	T_CommonTableExpr,
-	T_PublicationObjSpec,
 	T_RoleSpec,
 	T_TriggerTransition,
 	T_PartitionElem,
@@ -488,6 +487,7 @@ typedef enum NodeTag
 	T_PartitionRangeDatum,
 	T_PartitionCmd,
 	T_VacuumRelation,
+	T_PublicationObjSpec,
 
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 35a6b8ddde..c75dbece52 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -360,8 +360,8 @@ typedef enum PublicationObjSpecType
 {
 	PUBLICATIONOBJ_TABLE,		/* Table type */
 	PUBLICATIONOBJ_REL_IN_SCHEMA,	/* Relations in schema type */
-	PUBLICATIONOBJ_CONTINUATION,	/* Continuation of previous type */
-	PUBLICATIONOBJ_CURRSCHEMA	/* Get the first element from search_path */
+	PUBLICATIONOBJ_CURRSCHEMA,	/* Get the first element from search_path */
+	PUBLICATIONOBJ_CONTINUATION	/* Continuation of previous type */
 } PublicationObjSpecType;
 
 typedef struct PublicationObjSpec
-- 
2.28.0.windows.1

