From 634247cff61908b6c63c29b242bdaef5ce28767b Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Tue, 14 Jun 2022 13:44:09 +0800
Subject: [PATCH v7] Fix data replicated twice when specifying
 publish_via_partition_root option.

If there are two publications that publish the parent table and the child table
separately, and both specify the option publish_via_partition_root, subscribing
to both publications from one subscription causes initial copy twice. It should
only be copied once.

To fix this, the API function pg_get_publication_tables has been extended to
take a publication list. Now, when getting the table information, if the
publish_via_partition_root is true, the function can exclude a partition table
whose ancestor is also published by the same publication list.
---
 src/backend/catalog/pg_publication.c       | 133 ++++++++++++++-------
 src/backend/commands/subscriptioncmds.c    |  65 ++++++++--
 src/include/catalog/pg_proc.dat            |  12 +-
 src/test/regress/expected/rules.out        |   2 +-
 src/test/subscription/t/013_partition.pl   |  19 +--
 src/test/subscription/t/028_row_filter.pl  |  13 +-
 src/test/subscription/t/031_column_list.pl |   5 +-
 7 files changed, 175 insertions(+), 74 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index ade3bf3aca..44e95d7c1f 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1077,22 +1077,28 @@ get_publication_name(Oid pubid, bool missing_ok)
 }
 
 /*
- * Returns information of tables in a publication.
+ * Returns information of the tables in the given publication array.
  */
 Datum
 pg_get_publication_tables(PG_FUNCTION_ARGS)
 {
 #define NUM_PUBLICATION_TABLES_ELEM	3
-	FuncCallContext *funcctx;
-	char	   *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
-	Publication *publication;
-	List	   *tables;
+	FuncCallContext	*funcctx;
+	Publication		*publication;
+	List			*tables = NIL,
+					*results = NIL;
 
 	/* stuff done only on the first call of the function */
 	if (SRF_IS_FIRSTCALL())
 	{
 		TupleDesc	tupdesc;
 		MemoryContext oldcontext;
+		ArrayType  *arr;
+		Datum	   *elems;
+		int			nelems,
+					i;
+		bool		viaroot = false;
+		ListCell   *lc;
 
 		/* create a function context for cross-call persistence */
 		funcctx = SRF_FIRSTCALL_INIT();
@@ -1100,42 +1106,87 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		/* switch to memory context appropriate for multiple function calls */
 		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
-		publication = GetPublicationByName(pubname, false);
+		/* Deconstruct the parameter into elements. */
+		arr = PG_GETARG_ARRAYTYPE_P(0);
+		deconstruct_array(arr, TEXTOID, -1, false, TYPALIGN_INT,
+						  &elems, NULL, &nelems);
 
-		/*
-		 * Publications support partitioned tables, although all changes are
-		 * replicated using leaf partition identity and schema, so we only
-		 * need those.
-		 */
-		if (publication->alltables)
+		/* Get Oids of tables from each publication. */
+		for (i = 0; i < nelems; i++)
 		{
-			tables = GetAllTablesPublicationRelations(publication->pubviaroot);
-		}
-		else
-		{
-			List	   *relids,
-					   *schemarelids;
-
-			relids = GetPublicationRelations(publication->oid,
-											 publication->pubviaroot ?
-											 PUBLICATION_PART_ROOT :
-											 PUBLICATION_PART_LEAF);
-			schemarelids = GetAllSchemaPublicationRelations(publication->oid,
-															publication->pubviaroot ?
-															PUBLICATION_PART_ROOT :
-															PUBLICATION_PART_LEAF);
-			tables = list_concat_unique_oid(relids, schemarelids);
+			List		*current_tables = NIL;
+
+			publication = GetPublicationByName(TextDatumGetCString(elems[i]), false);
 
 			/*
-			 * If the publication publishes partition changes via their
-			 * respective root partitioned tables, we must exclude partitions
-			 * in favor of including the root partitioned tables. Otherwise,
-			 * the function could return both the child and parent tables
-			 * which could cause data of the child table to be
-			 * double-published on the subscriber side.
+			 * Publications support partitioned tables. If
+			 * publish_via_partition_root is false, all changes are replicated
+			 * using leaf partition identity and schema, so we only need those.
+			 * Otherwise, If publish_via_partition_root is true, get the
+			 * partitioned table itself.
 			 */
+			if (publication->alltables)
+				current_tables = GetAllTablesPublicationRelations(publication->pubviaroot);
+			else
+			{
+				List	   *relids,
+						   *schemarelids;
+
+				relids = GetPublicationRelations(publication->oid,
+												 publication->pubviaroot ?
+												 PUBLICATION_PART_ROOT :
+												 PUBLICATION_PART_LEAF);
+				schemarelids = GetAllSchemaPublicationRelations(publication->oid,
+																publication->pubviaroot ?
+																PUBLICATION_PART_ROOT :
+																PUBLICATION_PART_LEAF);
+				current_tables = list_concat(relids, schemarelids);
+			}
+
+			/*
+			 * Record the published table and the corresponding publication so
+			 * that we can get row filters and column list later.
+			 */
+			foreach(lc, current_tables)
+			{
+				Oid			*result = (Oid *) malloc(sizeof(Oid) * 2);
+
+				result[0] = lfirst_oid(lc);
+				result[1] = publication->oid;
+				results = lappend(results, result);
+			}
+
+			tables = list_concat(tables, current_tables);
+
+			/* At least one publication is using publish_via_partition_root. */
 			if (publication->pubviaroot)
-				tables = filter_partitions(tables);
+				viaroot = true;
+		}
+
+		pfree(elems);
+
+		/* Now sort and de-duplicate the result list */
+		list_sort(tables, list_oid_cmp);
+		list_deduplicate_oid(tables);
+
+		/*
+		 * If the publication publishes partition changes via their respective
+		 * root partitioned tables, we must exclude partitions in favor of
+		 * including the root partitioned tables. Otherwise, the function
+		 * could return both the child and parent tables which could cause
+		 * data of the child table to be double-published on the subscriber
+		 * side.
+		 */
+		if (viaroot)
+			tables = filter_partitions(tables);
+
+		/* Filter by final published table. */
+		foreach(lc, results)
+		{
+			Oid *table_info = (Oid *) lfirst(lc);
+
+			if (!list_member_oid(tables, table_info[0]))
+				results = foreach_delete_current(results, lc);
 		}
 
 		/* Construct a tuple descriptor for the result rows. */
@@ -1148,20 +1199,22 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 						   PG_NODE_TREEOID, -1, 0);
 
 		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
-		funcctx->user_fctx = (void *) tables;
+		funcctx->user_fctx = (void *) results;
 
 		MemoryContextSwitchTo(oldcontext);
 	}
 
 	/* stuff done on every call of the function */
 	funcctx = SRF_PERCALL_SETUP();
-	tables = (List *) funcctx->user_fctx;
+	results = (List *) funcctx->user_fctx;
 
-	if (funcctx->call_cntr < list_length(tables))
+	if (funcctx->call_cntr < list_length(results))
 	{
 		HeapTuple	pubtuple = NULL;
 		HeapTuple	rettuple;
-		Oid			relid = list_nth_oid(tables, funcctx->call_cntr);
+		Oid		   *table_info = (Oid *) list_nth(results, funcctx->call_cntr);
+		Oid			relid = table_info[0],
+					pubid = table_info[1];
 		Datum		values[NUM_PUBLICATION_TABLES_ELEM] = {0};
 		bool		nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
 
@@ -1169,7 +1222,7 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		 * Form tuple with appropriate data.
 		 */
 
-		publication = GetPublicationByName(pubname, false);
+		publication = GetPublication(pubid);
 
 		values[0] = ObjectIdGetDatum(relid);
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index bd0cc0848d..cca862adb6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1794,25 +1794,70 @@ static List *
 fetch_table_list(WalReceiverConn *wrconn, List *publications)
 {
 	WalRcvExecResult *res;
-	StringInfoData cmd;
+	StringInfoData cmd,
+				pub_names;
 	TupleTableSlot *slot;
-	Oid			tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID};
+	Oid			tableRow[3] = {TEXTOID, TEXTOID, INT2VECTOROID};
 	List	   *tablelist = NIL;
-	bool		check_columnlist = (walrcv_server_version(wrconn) >= 150000);
+	int			server_version = walrcv_server_version(wrconn);
+	bool		check_columnlist = (server_version >= 150000);
+
+	initStringInfo(&pub_names);
+	get_publications_str(publications, &pub_names, true);
 
 	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n");
 
 	/* Get column lists for each relation if the publisher supports it */
-	if (check_columnlist)
-		appendStringInfoString(&cmd, ", t.attnames\n");
+	if (server_version >= 160000)
+		appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname,\n"
+						 "              ( CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)\n"
+						 "                     THEN NULL ELSE gpt.attrs END\n"
+						 "              ) AS attnames\n"
+						 " FROM pg_class c\n"
+						 "   JOIN pg_namespace n ON n.oid = c.relnamespace\n"
+						 "   JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
+						 "          FROM pg_publication\n"
+						 "          WHERE pubname IN ( %s )) as gpt\n"
+						 "       ON gpt.relid = c.oid\n",
+						 pub_names.data);
+	else
+	{
+		/*
+		 * Get the list of tables from publisher, the partition table whose
+		 * ancestor is also in this list should be ignored, otherwise the
+		 * initial data in the partition table would be replicated twice.
+		 */
 
-	appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
-	get_publications_str(publications, &cmd, true);
-	appendStringInfoChar(&cmd, ')');
+		appendStringInfoString(&cmd, "WITH pub_tabs AS(\n"
+							   " SELECT DISTINCT N.nspname, C.oid, C.relname, C.relispartition\n");
+
+		/* Get column lists for each relation if the publisher supports it */
+		if (check_columnlist)
+			appendStringInfoString(&cmd, ",( CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)\n"
+								   "              THEN NULL ELSE gpt.attrs END\n"
+								   "       ) AS attnames\n");
+
+		appendStringInfo(&cmd, " FROM pg_publication P,\n"
+						 "      LATERAL pg_get_publication_tables(P.pubname) GPT,\n"
+						 "      pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+						 "  WHERE C.oid = GPT.relid AND P.pubname IN ( %s )\n"
+						 ")\n"
+						 "SELECT DISTINCT pub_tabs.nspname, pub_tabs.relname\n",
+						 pub_names.data);
+
+		/* Get column lists for each relation if the publisher supports it */
+		if (check_columnlist)
+			appendStringInfoString(&cmd, ", pub_tabs.attnames\n");
+
+		appendStringInfoString(&cmd, "FROM pub_tabs\n"
+							   " WHERE (pub_tabs.relispartition IS FALSE\n"
+							   "  OR NOT EXISTS (SELECT 1 FROM pg_partition_ancestors(pub_tabs.oid) as pa\n"
+							   "                  WHERE pa.relid IN (SELECT pub_tabs.oid FROM pub_tabs)\n"
+							   "                   AND pa.relid != pub_tabs.oid))\n");
+	}
 
 	res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 3 : 2, tableRow);
+	pfree(pub_names.data);
 	pfree(cmd.data);
 
 	if (res->status != WALRCV_OK_TUPLES)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 2e41f4d9e8..8275ba9d4e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11673,11 +11673,13 @@
   prosrc => 'pg_show_replication_origin_status' },
 
 # publications
-{ oid => '6119', descr => 'get information of tables in a publication',
-  proname => 'pg_get_publication_tables', prorows => '1000', proretset => 't',
-  provolatile => 's', prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,oid,int2vector,pg_node_tree}',
-  proargmodes => '{i,o,o,o}', proargnames => '{pubname,relid,attrs,qual}',
+{ oid => '6119',
+  descr => 'get information of tables in one or more publications',
+  proname => 'pg_get_publication_tables', prorows => '1000',
+  provariadic => 'text', proretset => 't', provolatile => 's',
+  prorettype => 'record', proargtypes => '_text',
+  proallargtypes => '{_text,oid,int2vector,pg_node_tree}',
+  proargmodes => '{v,o,o,o}', proargnames => '{pubname,relid,attrs,qual}',
   prosrc => 'pg_get_publication_tables' },
 { oid => '6121',
   descr => 'returns whether a relation can be part of a publication',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 7ec3d2688f..54f5ce5da9 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1449,7 +1449,7 @@ pg_publication_tables| SELECT p.pubname,
              JOIN pg_attribute a ON (((a.attrelid = gpt.relid) AND (a.attnum = k.k))))) AS attnames,
     pg_get_expr(gpt.qual, gpt.relid) AS rowfilter
    FROM pg_publication p,
-    LATERAL pg_get_publication_tables((p.pubname)::text) gpt(relid, attrs, qual),
+    LATERAL pg_get_publication_tables(VARIADIC ARRAY[(p.pubname)::text]) gpt(relid, attrs, qual),
     (pg_class c
      JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
   WHERE (c.oid = gpt.relid);
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 0dfbbabc3b..008cea6181 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -477,12 +477,13 @@ $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)"
 );
 
-# Note: We create two separate tables, not a partitioned one, so that we can
-# easily identity through which relation were the changes replicated.
+# Note: We only create one table for the partitioned table (tab4) here. Because
+# we specify option "publish_via_partition_root" (see pub_all and
+# pub_lower_level above), all data should be replicated to the partitioned
+# table. So we do not need to create table for the partition table.
 $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4 (a int PRIMARY KEY)");
-$node_subscriber2->safe_psql('postgres',
-	"CREATE TABLE tab4_1 (a int PRIMARY KEY)");
+
 # Publication that sub2 points to now publishes via root, so must update
 # subscription target relations. We set the list of publications so that
 # the FOR ALL TABLES publication is second (the list order matters).
@@ -556,11 +557,6 @@ $result =
   $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4 ORDER BY 1");
 is($result, qq(0), 'inserts into tab4 replicated');
 
-$result =
-  $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4_1 ORDER BY 1");
-is($result, qq(), 'inserts into tab4_1 replicated');
-
-
 # now switch the order of publications in the list, try again, the result
 # should be the same (no dependence on order of pulications)
 $node_subscriber2->safe_psql('postgres',
@@ -584,11 +580,6 @@ $result =
 is( $result, qq(0
 1), 'inserts into tab4 replicated');
 
-$result =
-  $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4_1 ORDER BY 1");
-is($result, qq(), 'inserts into tab4_1 replicated');
-
-
 # update (replicated as update)
 $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5");
 $node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 6 WHERE a = 5");
diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl
index b1fb2d7cae..043b5031fc 100644
--- a/src/test/subscription/t/028_row_filter.pl
+++ b/src/test/subscription/t/028_row_filter.pl
@@ -393,6 +393,10 @@ $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rowfilter_child(a, b) VALUES(0,'0'),(30,'30'),(40,'40')"
 );
 
+# insert data into partitioned table.
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_viaroot_part(a) VALUES(13), (17)");
+
 $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
 );
@@ -718,13 +722,18 @@ is($result, qq(t|1), 'check replicated rows to tab_rowfilter_toast');
 # the row filter for the top-level ancestor:
 #
 # tab_rowfilter_viaroot_part filter is: (a > 15)
+# - INSERT (13)        NO, 13 < 15
 # - INSERT (14)        NO, 14 < 15
 # - INSERT (15)        NO, 15 = 15
 # - INSERT (16)        YES, 16 > 15
+# - INSERT (17)        YES, 17 > 15
 $result =
   $node_subscriber->safe_psql('postgres',
-	"SELECT a FROM tab_rowfilter_viaroot_part");
-is($result, qq(16), 'check replicated rows to tab_rowfilter_viaroot_part');
+	"SELECT a FROM tab_rowfilter_viaroot_part ORDER BY 1");
+is($result, qq(16
+17),
+	'check replicated rows to tab_rowfilter_viaroot_part'
+);
 
 # Check there is no data in tab_rowfilter_viaroot_part_1 because rows are
 # replicated via the top most parent table tab_rowfilter_viaroot_part
diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl
index 9fa6e0b35f..3d07b456bc 100644
--- a/src/test/subscription/t/031_column_list.pl
+++ b/src/test/subscription/t/031_column_list.pl
@@ -973,7 +973,8 @@ $node_publisher->safe_psql(
 	CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10);
 	CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20);
 
-	CREATE PUBLICATION pub_root_true FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+	CREATE PUBLICATION pub_root_true_1 FOR TABLE test_root (a) WITH (publish_via_partition_root = true);
+	CREATE PUBLICATION pub_root_true_2 FOR TABLE test_root_1 (a, b) WITH (publish_via_partition_root = true);
 
 	-- initial data
 	INSERT INTO test_root VALUES (1, 2, 3);
@@ -982,7 +983,7 @@ $node_publisher->safe_psql(
 
 $node_subscriber->safe_psql(
 	'postgres', qq(
-	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true;
+	CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true_1, pub_root_true_2;
 ));
 
 wait_for_subscription_sync($node_subscriber);
-- 
2.23.0.windows.1

