On 1/10/23 11:36, Jacob Champion wrote:
> 1) I'm playing around with a marker in pg_inherits, where the inhseqno
> is set to a sentinel value (0) for an inheritance relationship that
> has been marked for logical publication. The intent is that the
> pg_inherits helpers will prevent further inheritance relationships
> when they see that marker, and reusing inhseqno means we can make use
> of the existing index to do the lookups. An example:
> 
>     =# CREATE TABLE root (a int);
>     =# CREATE TABLE root_p1 () INHERITS (root);
>     =# SELECT pg_set_logical_root('root_p1', 'root');
> 
> and then any data written to root_p1 gets replicated via root instead,
> if publish_via_partition_root = true. If root_p1 is set up with extra
> columns, they'll be omitted from replication.

First draft attached. (Due to some indentation changes, it's easiest to
read with --ignore-all-space.)

The overall strategy is:
- introduce pg_set_logical_root, which sets the sentinel in pg_inherits,
- swap out any checks for partition parents with checks for logical
  parents in the publishing code, and
- introduce the ability for a subscriber to perform an initial table
  sync from multiple tables on the publisher.
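
To make the sentinel lookup concrete outside the backend, here is a toy,
standalone model of it. This is only a sketch and not the code in the patch:
ToyInherits, toy_logical_parent and toy_logical_ancestors are made-up names,
and a plain array stands in for the pg_inherits index scan. The only thing it
borrows from the patch is the rule that inhseqno = 0 marks a logical parent.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-in for a pg_inherits row; not the real catalog struct. */
typedef struct ToyInherits
{
    unsigned    inhrelid;   /* child table */
    unsigned    inhparent;  /* parent table */
    int         inhseqno;   /* 0 = link marked for logical publication */
} ToyInherits;

#define TOY_INVALID_OID 0u

/* Return the logical parent of relid, or TOY_INVALID_OID if it has none. */
unsigned
toy_logical_parent(const ToyInherits *rows, size_t nrows, unsigned relid)
{
    assert(rows != NULL || nrows == 0);

    for (size_t i = 0; i < nrows; i++)
    {
        /* Only links carrying the sentinel count as logical parents. */
        if (rows[i].inhrelid == relid && rows[i].inhseqno == 0)
            return rows[i].inhparent;
    }
    return TOY_INVALID_OID;
}

/*
 * Walk up the sentinel-marked links, storing ancestors nearest-first in
 * out[]. Returns the number of ancestors stored (at most max_out, which
 * also bounds the walk if the toy data contains a cycle).
 */
size_t
toy_logical_ancestors(const ToyInherits *rows, size_t nrows, unsigned relid,
                      unsigned *out, size_t max_out)
{
    size_t      n = 0;
    unsigned    parent = toy_logical_parent(rows, nrows, relid);

    while (parent != TOY_INVALID_OID && n < max_out)
    {
        out[n++] = parent;
        parent = toy_logical_parent(rows, nrows, parent);
    }
    return n;
}
```

As in the patch, the last entry of the ancestor list is the topmost logical
root, which is the table the publisher would publish through when
publish_via_partition_root is true.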

> 2) While this strategy works well for ongoing replication, it's not
> enough to get the initial synchronization correct. The subscriber
> still does a COPY of the root table directly, missing out on all the
> logical descendant data. The publisher will have to tell the
> subscriber about the relationship somehow, and older subscriber
> versions won't understand how to use that (similar to how old
> subscribers can't correctly handle row filters).

I partially solved this by having the subscriber pull the logical
hierarchy from the publisher to figure out which tables to COPY. This
works when publish_via_partition_root=true, but it doesn't correctly
return to the previous behavior when the setting is false. I need to
check the publication setting from the subscriber, too, but that opens
up the question of what to do if two different publications conflict.
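
As a rough illustration of the behavior I'm aiming for, here is a standalone
sketch of the "which tables do I COPY" decision. It is a toy model, not the
tablesync code: ToyInherits and toy_tables_to_copy are hypothetical names, an
array stands in for the publisher's pg_inherits, and the via_root flag assumes
the subscriber already knows a single, unambiguous pubviaroot setting (which
is exactly the part that isn't solved yet).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for a pg_inherits row; not the real catalog struct. */
typedef struct ToyInherits
{
    unsigned    inhrelid;   /* child table */
    unsigned    inhparent;  /* parent table */
    int         inhseqno;   /* 0 = link marked for logical publication */
} ToyInherits;

/*
 * Fill out[] with the tables a subscriber would COPY for "root".
 * With via_root set, that is root plus every table reachable through
 * sentinel-marked (inhseqno == 0) links; otherwise it is just root.
 * Returns the number of entries written (at most max_out).
 */
size_t
toy_tables_to_copy(const ToyInherits *rows, size_t nrows, unsigned root,
                   bool via_root, unsigned *out, size_t max_out)
{
    size_t      n = 0;

    assert(out != NULL && max_out > 0);

    out[n++] = root;
    if (!via_root)
        return n;           /* previous behavior: copy only the table itself */

    /* out[] doubles as the work queue: visit each entry's marked children. */
    for (size_t next = 0; next < n; next++)
    {
        for (size_t i = 0; i < nrows && n < max_out; i++)
        {
            if (rows[i].inhparent == out[next] && rows[i].inhseqno == 0)
                out[n++] = rows[i].inhrelid;
        }
    }
    return n;
}
```

With via_root true this collects the same set as the recursive query the
patch sends to the publisher; with via_root false it returns only the root,
which is what the subscriber did before.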

And while I go down that rabbit hole, I wanted to see if anyone thinks
this whole thing is unacceptable. :D

Thanks,
--Jacob
From 379bf99ea022203d428a4027da753a00a3989c04 Mon Sep 17 00:00:00 2001
From: Jacob Champion <jchamp...@timescale.com>
Date: Mon, 26 Sep 2022 13:23:51 -0700
Subject: [PATCH] WIP: introduce pg_set_logical_root for use with pubviaroot

Allows regular inherited tables to be published via their root table,
just like partitions. This works by hijacking pg_inherits' inhseqno
column, and replacing a (single) existing entry for the child with the
value zero, indicating that it should be treated as a logical partition
by the publication machinery.

(For this to work correctly at the moment, the publication must set
publish_via_partition_root=true. See bugs below.)

Initial sync works by pulling in all logical descendants on the
subscriber, then COPYing them one-by-one into the root. The publisher
reuses the existing pubviaroot logic, adding the new logical roots to
code that previously looked only for partition roots.

Known bugs/TODOs:
- When pubviaroot is false, initial sync doesn't work correctly (it
  assumes pubviaroot is on and COPYs all descendants into the root).
- The pg_inherits machinery doesn't prohibit changes to inheritance
  after an entry has been marked as a logical root.
- I haven't given any thought to interactions with row filters, or to
  column lists, or to multiple publications with conflicting pubviaroot
  settings.
- pg_set_logical_root() doesn't check for table ownership yet. Anyone
  can muck with pg_inherits through it.
- I'm not sure that I'm taking all the necessary locks yet, and those I
  do take may be taken in the wrong order.
---
 src/backend/catalog/pg_inherits.c           | 154 ++++++++++++++
 src/backend/catalog/pg_publication.c        |  30 +--
 src/backend/commands/publicationcmds.c      |  10 +
 src/backend/replication/logical/tablesync.c | 221 +++++++++++++-------
 src/backend/replication/pgoutput/pgoutput.c |  54 ++---
 src/include/catalog/pg_inherits.h           |   2 +
 src/include/catalog/pg_proc.dat             |   5 +
 src/test/regress/expected/publication.out   |  32 +++
 src/test/regress/sql/publication.sql        |  25 +++
 src/test/subscription/t/013_partition.pl    | 186 ++++++++++++++++
 10 files changed, 608 insertions(+), 111 deletions(-)

diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c
index da969bd2f9..c290a9936a 100644
--- a/src/backend/catalog/pg_inherits.c
+++ b/src/backend/catalog/pg_inherits.c
@@ -24,10 +24,13 @@
 #include "access/table.h"
 #include "catalog/indexing.h"
 #include "catalog/pg_inherits.h"
+#include "catalog/partition.h"
 #include "parser/parse_type.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
+#include "utils/fmgrprotos.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
@@ -655,3 +658,154 @@ PartitionHasPendingDetach(Oid partoid)
 	elog(ERROR, "relation %u is not a partition", partoid);
 	return false;				/* keep compiler quiet */
 }
+
+static Oid
+get_logical_parent_worker(Relation inhRel, Oid relid)
+{
+	SysScanDesc scan;
+	ScanKeyData key[2];
+	Oid			result = InvalidOid;
+	HeapTuple	tuple;
+
+	ScanKeyInit(&key[0],
+				Anum_pg_inherits_inhrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(relid));
+	ScanKeyInit(&key[1],
+				Anum_pg_inherits_inhseqno,
+				BTEqualStrategyNumber, F_INT4EQ,
+				Int32GetDatum(0));
+
+	scan = systable_beginscan(inhRel, InheritsRelidSeqnoIndexId, true,
+							  NULL, 2, key);
+	tuple = systable_getnext(scan);
+	if (HeapTupleIsValid(tuple))
+	{
+		Form_pg_inherits form = (Form_pg_inherits) GETSTRUCT(tuple);
+		result = form->inhparent;
+	}
+
+	systable_endscan(scan);
+
+	return result;
+}
+
+static void
+get_logical_ancestors_worker(Relation inhRel, Oid relid, List **ancestors)
+{
+	Oid			parentOid;
+
+	/*
+	 * Recursion ends at the topmost level, i.e., when there's no parent.
+	 */
+	parentOid = get_logical_parent_worker(inhRel, relid);
+	if (parentOid == InvalidOid)
+		return;
+
+	*ancestors = lappend_oid(*ancestors, parentOid);
+	get_logical_ancestors_worker(inhRel, parentOid, ancestors);
+}
+
+List *
+get_logical_ancestors(Oid relid, bool is_partition)
+{
+	List	   *result = NIL;
+	Relation	inhRel;
+
+	/* For partitions, this is identical to get_partition_ancestors(). */
+	if (is_partition)
+		return get_partition_ancestors(relid);
+
+	inhRel = table_open(InheritsRelationId, AccessShareLock);
+	get_logical_ancestors_worker(inhRel, relid, &result);
+	table_close(inhRel, AccessShareLock);
+
+	return result;
+}
+
+bool
+has_logical_parent(Relation inhRel, Oid relid)
+{
+	return (get_logical_parent_worker(inhRel, relid) != InvalidOid);
+}
+
+Datum
+pg_set_logical_root(PG_FUNCTION_ARGS)
+{
+	Oid			tableoid = PG_GETARG_OID(0);
+	Oid			rootoid = PG_GETARG_OID(1);
+	char	   *tablename;
+	char	   *rootname;
+	Relation	inhRel;
+	ScanKeyData key;
+	SysScanDesc scan;
+	Oid			parent = InvalidOid;
+	HeapTuple	tuple, copyTuple;
+	Form_pg_inherits form;
+
+	/*
+	 * Check that the tables exist.
+	 * TODO: check inheritance
+	 * TODO: and identical schemas too? or does replication handle that?
+	 * TODO: check ownership
+	 */
+	tablename = get_rel_name(tableoid);
+	if (tablename == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_TABLE),
+				 errmsg("OID %u does not refer to a table", tableoid)));
+	rootname = get_rel_name(rootoid);
+	if (rootname == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_TABLE),
+				 errmsg("OID %u does not refer to a table", rootoid)));
+
+	/* Open pg_inherits with RowExclusiveLock so that we can update it. */
+	inhRel = table_open(InheritsRelationId, RowExclusiveLock);
+
+	/*
+	 * We have to make sure that the inheritance relationship already exists,
+	 * and that there is only one existing parent for this table.
+	 *
+	 * TODO: do we have to lock the tables themselves to avoid races?
+	 */
+	ScanKeyInit(&key,
+				Anum_pg_inherits_inhrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(tableoid));
+
+	scan = systable_beginscan(inhRel, InheritsRelidSeqnoIndexId, true,
+							  NULL, 1, &key);
+	tuple = systable_getnext(scan);
+	if (HeapTupleIsValid(tuple))
+	{
+		form = (Form_pg_inherits) GETSTRUCT(tuple);
+		parent = form->inhparent;
+		copyTuple = heap_copytuple(tuple);
+
+		if (HeapTupleIsValid(systable_getnext(scan)))
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("table \"%s\" inherits from multiple tables",
+							tablename)));
+	}
+
+	if (parent != rootoid)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("table \"%s\" does not inherit from intended root table \"%s\"",
+						tablename, rootname)));
+
+	systable_endscan(scan);
+
+	/* Mark the inheritance as a logical root by setting inhseqno to zero. */
+	form = (Form_pg_inherits) GETSTRUCT(copyTuple);
+	form->inhseqno = 0;
+
+	CatalogTupleUpdate(inhRel, &copyTuple->t_self, copyTuple);
+
+	heap_freetuple(copyTuple);
+	table_close(inhRel, RowExclusiveLock);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index a98fcad421..e1a02e18d9 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -172,11 +172,11 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
 }
 
 /*
- * Filter out the partitions whose parent tables were also specified in
+ * Filter out the tables whose logical parent tables were also specified in
  * the publication.
  */
 static List *
-filter_partitions(List *relids)
+filter_logical_descendants(List *relids)
 {
 	List	   *result = NIL;
 	ListCell   *lc;
@@ -188,8 +188,7 @@ filter_partitions(List *relids)
 		List	   *ancestors = NIL;
 		Oid			relid = lfirst_oid(lc);
 
-		if (get_rel_relispartition(relid))
-			ancestors = get_partition_ancestors(relid);
+		ancestors = get_logical_ancestors(relid, get_rel_relispartition(relid));
 
 		foreach(lc2, ancestors)
 		{
@@ -782,11 +781,15 @@ List *
 GetAllTablesPublicationRelations(bool pubviaroot)
 {
 	Relation	classRel;
+	Relation	inhRel;
 	ScanKeyData key[1];
 	TableScanDesc scan;
 	HeapTuple	tuple;
 	List	   *result = NIL;
 
+	/* TODO: is there a required order to acquire these locks? */
+	if (pubviaroot)
+		inhRel = table_open(InheritsRelationId, AccessShareLock);
 	classRel = table_open(RelationRelationId, AccessShareLock);
 
 	ScanKeyInit(&key[0],
@@ -802,7 +805,8 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 		Oid			relid = relForm->oid;
 
 		if (is_publishable_class(relid, relForm) &&
-			!(relForm->relispartition && pubviaroot))
+			!(relForm->relispartition && pubviaroot) &&
+			!(pubviaroot && has_logical_parent(inhRel, relid)))
 			result = lappend_oid(result, relid);
 	}
 
@@ -831,6 +835,9 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 	}
 
 	table_close(classRel, AccessShareLock);
+	if (pubviaroot)
+		table_close(inhRel, AccessShareLock);
+
 	return result;
 }
 
@@ -1076,15 +1083,14 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 			tables = list_concat_unique_oid(relids, schemarelids);
 
 			/*
-			 * If the publication publishes partition changes via their
-			 * respective root partitioned tables, we must exclude partitions
-			 * in favor of including the root partitioned tables. Otherwise,
-			 * the function could return both the child and parent tables
-			 * which could cause data of the child table to be
-			 * double-published on the subscriber side.
+			 * If the publication publishes table changes via their respective
+			 * logical root tables, we must exclude logical descendants in favor
+			 * of including the root tables. Otherwise, the function could
+			 * return both the child and parent tables which could cause data of
+			 * the child table to be double-published on the subscriber side.
 			 */
 			if (publication->pubviaroot)
-				tables = filter_partitions(tables);
+				tables = filter_logical_descendants(tables);
 		}
 
 		/* Construct a tuple descriptor for the result rows. */
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index f4ba572697..7e23bed6c0 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -238,6 +238,8 @@ contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
 		 * parent table, but the bitmap contains the replica identity
 		 * information of the child table. So, get the column number of the
 		 * child table as parent and child column order could be different.
+		 *
+		 * TODO: is this applicable to pg_set_logical_root()?
 		 */
 		if (context->pubviaroot)
 		{
@@ -286,6 +288,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
 	 *
 	 * Note that even though the row filter used is for an ancestor, the
 	 * REPLICA IDENTITY used will be for the actual child table.
+	 *
+	 * TODO: is this applicable to pg_set_logical_root()?
 	 */
 	if (pubviaroot && relation->rd_rel->relispartition)
 	{
@@ -336,6 +340,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
  * the column list.
  *
  * Returns true if any replica identity column is not covered by column list.
+ *
+ * TODO: pg_set_logical_root()?
  */
 bool
 pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
@@ -628,6 +634,8 @@ TransformPubWhereClauses(List *tables, const char *queryString,
 		 * If the publication doesn't publish changes via the root partitioned
 		 * table, the partition's row filter will be used. So disallow using
 		 * WHERE clause on partitioned table in this case.
+		 *
+		 * TODO: decide how this interacts with pg_set_logical_root
 		 */
 		if (!pubviaroot &&
 			pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
@@ -715,6 +723,8 @@ CheckPubRelationColumnList(char *pubname, List *tables,
 		 * If the publication doesn't publish changes via the root partitioned
 		 * table, the partition's column list will be used. So disallow using
 		 * a column list on the partitioned table in this case.
+		 *
+		 * TODO: decide if this interacts with pg_set_logical_root()
 		 */
 		if (!pubviaroot &&
 			pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 38dfce7129..b83b9e91ef 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -748,11 +748,12 @@ copy_read_data(void *outbuf, int minread, int maxread)
 /*
  * Get information about remote relation in similar fashion the RELATION
  * message provides during replication. This function also returns the relation
- * qualifications to be used in the COPY command.
+ * qualifications to be used in the COPY command, and the list of tables to COPY
+ * (which for most tables will contain only one entry).
  */
 static void
 fetch_remote_table_info(char *nspname, char *relname,
-						LogicalRepRelation *lrel, List **qual)
+						LogicalRepRelation *lrel, List **qual, List **to_copy)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
@@ -1063,6 +1064,73 @@ fetch_remote_table_info(char *nspname, char *relname,
 		walrcv_clear_result(res);
 	}
 
+	/*
+	 * See if there are any other tables to be copied besides the original. This
+	 * happens when a descendant in the inheritance relationship is marked with
+	 * pg_set_logical_root().
+	 */
+	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000)
+	{
+		Oid			descRow[] = {TEXTOID, TEXTOID};
+
+		/*
+		 * Find all logical descendants rooted at this table (including the
+		 * table itself).
+		 */
+		resetStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "WITH RECURSIVE descendants(relid) AS ("
+						 "  VALUES (%u::oid) UNION ALL"
+						 "  SELECT inhrelid"
+						 "    FROM pg_catalog.pg_inherits i, descendants d"
+						 "   WHERE i.inhseqno = 0"
+						 "     AND d.relid = i.inhparent"
+						 ")"
+						 "SELECT n.nspname, c.relname"
+						 "  FROM descendants,"
+						 "       pg_catalog.pg_class c"
+						 "  JOIN pg_catalog.pg_namespace n"
+						 "    ON (c.relnamespace = n.oid)"
+						 " WHERE c.oid = descendants.relid",
+						 lrel->remoteid);
+
+		res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+						  lengthof(descRow), descRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errmsg("could not fetch logical descendants for table \"%s.%s\" from publisher: %s",
+							nspname, relname, res->err)));
+
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		{
+			char	   *desc_nspname;
+			char	   *desc_relname;
+			char	   *quoted;
+
+			desc_nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+			Assert(!isnull);
+			desc_relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+			Assert(!isnull);
+
+			quoted = quote_qualified_identifier(desc_nspname, desc_relname);
+			*to_copy = lappend(*to_copy, quoted);
+
+			ExecClearTuple(slot);
+		}
+
+		ExecDropSingleTupleTableSlot(slot);
+		walrcv_clear_result(res);
+	}
+	else
+	{
+		/* For older servers, we only COPY the table itself. */
+		char   *quoted = quote_qualified_identifier(lrel->nspname,
+													lrel->relname);
+		*to_copy = lappend(*to_copy, quoted);
+	}
+
 	pfree(cmd.data);
 }
 
@@ -1077,6 +1145,8 @@ copy_table(Relation rel)
 	LogicalRepRelMapEntry *relmapentry;
 	LogicalRepRelation lrel;
 	List	   *qual = NIL;
+	List	   *to_copy = NIL;
+	ListCell   *cur;
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	CopyFromState cstate;
@@ -1085,7 +1155,8 @@ copy_table(Relation rel)
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-							RelationGetRelationName(rel), &lrel, &qual);
+							RelationGetRelationName(rel), &lrel, &qual,
+							&to_copy);
 
 	/* Put the relation into relmap. */
 	logicalrep_relmap_update(&lrel);
@@ -1094,92 +1165,96 @@ copy_table(Relation rel)
 	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
 	Assert(rel == relmapentry->localrel);
 
-	/* Start copy on the publisher. */
-	initStringInfo(&cmd);
-
-	/* Regular table with no row filter */
-	if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+	foreach(cur, to_copy)
 	{
-		appendStringInfo(&cmd, "COPY %s (",
-						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		char	   *quoted_name = lfirst(cur);
 
-		/*
-		 * XXX Do we need to list the columns in all cases? Maybe we're
-		 * replicating all columns?
-		 */
-		for (int i = 0; i < lrel.natts; i++)
-		{
-			if (i > 0)
-				appendStringInfoString(&cmd, ", ");
-
-			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
-		}
+		/* Start copy on the publisher. */
+		initStringInfo(&cmd);
 
-		appendStringInfoString(&cmd, ") TO STDOUT");
-	}
-	else
-	{
-		/*
-		 * For non-tables and tables with row filters, we need to do COPY
-		 * (SELECT ...), but we can't just do SELECT * because we need to not
-		 * copy generated columns. For tables with any row filters, build a
-		 * SELECT query with OR'ed row filters for COPY.
-		 */
-		appendStringInfoString(&cmd, "COPY (SELECT ");
-		for (int i = 0; i < lrel.natts; i++)
+		/* Regular table with no row filter */
+		if (lrel.relkind == RELKIND_RELATION && qual == NIL)
 		{
-			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
-			if (i < lrel.natts - 1)
-				appendStringInfoString(&cmd, ", ");
-		}
+			appendStringInfo(&cmd, "COPY %s (", quoted_name);
 
-		appendStringInfoString(&cmd, " FROM ");
+			/*
+			 * XXX Do we need to list the columns in all cases? Maybe we're
+			 * replicating all columns?
+			 */
+			for (int i = 0; i < lrel.natts; i++)
+			{
+				if (i > 0)
+					appendStringInfoString(&cmd, ", ");
 
-		/*
-		 * For regular tables, make sure we don't copy data from a child that
-		 * inherits the named table as those will be copied separately.
-		 */
-		if (lrel.relkind == RELKIND_RELATION)
-			appendStringInfoString(&cmd, "ONLY ");
+				appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			}
 
-		appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
-		/* list of OR'ed filters */
-		if (qual != NIL)
+			appendStringInfoString(&cmd, ") TO STDOUT");
+		}
+		else
 		{
-			ListCell   *lc;
-			char	   *q = strVal(linitial(qual));
+			/*
+			 * For non-tables and tables with row filters, we need to do COPY
+			 * (SELECT ...), but we can't just do SELECT * because we need to not
+			 * copy generated columns. For tables with any row filters, build a
+			 * SELECT query with OR'ed row filters for COPY.
+			 */
+			appendStringInfoString(&cmd, "COPY (SELECT ");
+			for (int i = 0; i < lrel.natts; i++)
+			{
+				appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+				if (i < lrel.natts - 1)
+					appendStringInfoString(&cmd, ", ");
+			}
 
-			appendStringInfo(&cmd, " WHERE %s", q);
-			for_each_from(lc, qual, 1)
+			appendStringInfoString(&cmd, " FROM ");
+
+			/*
+			 * For regular tables, make sure we don't copy data from a child that
+			 * inherits the named table as those will be copied separately.
+			 */
+			if (lrel.relkind == RELKIND_RELATION)
+				appendStringInfoString(&cmd, "ONLY ");
+
+			appendStringInfoString(&cmd, quoted_name);
+			/* list of OR'ed filters */
+			if (qual != NIL)
 			{
-				q = strVal(lfirst(lc));
-				appendStringInfo(&cmd, " OR %s", q);
+				ListCell   *lc;
+				char	   *q = strVal(linitial(qual));
+
+				appendStringInfo(&cmd, " WHERE %s", q);
+				for_each_from(lc, qual, 1)
+				{
+					q = strVal(lfirst(lc));
+					appendStringInfo(&cmd, " OR %s", q);
+				}
+				if (lnext(to_copy, cur) == NULL) list_free_deep(qual); /* qual is reused per table */
 			}
-			list_free_deep(qual);
-		}
 
-		appendStringInfoString(&cmd, ") TO STDOUT");
-	}
-	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
-	pfree(cmd.data);
-	if (res->status != WALRCV_OK_COPY_OUT)
-		ereport(ERROR,
-				(errcode(ERRCODE_CONNECTION_FAILURE),
-				 errmsg("could not start initial contents copy for table \"%s.%s\": %s",
-						lrel.nspname, lrel.relname, res->err)));
-	walrcv_clear_result(res);
+			appendStringInfoString(&cmd, ") TO STDOUT");
+		}
+		res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
+		pfree(cmd.data);
+		if (res->status != WALRCV_OK_COPY_OUT)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("could not start initial contents copy for table \"%s.%s\" from remote %s: %s",
+							lrel.nspname, lrel.relname, quoted_name, res->err)));
+		walrcv_clear_result(res);
 
-	copybuf = makeStringInfo();
+		copybuf = makeStringInfo();
 
-	pstate = make_parsestate(NULL);
-	(void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
-										 NULL, false, false);
+		pstate = make_parsestate(NULL);
+		(void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
+											 NULL, false, false);
 
-	attnamelist = make_copy_attnamelist(relmapentry);
-	cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
+		attnamelist = make_copy_attnamelist(relmapentry);
+		cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL);
 
-	/* Do the copy */
-	(void) CopyFrom(cstate);
+		/* Do the copy */
+		(void) CopyFrom(cstate);
+	}
 
 	logicalrep_rel_close(relmapentry, NoLock);
 }
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 1a80d67bb9..897c5e0d92 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -14,6 +14,7 @@
 
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
+#include "catalog/pg_inherits.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
 #include "catalog/pg_subscription.h"
@@ -1450,7 +1451,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			/* Switch relation if publishing via root. */
 			if (relentry->publish_as_relid != RelationGetRelid(relation))
 			{
-				Assert(relation->rd_rel->relispartition);
 				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
 				targetrel = ancestor;
 				/* Convert tuple if needed. */
@@ -2141,55 +2141,57 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			int			ancestor_level = 0;
 
 			/*
-			 * If this is a FOR ALL TABLES publication, pick the partition
+			 * If this is a FOR ALL TABLES publication, pick the logical
 			 * root and set the ancestor level accordingly.
 			 */
 			if (pub->alltables)
 			{
 				publish = true;
-				if (pub->pubviaroot && am_partition)
+				if (pub->pubviaroot)
 				{
-					List	   *ancestors = get_partition_ancestors(relid);
+					List	   *ancestors;
 
-					pub_relid = llast_oid(ancestors);
-					ancestor_level = list_length(ancestors);
+					ancestors = get_logical_ancestors(relid, am_partition);
+					if (ancestors != NIL)
+					{
+						pub_relid = llast_oid(ancestors);
+						ancestor_level = list_length(ancestors);
+					}
 				}
 			}
 
 			if (!publish)
 			{
 				bool		ancestor_published = false;
+				Oid			ancestor;
+				int			level;
+				List	   *ancestors;
 
 				/*
-				 * For a partition, check if any of the ancestors are
-				 * published.  If so, note down the topmost ancestor that is
+				 * Check if any of the logical ancestors (that is, partition
+				 * parents or tables marked with pg_set_logical_root()) are
+				 * published. If so, note down the topmost ancestor that is
 				 * published via this publication, which will be used as the
-				 * relation via which to publish the partition's changes.
+				 * relation via which to publish this table's changes.
 				 */
-				if (am_partition)
-				{
-					Oid			ancestor;
-					int			level;
-					List	   *ancestors = get_partition_ancestors(relid);
-
-					ancestor = GetTopMostAncestorInPublication(pub->oid,
-															   ancestors,
-															   &level);
+				ancestors = get_logical_ancestors(relid, am_partition);
+				ancestor = GetTopMostAncestorInPublication(pub->oid,
+														   ancestors,
+														   &level);
 
-					if (ancestor != InvalidOid)
+				if (ancestor != InvalidOid)
+				{
+					ancestor_published = true;
+					if (pub->pubviaroot)
 					{
-						ancestor_published = true;
-						if (pub->pubviaroot)
-						{
-							pub_relid = ancestor;
-							ancestor_level = level;
-						}
+						pub_relid = ancestor;
+						ancestor_level = level;
 					}
 				}
 
 				if (list_member_oid(pubids, pub->oid) ||
 					list_member_oid(schemaPubids, pub->oid) ||
-					ancestor_published)
+					(am_partition && ancestor_published))
 					publish = true;
 			}
 
diff --git a/src/include/catalog/pg_inherits.h b/src/include/catalog/pg_inherits.h
index ce154ab943..59d72a97b8 100644
--- a/src/include/catalog/pg_inherits.h
+++ b/src/include/catalog/pg_inherits.h
@@ -63,5 +63,7 @@ extern bool DeleteInheritsTuple(Oid inhrelid, Oid inhparent,
 								bool expect_detach_pending,
 								const char *childname);
 extern bool PartitionHasPendingDetach(Oid partoid);
+extern List *get_logical_ancestors(Oid relid, bool is_partition);
+extern bool has_logical_parent(Relation inhRel, Oid relid);
 
 #endif							/* PG_INHERITS_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 86eb8e8c58..6f5f85c5cc 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11891,4 +11891,9 @@
   prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary',
   prosrc => 'brin_minmax_multi_summary_send' },
 
+{ oid => '8136', descr => 'mark a table root for logical replication',
+  proname => 'pg_set_logical_root', provolatile => 'v', proparallel => 'u',
+  prorettype => 'void', proargtypes => 'regclass regclass',
+  prosrc => 'pg_set_logical_root' },
+
 ]
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 427f87ea07..0e06103f4c 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -1718,9 +1718,41 @@ SELECT * FROM pg_publication_tables;
  pub     | sch1       | tbl1      | {a}      | 
 (1 row)
 
+-- Sanity check cases for pg_set_logical_root().
+CREATE TABLE sch1.iroot (a int);
+CREATE TABLE sch1.ipart1 (a int);
+CREATE TABLE sch1.ipart2 () INHERITS (sch1.iroot);
+-- marking roots between unrelated tables is not allowed
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+ERROR:  table "ipart1" does not inherit from intended root table "iroot"
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.tbl1');
+ERROR:  table "ipart2" does not inherit from intended root table "tbl1"
+-- establishing an inheritance relationship fixes the problem
+ALTER TABLE sch1.ipart1 INHERIT sch1.iroot;
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+ pg_set_logical_root 
+---------------------
+ 
+(1 row)
+
+-- but multiple inheritance is not allowed
+ALTER TABLE sch1.ipart2 INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+ERROR:  table "ipart2" inherits from multiple tables
+ALTER TABLE sch1.ipart2 NO INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+ pg_set_logical_root 
+---------------------
+ 
+(1 row)
+
+-- TODO: make sure existing logical descendant can't be ALTERed [NO] INHERIT
 RESET client_min_messages;
 DROP PUBLICATION pub;
 DROP TABLE sch1.tbl1;
+DROP TABLE sch1.ipart1;
+DROP TABLE sch1.ipart2;
+DROP TABLE sch1.iroot;
 DROP SCHEMA sch1 cascade;
 DROP SCHEMA sch2 cascade;
 RESET SESSION AUTHORIZATION;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index a47c5939d5..52a0d6ba48 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -1087,9 +1087,34 @@ ALTER TABLE sch1.tbl1 ATTACH PARTITION sch1.tbl1_part3 FOR VALUES FROM (20) to (
 CREATE PUBLICATION pub FOR TABLES IN SCHEMA sch1 WITH (PUBLISH_VIA_PARTITION_ROOT=1);
 SELECT * FROM pg_publication_tables;
 
+-- Sanity check cases for pg_set_logical_root().
+CREATE TABLE sch1.iroot (a int);
+CREATE TABLE sch1.ipart1 (a int);
+CREATE TABLE sch1.ipart2 () INHERITS (sch1.iroot);
+
+-- marking roots between unrelated tables is not allowed
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.tbl1');
+
+-- establishing an inheritance relationship fixes the problem
+ALTER TABLE sch1.ipart1 INHERIT sch1.iroot;
+SELECT pg_set_logical_root('sch1.ipart1', 'sch1.iroot');
+
+-- but multiple inheritance is not allowed
+ALTER TABLE sch1.ipart2 INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+
+ALTER TABLE sch1.ipart2 NO INHERIT sch1.ipart1;
+SELECT pg_set_logical_root('sch1.ipart2', 'sch1.iroot');
+
+-- TODO: make sure existing logical descendant can't be ALTERed [NO] INHERIT
+
 RESET client_min_messages;
 DROP PUBLICATION pub;
 DROP TABLE sch1.tbl1;
+DROP TABLE sch1.ipart1;
+DROP TABLE sch1.ipart2;
+DROP TABLE sch1.iroot;
 DROP SCHEMA sch1 cascade;
 DROP SCHEMA sch2 cascade;
 
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 11a5c3c03e..b6e3143536 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -877,4 +877,190 @@ $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT a, b, c FROM tab5_1 ORDER BY 1");
 is($result, qq(4||1), 'updates of tab5 replicated correctly');
 
+# Test that replication works for older inheritance/trigger setups as well.
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE itab1 (a int, b text)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE itab1_1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE itab1_2 (CHECK (a = 2)) INHERITS (itab1)");
+
+$node_publisher->safe_psql('postgres', "
+	CREATE OR REPLACE FUNCTION itab1_trigger()
+	RETURNS TRIGGER AS \$\$
+	BEGIN
+		IF ( NEW.a = 1 ) THEN INSERT INTO itab1_1 VALUES (NEW.*);
+		ELSIF ( NEW.a = 2 ) THEN INSERT INTO itab1_2 VALUES (NEW.*);
+		ELSE RETURN NEW;
+		END IF;
+		RETURN NULL;
+	END;
+	\$\$
+	LANGUAGE plpgsql;");
+$node_publisher->safe_psql('postgres', "
+	CREATE TRIGGER itab1_trigger
+	BEFORE INSERT ON itab1
+	FOR EACH ROW EXECUTE FUNCTION itab1_trigger();");
+
+$node_publisher->safe_psql('postgres',
+	"SELECT pg_set_logical_root('itab1_1', 'itab1')");
+$node_publisher->safe_psql('postgres',
+	"SELECT pg_set_logical_root('itab1_2', 'itab1')");
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE itab2 (a int, b text)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE itab2_1 (CHECK (a = 1)) INHERITS (itab2)");
+
+$node_publisher->safe_psql('postgres', "
+	CREATE OR REPLACE FUNCTION itab2_trigger()
+	RETURNS TRIGGER AS \$\$
+	BEGIN
+		IF ( NEW.a = 1 ) THEN INSERT INTO itab2_1 VALUES (NEW.*);
+		ELSE RETURN NEW;
+		END IF;
+		RETURN NULL;
+	END;
+	\$\$
+	LANGUAGE plpgsql;");
+$node_publisher->safe_psql('postgres', "
+	CREATE TRIGGER itab2_trigger
+	BEFORE INSERT ON itab2
+	FOR EACH ROW EXECUTE FUNCTION itab2_trigger();");
+
+$node_publisher->safe_psql('postgres',
+	"SELECT pg_set_logical_root('itab2_1', 'itab2')");
+
+# itab2_1 should be published using its own identity here, since its parent is
+# not included. itab1_1 should be published via its parent, itab1, without
+# duplicating the rows.
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub_viaroot ADD TABLE itab1, itab1_1, itab2_1");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (0, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (1, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (2, 'itab1')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab2 VALUES (0, 'itab2')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab2 VALUES (1, 'itab2')");
+
+# Subscriber 1 only subscribes to some of the partitions, and does not set up
+# partition triggers, to check for the correct routing.
+$node_subscriber1->safe_psql('postgres',
+	"CREATE TABLE itab1 (a int, b text)");
+$node_subscriber1->safe_psql('postgres',
+	"CREATE TABLE itab1_1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_subscriber1->safe_psql('postgres',
+	"CREATE TABLE itab2_1 (a int, b text)");
+
+# Subscriber 2 has different partition names for itab1, and it doesn't partition
+# itab2 at all.
+$node_subscriber2->safe_psql('postgres',
+	"CREATE TABLE itab1 (a int, b text)");
+$node_subscriber2->safe_psql('postgres',
+	"CREATE TABLE itab1_part1 (CHECK (a = 1)) INHERITS (itab1)");
+$node_subscriber2->safe_psql('postgres',
+	"CREATE TABLE itab1_part2 (CHECK (a = 2)) INHERITS (itab1)");
+
+$node_subscriber2->safe_psql('postgres', "
+	CREATE OR REPLACE FUNCTION itab_trigger()
+	RETURNS TRIGGER AS \$\$
+	BEGIN
+		IF ( NEW.a = 1 ) THEN INSERT INTO public.itab1_part1 VALUES (NEW.*);
+		ELSIF ( NEW.a = 2 ) THEN INSERT INTO public.itab1_part2 VALUES (NEW.*);
+		ELSE RETURN NEW;
+		END IF;
+		RETURN NULL;
+	END;
+	\$\$
+	LANGUAGE plpgsql;");
+$node_subscriber2->safe_psql('postgres', "
+	CREATE TRIGGER itab_trigger
+	BEFORE INSERT ON itab1
+	FOR EACH ROW EXECUTE FUNCTION itab_trigger();");
+$node_subscriber2->safe_psql('postgres', "
+	ALTER TABLE itab1 ENABLE ALWAYS TRIGGER itab_trigger;");
+
+$node_subscriber2->safe_psql('postgres',
+	"CREATE TABLE itab2 (a int, b text)");
+
+$node_subscriber1->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub_viaroot REFRESH PUBLICATION");
+$node_subscriber2->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+$node_subscriber1->wait_for_subscription_sync;
+$node_subscriber2->wait_for_subscription_sync;
+
+# check that data is synced correctly
+
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT a, b FROM itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+2|itab1), 'initial data synced for itab1 on subscriber 1');
+
+# all of the data should have been routed to itab1 directly (there are no
+# triggers on subscriber 1 to move it elsewhere)
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT a, b FROM ONLY itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+2|itab1), 'initial data correctly routed for itab1 on subscriber 1');
+
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT a, b FROM itab2_1 ORDER BY 1, 2");
+is($result, qq(1|itab2), 'initial data synced for itab2_1 on subscriber 1');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b FROM itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+2|itab1), 'initial data synced for itab1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b FROM ONLY itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1), 'initial data correctly routed for itab1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b FROM itab2 ORDER BY 1, 2");
+is($result, qq(0|itab2
+1|itab2), 'initial data synced for itab2 on subscriber 2');
+
+# make sure new data is also correctly routed to the roots
+$node_publisher->safe_psql('postgres', "INSERT INTO itab1 VALUES (1, 'itab1-new')");
+$node_publisher->safe_psql('postgres', "INSERT INTO itab2 VALUES (1, 'itab2-new')");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT a, b FROM ONLY itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+1|itab1-new
+2|itab1), 'new data routed for itab1 on subscriber 1');
+
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT a, b FROM itab2_1 ORDER BY 1, 2");
+is($result, qq(1|itab2
+1|itab2-new), 'new data routed for itab2_1 on subscriber 1');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b FROM itab1 ORDER BY 1, 2");
+is($result, qq(0|itab1
+1|itab1
+1|itab1-new
+2|itab1), 'new data routed for itab1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b FROM itab1_part1 ORDER BY 1, 2");
+is($result, qq(1|itab1
+1|itab1-new), 'new data moved to itab1_part1 on subscriber 2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT a, b FROM itab2 ORDER BY 1, 2");
+is($result, qq(0|itab2
+1|itab2
+1|itab2-new), 'new data routed for itab2 on subscriber 2');
+
 done_testing();
-- 
2.25.1
