From c7d8a9a5b124ae808263c3a5773c3925196deb7a Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Tue, 14 Jun 2022 13:44:09 +0800
Subject: [PATCH v8] Fix data replicated twice when specifying
 publish_via_partition_root option.

If there are two publications, one of them publish a parent table with
(publish_via_partition_root = true) and another publish child table,
subscribing to both publications from one subscription results in two initial
replications. It should only be copied once.

To fix this, the API function pg_get_publication_tables has been extended to
take a publication list. Now, when getting the table information, if the
publish_via_partition_root is true, the function can exclude a partition table
whose ancestor is also published by the same publication list.
---
 src/backend/catalog/pg_publication.c       | 138 ++++++++++++++-------
 src/backend/commands/subscriptioncmds.c    |  67 ++++++++--
 src/include/catalog/pg_proc.dat            |  12 +-
 src/test/regress/expected/rules.out        |   2 +-
 src/test/subscription/t/013_partition.pl   |  18 +--
 src/test/subscription/t/028_row_filter.pl  |  13 +-
 src/test/subscription/t/031_column_list.pl |   5 +-
 7 files changed, 178 insertions(+), 77 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 6af3570005..93e8e03bab 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1026,22 +1026,27 @@ GetPublicationByName(const char *pubname, bool missing_ok)
 }
 
 /*
- * Returns information of tables in a publication.
+ * Returns information of the tables in the given publication array.
  */
 Datum
 pg_get_publication_tables(PG_FUNCTION_ARGS)
 {
 #define NUM_PUBLICATION_TABLES_ELEM	3
-	FuncCallContext *funcctx;
-	char	   *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
-	Publication *publication;
-	List	   *tables;
+	FuncCallContext	*funcctx;
+	List			*tables = NIL,
+					*results = NIL;
 
 	/* stuff done only on the first call of the function */
 	if (SRF_IS_FIRSTCALL())
 	{
 		TupleDesc	tupdesc;
 		MemoryContext oldcontext;
+		ArrayType  *arr;
+		Datum	   *elems;
+		int			nelems,
+					i;
+		bool		viaroot = false;
+		ListCell   *lc;
 
 		/* create a function context for cross-call persistence */
 		funcctx = SRF_FIRSTCALL_INIT();
@@ -1049,42 +1054,88 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		/* switch to memory context appropriate for multiple function calls */
 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
-		publication = GetPublicationByName(pubname, false);
+		/* Deconstruct the parameter into elements. */
+		arr = PG_GETARG_ARRAYTYPE_P(0);
+		deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
+						  &elems, NULL, &nelems);
 
-		/*
-		 * Publications support partitioned tables, although all changes are
-		 * replicated using leaf partition identity and schema, so we only
-		 * need those.
-		 */
-		if (publication->alltables)
+		/* Get Oids of tables from each publication. */
+		for (i = 0; i < nelems; i++)
 		{
-			tables = GetAllTablesPublicationRelations(publication->pubviaroot);
-		}
-		else
-		{
-			List	   *relids,
-					   *schemarelids;
-
-			relids = GetPublicationRelations(publication->oid,
-											 publication->pubviaroot ?
-											 PUBLICATION_PART_ROOT :
-											 PUBLICATION_PART_LEAF);
-			schemarelids = GetAllSchemaPublicationRelations(publication->oid,
-															publication->pubviaroot ?
-															PUBLICATION_PART_ROOT :
-															PUBLICATION_PART_LEAF);
-			tables = list_concat_unique_oid(relids, schemarelids);
+			Publication *pub_elem;
+			List		*current_tables = NIL;
+
+			pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
 
 			/*
-			 * If the publication publishes partition changes via their
-			 * respective root partitioned tables, we must exclude partitions
-			 * in favor of including the root partitioned tables. Otherwise,
-			 * the function could return both the child and parent tables
-			 * which could cause data of the child table to be
-			 * double-published on the subscriber side.
+			 * Publications support partitioned tables. If
+			 * publish_via_partition_root is false, all changes are replicated
+			 * using leaf partition identity and schema, so we only need those.
+			 * Otherwise, If publish_via_partition_root is true, get the
+			 * partitioned table itself.
 			 */
-			if (publication->pubviaroot)
-				tables = filter_partitions(tables);
+			if (pub_elem->alltables)
+				current_tables = GetAllTablesPublicationRelations(pub_elem->pubviaroot);
+			else
+			{
+				List	   *relids,
+						   *schemarelids;
+
+				relids = GetPublicationRelations(pub_elem->oid,
+												 pub_elem->pubviaroot ?
+												 PUBLICATION_PART_ROOT :
+												 PUBLICATION_PART_LEAF);
+				schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
+																pub_elem->pubviaroot ?
+																PUBLICATION_PART_ROOT :
+																PUBLICATION_PART_LEAF);
+				current_tables = list_concat(relids, schemarelids);
+			}
+
+			/*
+			 * Record the published table and the corresponding publication so
+			 * that we can get row filters and column list later.
+			 */
+			foreach(lc, current_tables)
+			{
+				Oid			*result = (Oid *) malloc(sizeof(Oid) * 2);
+
+				result[0] = lfirst_oid(lc);
+				result[1] = pub_elem->oid;
+				results = lappend(results, result);
+			}
+
+			tables = list_concat(tables, current_tables);
+
+			/* At least one publication is using publish_via_partition_root. */
+			if (pub_elem->pubviaroot)
+				viaroot = true;
+		}
+
+		pfree(elems);
+
+		/* Now sort and de-duplicate the result list */
+		list_sort(tables, list_oid_cmp);
+		list_deduplicate_oid(tables);
+
+		/*
+		 * If the publication publishes partition changes via their respective
+		 * root partitioned tables, we must exclude partitions in favor of
+		 * including the root partitioned tables. Otherwise, the function
+		 * could return both the child and parent tables which could cause
+		 * data of the child table to be double-published on the subscriber
+		 * side.
+		 */
+		if (viaroot)
+			tables = filter_partitions(tables);
+
+		/* Filter by final published table */
+		foreach(lc, results)
+		{
+			Oid *table_info = (Oid *) lfirst(lc);
+
+			if (!list_member_oid(tables, table_info[0]))
+				results = foreach_delete_current(results, lc);
 		}
 
 		/* Construct a tuple descriptor for the result rows. */
@@ -1097,20 +1148,23 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 						   PG_NODE_TREEOID, -1, 0);
 
 		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
-		funcctx->user_fctx = (void *) tables;
+		funcctx->user_fctx = (void *) results;
 
 		MemoryContextSwitchTo(oldcontext);
 	}
 
 	/* stuff done on every call of the function */
 	funcctx = SRF_PERCALL_SETUP();
-	tables = (List *) funcctx->user_fctx;
+	results = (List *) funcctx->user_fctx;
 
-	if (funcctx->call_cntr < list_length(tables))
+	if (funcctx->call_cntr < list_length(results))
 	{
 		HeapTuple	pubtuple = NULL;
 		HeapTuple	rettuple;
-		Oid			relid = list_nth_oid(tables, funcctx->call_cntr);
+		Publication *pub;
+		Oid		   *table_info = (Oid *) list_nth(results, funcctx->call_cntr);
+		Oid			relid = table_info[0],
+					pubid = table_info[1];
 		Datum		values[NUM_PUBLICATION_TABLES_ELEM] = {0};
 		bool		nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
 
@@ -1118,13 +1172,13 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		 * Form tuple with appropriate data.
 		 */
 
-		publication = GetPublicationByName(pubname, false);
+		pub = GetPublication(pubid);
 
 		values[0] = ObjectIdGetDatum(relid);
 
 		pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
 									   ObjectIdGetDatum(relid),
-									   ObjectIdGetDatum(publication->oid));
+									   ObjectIdGetDatum(pub->oid));
 
 		if (HeapTupleIsValid(pubtuple))
 		{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f73dfb6067..163b79291a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1794,25 +1794,70 @@ static List *
 fetch_table_list(WalReceiverConn *wrconn, List *publications)
 {
 	WalRcvExecResult *res;
-	StringInfoData cmd;
+	StringInfoData cmd,
+				pub_names;
 	TupleTableSlot *slot;
-	Oid			tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID};
+	Oid			tableRow[3] = {TEXTOID, TEXTOID, INT2VECTOROID};
 	List	   *tablelist = NIL;
-	bool		check_columnlist = (walrcv_server_version(wrconn) >= 150000);
+	int			server_version = walrcv_server_version(wrconn);
+	bool		check_columnlist = (server_version >= 150000);
+
+	initStringInfo(&pub_names);
+	get_publications_str(publications, &pub_names, true);
 
 	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
 
-	/* Get column lists for each relation if the publisher supports it */
-	if (check_columnlist)
-		appendStringInfoString(&cmd, ", t.attnames\n");
+	/* Get information of the tables belonging to the specified publications */
+	if (server_version >= 160000)
+		appendStringInfo(&cmd, "SELECT DISTINCT N.nspname, C.relname,\n"
+						 "              ( CASE WHEN (array_length(GPT.attrs, 1) = C.relnatts)\n"
+						 "                     THEN NULL ELSE GPT.attrs END\n"
+						 "              ) AS attnames\n"
+						 " FROM pg_class C\n"
+						 "   JOIN pg_namespace N ON N.oid = C.relnamespace\n"
+						 "   JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
+						 "          FROM pg_publication\n"
+						 "          WHERE pubname IN ( %s )) as GPT\n"
+						 "       ON GPT.relid = C.oid\n",
+						 pub_names.data);
+	else
+	{
+		/*
+		 * Get the list of tables from publisher, the partition table whose
+		 * ancestor is also in this list will be ignored, otherwise the initial
+		 * data in the partition table would be replicated twice.
+		 */
 
-	appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
-	get_publications_str(publications, &cmd, true);
-	appendStringInfoChar(&cmd, ')');
+		appendStringInfoString(&cmd, "WITH pub_tabs AS(\n"
+							   " SELECT DISTINCT N.nspname, C.oid, C.relname, C.relispartition\n");
+
+		/* Get column lists for each relation if the publisher supports it */
+		if (check_columnlist)
+			appendStringInfoString(&cmd, ",( CASE WHEN (array_length(GPT.attrs, 1) = C.relnatts)\n"
+								   "              THEN NULL ELSE GPT.attrs END\n"
+								   "       ) AS attnames\n");
+
+		appendStringInfo(&cmd, " FROM pg_publication P,\n"
+						 "      LATERAL pg_get_publication_tables(P.pubname) GPT,\n"
+						 "      pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+						 "  WHERE C.oid = GPT.relid AND P.pubname IN ( %s )\n"
+						 ")\n"
+						 "SELECT DISTINCT pub_tabs.nspname, pub_tabs.relname\n",
+						 pub_names.data);
+
+		/* Get column lists for each relation if the publisher supports it */
+		if (check_columnlist)
+			appendStringInfoString(&cmd, ", pub_tabs.attnames\n");
+
+		appendStringInfoString(&cmd, "FROM pub_tabs\n"
+							   " WHERE (pub_tabs.relispartition IS FALSE\n"
+							   "  OR NOT EXISTS (SELECT 1 FROM pg_partition_ancestors(pub_tabs.oid) as PA\n"
+							   "                  WHERE PA.relid IN (SELECT pub_tabs.oid FROM pub_tabs)\n"
+							   "                   AND PA.relid != pub_tabs.oid))\n");
+	}
 
 	res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
+	pfree(pub_names.data);
 	pfree(cmd.data);
 
 	if (res->status != WALRCV_OK_TUPLES)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index be47583122..33f40e81d6 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11681,11 +11681,13 @@
   prosrc => 'pg_show_replication_origin_status' },
 
 # publications
-{ oid => '6119', descr => 'get information of tables in a publication',
-  proname => 'pg_get_publication_tables', prorows => '1000', proretset => 't',
-  provolatile => 's', prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,oid,int2vector,pg_node_tree}',
-  proargmodes => '{i,o,o,o}', proargnames => '{pubname,relid,attrs,qual}',
+{ oid => '6119',
+  descr => 'get information of tables in one or more publications',
+  proname => 'pg_get_publication_tables', prorows => '1000',
+  provariadic => 'text', proretset => 't', provolatile => 's',
+  prorettype => 'record', proargtypes => '_text',
+  proallargtypes => '{_text,oid,int2vector,pg_node_tree}',
+  proargmodes => '{v,o,o,o}', proargnames => '{pubname,relid,attrs,qual}',
   prosrc => 'pg_get_publication_tables' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 7ec3d2688f..54f5ce5da9 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1449,7 +1449,7 @@ pg_publication_tables| SELECT p.pubname,
              JOIN pg_attribute a ON (((a.attrelid = gpt.relid) AND (a.attnum = k.k))))) AS attnames,
     pg_get_expr(gpt.qual, gpt.relid) AS rowfilter
    FROM pg_publication p,
-    LATERAL pg_get_publication_tables((p.pubname)::text) gpt(relid, attrs, qual),
+    LATERAL pg_get_publication_tables(VARIADIC ARRAY[(p.pubname)::text]) gpt(relid, attrs, qual),
     (pg_class c
      JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
   WHERE (c.oid = gpt.relid);
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 8b33e4e7ae..100e2d2fd0 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -473,12 +473,12 @@ $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)"
 );
 
-# Note: We create two separate tables, not a partitioned one, so that we can
-# easily identity through which relation were the changes replicated.
+# Note: We only create one table (tab4) here. We specified
+# publish_via_partition_root = true (see pub_all and pub_lower_level above), so
+# all data will be replicated to that table.
 $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4 (a int PRIMARY KEY)");
-$node_subscriber2->safe_psql('postgres',
-	"CREATE TABLE tab4_1 (a int PRIMARY KEY)");
+
 # Publication that sub2 points to now publishes via root, so must update
 # subscription target relations. We set the list of publications so that
 # the FOR ALL TABLES publication is second (the list order matters).
@@ -550,11 +550,6 @@ $result =
   $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4 ORDER BY 1");
 is($result, qq(0), 'inserts into tab4 replicated');
 
-$result =
-  $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4_1 ORDER BY 1");
-is($result, qq(), 'inserts into tab4_1 replicated');
-
-
 # now switch the order of publications in the list, try again, the result
 # should be the same (no dependence on order of pulications)
 $node_subscriber2->safe_psql('postgres',
@@ -577,11 +572,6 @@ $result =
 is( $result, qq(0
 1), 'inserts into tab4 replicated');
 
-$result =
-  $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4_1 ORDER BY 1");
-is($result, qq(), 'inserts into tab4_1 replicated');
-
-
 # update (replicated as update)
 $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5");
 $node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 6 WHERE a = 5");
diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl
index f5f8a67092..e0d388d37f 100644
--- a/src/test/subscription/t/028_row_filter.pl
+++ b/src/test/subscription/t/028_row_filter.pl
@@ -386,6 +386,10 @@ $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rowfilter_child(a, b) VALUES(0,'0'),(30,'30'),(40,'40')"
 );
 
+# insert data into partitioned table.
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_viaroot_part(a) VALUES(13), (17)");
+
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
 );
@@ -707,13 +711,18 @@ is($result, qq(t|1), 'check replicated rows to tab_rowfilter_toast');
 # the row filter for the top-level ancestor:
 #
 # tab_rowfilter_viaroot_part filter is: (a > 15)
+# - INSERT (13)        NO, 13 < 15
 # - INSERT (14)        NO, 14 < 15
 # - INSERT (15)        NO, 15 = 15
 # - INSERT (16)        YES, 16 > 15
+# - INSERT (17)        YES, 17 > 15
 $result =
   $node_subscriber->safe_psql('postgres',
-	"SELECT a FROM tab_rowfilter_viaroot_part");
-is($result, qq(16), 'check replicated rows to tab_rowfilter_viaroot_part');
+	"SELECT a FROM tab_rowfilter_viaroot_part ORDER BY 1");
+is($result, qq(16
+17),
+	'check replicated rows to tab_rowfilter_viaroot_part'
+);
 
 # Check there is no data in tab_rowfilter_viaroot_part_1 because rows are
 # replicated via the top most parent table tab_rowfilter_viaroot_part
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index b6644556cf..9f88b9ef52 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -959,7 +959,8 @@ $node_publisher->safe_psql(
 	CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10);
 	CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20);
 
-	CREATE PUBLICATION pub_root_true FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+	CREATE PUBLICATION pub_root_true_1 FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+	CREATE PUBLICATION pub_root_true_2 FOR TABLE test_root_1 (a, b) WITH (publish_via_partition_root = true);
 
 	-- initial data
 	INSERT INTO test_root VALUES (1, 2, 3);
@@ -968,7 +969,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true_1, pub_root_true_2;
 ));
 
 $node_subscriber->wait_for_subscription_sync;
-- 
2.23.0.windows.1

