From 1bab59788056c21060e6a1de41d1f42831544294 Mon Sep 17 00:00:00 2001
From: "houzj.fnst" <houzj.fnst@cn.fujitsu.com>
Date: Fri, 17 Sep 2021 11:43:16 +0800
Subject: [PATCH] Invalidate all partitions for partitioned table in
 publication.

Updates/Deletes on a partition were allowed even without replica identity
after the parent table was added to a publiaction. This would later lead to
an error on subscribers.The reason was that we were not invalidating the
partition's relcache and the publication information for partitions was not
getting rebuilt. Similarly, we were not invalidating the partitions' relcache
after dropping a partitioned table from publication which will prohibit
Updates/Deletes on its partition without replica identity even without any
publication.

In addition to invalidating all partitions, made the existing relation cache
invalidation code into a function. Also made getting the relations based on the
publication partition option for a specified relation into a function.

---
 src/backend/catalog/pg_publication.c      | 82 ++++++++++++++++++++-----------
 src/backend/commands/publicationcmds.c    | 57 ++++++++++++---------
 src/include/catalog/pg_publication.h      |  3 ++
 src/include/commands/publicationcmds.h    |  5 ++
 src/test/regress/expected/publication.out | 13 ++++-
 src/test/regress/sql/publication.sql      | 11 ++++-
 6 files changed, 116 insertions(+), 55 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 61bed1d..5d7e1b1 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -31,6 +31,7 @@
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
 #include "catalog/pg_type.h"
+#include "commands/publicationcmds.h"
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "utils/array.h"
@@ -136,6 +137,42 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
 	PG_RETURN_BOOL(result);
 }
 
+/*
+ * Gets the relations based on the publication partition option for a specified
+ * relation.
+ */
+List *
+GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
+							   Oid relid)
+{
+	if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
+		pub_partopt != PUBLICATION_PART_ROOT)
+	{
+		List	   *all_parts = find_all_inheritors(relid, NoLock,
+													NULL);
+
+		if (pub_partopt == PUBLICATION_PART_ALL)
+			result = list_concat(result, all_parts);
+		else if (pub_partopt == PUBLICATION_PART_LEAF)
+		{
+			ListCell   *lc;
+
+			foreach(lc, all_parts)
+			{
+				Oid			partOid = lfirst_oid(lc);
+
+				if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
+					result = lappend_oid(result, partOid);
+			}
+		}
+		else
+			Assert(false);
+	}
+	else
+		result = lappend_oid(result, relid);
+
+	return result;
+}
 
 /*
  * Insert new publication / relation mapping.
@@ -153,6 +190,7 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
+	List	   *relids = NIL;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -208,8 +246,18 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	/* Close the table. */
 	table_close(rel, RowExclusiveLock);
 
-	/* Invalidate relcache so that publication info is rebuilt. */
-	CacheInvalidateRelcache(targetrel);
+	/*
+	 * Invalidate relcache so that publication info is rebuilt.
+	 *
+	 * For the partitioned tables, we must invalidate all partitions contained
+	 * in the respective partition hierarchies, not just the one explicitly
+	 * mentioned in the publication. This is required because we implicitly
+	 * publish the child tables when the parent table is published.
+	 */
+	relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
+											relid);
+
+	InvalidatePublicationRels(relids);
 
 	return myself;
 }
@@ -241,7 +289,7 @@ GetRelationPublications(Oid relid)
 /*
  * Gets list of relation oids for a publication.
  *
- * This should only be used for normal publications, the FOR ALL TABLES
+ * This should only be used FOR TABLE publications, the FOR ALL TABLES
  * should use GetAllTablesPublicationRelations().
  */
 List *
@@ -270,32 +318,8 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 		Form_pg_publication_rel pubrel;
 
 		pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
-
-		if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
-			pub_partopt != PUBLICATION_PART_ROOT)
-		{
-			List	   *all_parts = find_all_inheritors(pubrel->prrelid, NoLock,
-														NULL);
-
-			if (pub_partopt == PUBLICATION_PART_ALL)
-				result = list_concat(result, all_parts);
-			else if (pub_partopt == PUBLICATION_PART_LEAF)
-			{
-				ListCell   *lc;
-
-				foreach(lc, all_parts)
-				{
-					Oid			partOid = lfirst_oid(lc);
-
-					if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
-						result = lappend_oid(result, partOid);
-				}
-			}
-			else
-				Assert(false);
-		}
-		else
-			result = lappend_oid(result, pubrel->prrelid);
+		result = GetPubPartitionOptionRelations(result, pub_partopt,
+												pubrel->prrelid);
 	}
 
 	systable_endscan(scan);
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 8797101..7ee8825 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -45,9 +45,6 @@
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
-/* Same as MAXNUMMESSAGES in sinvaladt.c */
-#define MAX_RELCACHE_INVAL_MSGS 4096
-
 static List *OpenTableList(List *tables);
 static void CloseTableList(List *rels);
 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
@@ -330,23 +327,7 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
 		List	   *relids = GetPublicationRelations(pubform->oid,
 													 PUBLICATION_PART_ALL);
 
-		/*
-		 * We don't want to send too many individual messages, at some point
-		 * it's cheaper to just reset whole relcache.
-		 */
-		if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
-		{
-			ListCell   *lc;
-
-			foreach(lc, relids)
-			{
-				Oid			relid = lfirst_oid(lc);
-
-				CacheInvalidateRelcacheByRelid(relid);
-			}
-		}
-		else
-			CacheInvalidateRelcacheAll();
+		InvalidatePublicationRels(relids);
 	}
 
 	ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
@@ -357,6 +338,27 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
 }
 
 /*
+ * Invalidate the relations.
+ */
+void
+InvalidatePublicationRels(List *relids)
+{
+	/*
+	 * We don't want to send too many individual messages, at some point it's
+	 * cheaper to just reset whole relcache.
+	 */
+	if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
+	{
+		ListCell   *lc;
+
+		foreach(lc, relids)
+			CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
+	}
+	else
+		CacheInvalidateRelcacheAll();
+}
+
+/*
  * Add or remove table to/from publication.
  */
 static void
@@ -512,6 +514,7 @@ RemovePublicationRelById(Oid proid)
 	Relation	rel;
 	HeapTuple	tup;
 	Form_pg_publication_rel pubrel;
+	List	   *relids = NIL;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -523,8 +526,18 @@ RemovePublicationRelById(Oid proid)
 
 	pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
 
-	/* Invalidate relcache so that publication info is rebuilt. */
-	CacheInvalidateRelcacheByRelid(pubrel->prrelid);
+	/*
+	 * Invalidate relcache so that publication info is rebuilt.
+	 *
+	 * For the partitioned tables, we must invalidate all partitions contained
+	 * in the respective partition hierarchies, not just the one explicitly
+	 * mentioned in the publication. This is required because we implicitly
+	 * publish the child tables when the parent table is published.
+	 */
+	relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
+											pubrel->prrelid);
+
+	InvalidatePublicationRels(relids);
 
 	CatalogTupleDelete(rel, &tup->t_self);
 
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 5955ba0..c876085 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -107,6 +107,9 @@ extern List *GetAllTablesPublicationRelations(bool pubviaroot);
 extern bool is_publishable_relation(Relation rel);
 extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
 											  bool if_not_exists);
+extern List *GetPubPartitionOptionRelations(List *result,
+											PublicationPartOpt pub_partopt,
+											Oid relid);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
 extern char *get_publication_name(Oid pubid, bool missing_ok);
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index e713df0..5573bc5 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -17,6 +17,10 @@
 
 #include "catalog/objectaddress.h"
 #include "nodes/parsenodes.h"
+#include "utils/inval.h"
+
+/* Same as MAXNUMMESSAGES in sinvaladt.c */
+#define MAX_RELCACHE_INVAL_MSGS 4096
 
 extern ObjectAddress CreatePublication(CreatePublicationStmt *stmt);
 extern void AlterPublication(AlterPublicationStmt *stmt);
@@ -25,5 +29,6 @@ extern void RemovePublicationRelById(Oid proid);
 
 extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
 extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId);
+extern void InvalidatePublicationRels(List *relids);
 
 #endif							/* PUBLICATIONCMDS_H */
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index c299702..f278593 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -124,10 +124,12 @@ CREATE PUBLICATION testpub_forparted;
 CREATE PUBLICATION testpub_forparted1;
 RESET client_min_messages;
 CREATE TABLE testpub_parted1 (LIKE testpub_parted);
+CREATE TABLE testpub_parted2 (LIKE testpub_parted);
 ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2);
 -- works despite missing REPLICA IDENTITY, because updates are not replicated
 UPDATE testpub_parted1 SET a = 1;
-ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
 -- only parent is listed as being in publication, not the partition
 ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
 \dRp+ testpub_forparted
@@ -154,7 +156,14 @@ ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
 Tables:
     "public.testpub_parted"
 
-DROP TABLE testpub_parted1;
+-- still fail, because parent's publication replicates updates
+UPDATE testpub_parted2 SET a = 2;
+ERROR:  cannot update table "testpub_parted2" because it does not have a replica identity and publishes updates
+HINT:  To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
+ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
+-- works again, because update is no longer replicated
+UPDATE testpub_parted2 SET a = 2;
+DROP TABLE testpub_parted1, testpub_parted2;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 -- Test cache invalidation FOR ALL TABLES publication
 SET client_min_messages = 'ERROR';
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 04b34ee..e5745d5 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -76,10 +76,12 @@ CREATE PUBLICATION testpub_forparted;
 CREATE PUBLICATION testpub_forparted1;
 RESET client_min_messages;
 CREATE TABLE testpub_parted1 (LIKE testpub_parted);
+CREATE TABLE testpub_parted2 (LIKE testpub_parted);
 ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted2 FOR VALUES IN (2);
 -- works despite missing REPLICA IDENTITY, because updates are not replicated
 UPDATE testpub_parted1 SET a = 1;
-ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
 -- only parent is listed as being in publication, not the partition
 ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
 \dRp+ testpub_forparted
@@ -90,7 +92,12 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 UPDATE testpub_parted1 SET a = 1;
 ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
 \dRp+ testpub_forparted
-DROP TABLE testpub_parted1;
+-- still fail, because parent's publication replicates updates
+UPDATE testpub_parted2 SET a = 2;
+ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
+-- works again, because update is no longer replicated
+UPDATE testpub_parted2 SET a = 2;
+DROP TABLE testpub_parted1, testpub_parted2;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 
 -- Test cache invalidation FOR ALL TABLES publication
-- 
2.7.2.windows.1

