From edee4e94e98b0eabb3ba053a40ab1bed7ebecf0f Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Thu, 7 Apr 2022 11:03:32 +0800
Subject: [PATCH v1] Fix data replicated twice when specifying
 PUBLISH_VIA_PARTITION_ROOT option.

If there are two publications that publish the parent table and the child table
separately, and both specify the option PUBLISH_VIA_PARTITION_ROOT, when
subscribing to both publications using one subscription, the data is replicated
twice. What we expect is to be copied only once.

To fix this, we get table informations by two steps. The first is to get the
oid list of the table, and then do filtering. Second is to get the required
table informations according to the filtered oid list.
---
 src/backend/commands/subscriptioncmds.c | 235 ++++++++++++++++++++++--
 src/test/subscription/t/100_bugs.pl     |  69 +++++++
 2 files changed, 284 insertions(+), 20 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 51505373ea..52dc250cc3 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1756,6 +1756,77 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
 	table_close(rel, RowExclusiveLock);
 }
 
+/*
+ * Filters based on the option viaroot for tables with the same topmost root
+ * table on the publisher connection.
+ */
+static List *
+filter_partitions_oids(WalReceiverConn *wrconn, List **partition_oids,
+					   bool pubviaroot)
+{
+	WalRcvExecResult	*res;
+	StringInfoData		 cmd;
+	Oid					 tableRow[1] = {OIDOID};
+	TupleTableSlot		*slot;
+	ListCell			*lc;
+	List				*filter_list = NIL;
+
+	/* If no viaroot is specified, no filtering is required. */
+	if (!pubviaroot)
+		return *partition_oids;
+
+	/*
+	 * For each table in partition_oids, get all parent tables on the publisher
+	 * connection to see if any parent table is in partition_oids.
+	 * If any, the current table will be filtered out.
+	 */
+	foreach(lc, *partition_oids)
+	{
+		bool	skip = false;
+		Oid		relid = lfirst_oid(lc);
+
+		initStringInfo(&cmd);
+		appendStringInfo(&cmd,
+							   "SELECT pg_partition_ancestors( %u )::oid",
+							   relid);
+
+		res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
+		pfree(cmd.data);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("could not receive list of parent tables from the publisher: %s",
+							res->err)));
+
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		{
+			bool	isnull;
+			Oid		parentid;
+
+			parentid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
+			Assert(!isnull);
+
+			/* Check if the parent table exists in the published table list. */
+			if (parentid != relid &&
+				list_member_oid(*partition_oids, parentid))
+			{
+				skip = true;
+				break;
+			}
+		}
+		ExecClearTuple(slot);
+		ExecDropSingleTupleTableSlot(slot);
+		walrcv_clear_result(res);
+
+		if (!skip)
+			filter_list = lappend_oid(filter_list, relid);
+	}
+
+	return filter_list;
+}
+
 /*
  * Get the list of tables which belong to specified publications on the
  * publisher connection.
@@ -1764,19 +1835,41 @@ static List *
 fetch_table_list(WalReceiverConn *wrconn, List *publications)
 {
 	WalRcvExecResult *res;
-	StringInfoData cmd;
+	StringInfoData cmd,
+				   pub_names;
 	TupleTableSlot *slot;
-	Oid			tableRow[2] = {TEXTOID, TEXTOID};
-	List	   *tablelist = NIL;
+	Oid			 oidRow[4] = {OIDOID, BOOLOID, OIDOID, INT2OID},
+				 tableRow[2] = {TEXTOID, TEXTOID},
+				 root_of_partition = InvalidOid;
+	List		*result_oids = NIL,
+				*partition_oids = NIL,
+				*tablelist = NIL;
+	bool		 is_pubviaroot = false,
+				 first;
+	ListCell	*lc;
+
+	initStringInfo(&pub_names);
+	get_publications_str(publications, &pub_names, true);
 
 	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
-						   "  FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
-	get_publications_str(publications, &cmd, true);
-	appendStringInfoChar(&cmd, ')');
-
-	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+	/*
+	 * Use the 'ORDER BY' clause here just to get all the tables with the same
+	 * topmost root table together, and then by the number of layers from high
+	 * to low. This allows us to filter while looping through the results of
+	 * this query.
+	 */
+	appendStringInfo(&cmd,
+						   "SELECT DISTINCT c.oid, p.pubviaroot, proot::oid,"
+						   "   (select count(*) from pg_partition_ancestors(c.oid)) as level"
+						   " FROM pg_catalog.pg_publication_tables t"
+						   "      JOIN pg_catalog.pg_publication p ON p.pubname = t.pubname"
+						   "      JOIN pg_catalog.pg_class c ON c.relname = t.tablename,"
+						   "      LATERAL pg_partition_root(c.oid) proot"
+						   " WHERE t.pubname IN (%s)"
+						   " ORDER BY proot, level DESC",
+						   pub_names.data);
+
+	res = walrcv_exec(wrconn, cmd.data, 4, oidRow);
 	pfree(cmd.data);
 
 	if (res->status != WALRCV_OK_TUPLES)
@@ -1785,29 +1878,131 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 				 errmsg("could not receive list of replicated tables from the publisher: %s",
 						res->err)));
 
-	/* Process tables. */
+	/* First, We get the list of relid of required table. */
 	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
-		char	   *nspname;
-		char	   *relname;
-		bool		isnull;
-		RangeVar   *rv;
+		Oid			relid,
+					rootrelid;
+		bool		pubviaroot,
+					isnull;
 
-		nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		relid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
 		Assert(!isnull);
-		relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		pubviaroot = DatumGetBool(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
+		rootrelid = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
 
-		rv = makeRangeVar(nspname, relname, -1);
-		tablelist = lappend(tablelist, rv);
+		/*
+		 * If rootrelid is NULL, it means this table is not a partition
+		 * table. Just add this table to result_oids.
+		 */
+		if (!rootrelid)
+		{
+			result_oids = lappend_oid(result_oids, relid);
+			continue;
+		}
+
+		/*
+		 * Tables related to the previous topmost root table have been
+		 * traversed, then filtered base on the option viaroot.
+		 */
+		if (OidIsValid(root_of_partition) && root_of_partition != rootrelid)
+		{
+			List	*result_filter = NIL;
+
+			result_filter = filter_partitions_oids(wrconn, &partition_oids,
+												   is_pubviaroot);
+
+			result_oids = list_concat_unique_oid(result_oids,
+												 result_filter);
+
+			is_pubviaroot = false;
+			root_of_partition = InvalidOid;
+			list_free(partition_oids);
+			partition_oids = NIL;
+		}
+
+		root_of_partition = rootrelid;
+
+		if (pubviaroot)
+			is_pubviaroot = true;
+
+		partition_oids = lappend_oid(partition_oids, relid);
 
 		ExecClearTuple(slot);
 	}
 	ExecDropSingleTupleTableSlot(slot);
-
 	walrcv_clear_result(res);
 
+	if (partition_oids != NIL)
+	{
+		List	*result_filter = NIL;
+
+		result_filter = filter_partitions_oids(wrconn, &partition_oids,
+											   is_pubviaroot);
+
+		result_oids = list_concat_unique_oid(result_oids,
+											 result_filter);
+	}
+
+
+	/* Then, we get the info of required tables in result_oids if any. */
+	if (list_length(result_oids) != 0)
+	{
+		initStringInfo(&cmd);
+		appendStringInfoString(&cmd,
+							   "SELECT DISTINCT n.nspname, c.relname"
+							   " FROM pg_class c JOIN pg_namespace n"
+							   "      ON (N.oid = C.relnamespace)"
+							   " WHERE C.oid IN (");
+
+		first = true;
+		foreach(lc, result_oids)
+		{
+			Oid	relid = lfirst_oid(lc);
+
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&cmd, ", ");
+
+			appendStringInfo(&cmd, "%u", relid);
+		}
+		appendStringInfoChar(&cmd, ')');
+
+		res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+		pfree(cmd.data);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("could not receive list of replicated tables from the publisher: %s",
+							res->err)));
+
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		{
+			char	   *nspname;
+			char	   *relname;
+			bool		isnull;
+			RangeVar   *rv;
+
+			nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+			Assert(!isnull);
+			relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+			Assert(!isnull);
+
+			rv = makeRangeVar(nspname, relname, -1);
+			tablelist = lappend(tablelist, rv);
+
+			ExecClearTuple(slot);
+		}
+
+		ExecDropSingleTupleTableSlot(slot);
+		walrcv_clear_result(res);
+	}
+
 	return tablelist;
 }
 
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index 11ba473715..5b11e06f02 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -307,4 +307,73 @@ is( $node_subscriber->safe_psql(
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
 
+# https://www.postgresql.org/message-id/OS0PR01MB5716DC2982CC735FDE388804940B9%40OS0PR01MB5716.jpnprd01.prod.outlook.com
+
+# The bug was that if there are two publications that publish the parent table
+# and the child table separately, and both specify the option
+# PUBLISH_VIA_PARTITION_ROOT, when subscribing to both publications with one
+# subscription, the data is replicated twice. What we expect is to be copied
+# only once.
+$node_publisher = PostgreSQL::Test::Cluster->new('publisher4');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+$node_subscriber = PostgreSQL::Test::Cluster->new('subscriber4');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE viaroot_partition(a int) PARTITION BY RANGE(a)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE viaroot_partitioned(LIKE viaroot_partition)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE viaroot_partition ATTACH PARTITION viaroot_partitioned DEFAULT"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE viaroot_partition(a int) PARTITION BY RANGE(a)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE viaroot_partitioned(LIKE viaroot_partition)"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE viaroot_partition ATTACH PARTITION viaroot_partitioned DEFAULT"
+);
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO viaroot_partition (a) VALUES (1), (2), (3)"
+);
+
+# create pub/sub
+$publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub_viaroot1 FOR TABLE viaroot_partition WITH (PUBLISH_VIA_PARTITION_ROOT=TRUE)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub_viaroot2 FOR TABLE viaroot_partitioned WITH (PUBLISH_VIA_PARTITION_ROOT=TRUE)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub_viaroot CONNECTION '$publisher_connstr' PUBLICATION pub_viaroot1, pub_viaroot2"
+);
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+
+# Also wait for initial table sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check expected replicated result.
+is( $node_subscriber->safe_psql(
+		'postgres', "SELECT * FROM viaroot_partition"),
+	qq(1
+2
+3),
+	"check initial data copy from table viaroot_partition");
+
+$node_publisher->stop('fast');
+$node_subscriber->stop('fast');
+
 done_testing();
-- 
2.23.0.windows.1

