From 9f8cc8104a1440c9abdd2450acf63eef452c8004 Mon Sep 17 00:00:00 2001
From: amit <amitlangote09@gmail.com>
Date: Fri, 29 Nov 2019 17:40:11 +0900
Subject: [PATCH v13 3/3] Publish partitioned table inserts as its own

To control whether partition changes are replicated using their
own identity (and schema) or an ancestor's, add a new parameter
that cab be set per publication named 'publish_using_root_schema'.
---
 doc/src/sgml/logical-replication.sgml       |  11 +-
 doc/src/sgml/ref/create_publication.sgml    |  17 ++
 src/backend/catalog/partition.c             |   9 +
 src/backend/catalog/pg_publication.c        |  63 +++++-
 src/backend/commands/publicationcmds.c      |  95 +++++----
 src/backend/commands/tablecmds.c            |   2 +-
 src/backend/executor/nodeModifyTable.c      |   4 +
 src/backend/replication/pgoutput/pgoutput.c | 211 ++++++++++++++++----
 src/backend/utils/cache/relcache.c          |   7 +-
 src/bin/pg_dump/pg_dump.c                   |  22 +-
 src/bin/pg_dump/pg_dump.h                   |   1 +
 src/bin/psql/describe.c                     |  17 +-
 src/include/catalog/partition.h             |   1 +
 src/include/catalog/pg_publication.h        |   7 +-
 src/test/regress/expected/publication.out   | 103 +++++-----
 src/test/regress/sql/publication.sql        |   3 +
 src/test/subscription/t/013_partition.pl    | 170 +++++++++++++++-
 17 files changed, 590 insertions(+), 153 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 8bd7c9c8ac..a99e90b331 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -402,15 +402,8 @@
 
    <listitem>
     <para>
-     Replication is only supported by tables, partitioned or not, although a
-     given table must either be partitioned on both servers or not partitioned
-     at all.  Also, when replicating between partitioned tables, the actual
-     replication occurs between leaf partitions, so partitions on the two
-     servers must match one-to-one.
-    </para>
-
-    <para>
-     Attempts to replicate other types of relations such as views, materialized
+     Replication is only supported by tables, partitioned or not.
+     Attempts to replicate other types of relations such as view, materialized
      views, or foreign tables, will result in an error.
     </para>
    </listitem>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index 597cb28f33..0ca6cffaba 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -123,6 +123,23 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>publish_using_root_schema</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          This parameter determines whether DML operations on a partitioned
+          table (or on its partitions) contained in the publication will be
+          published using its own schema rather than of the individual
+          partitions which are actually changed; the latter is the default.
+          Setting it to <literal>true</literal> allows the changes to be
+          replicated into a non-partitioned table or a partitioned table
+          consisting of a different set of partitions.  However,
+          <literal>TRUNCATE</literal> operations performed directly on
+          partitions are not replicated.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
 
      </para>
diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index 239ac017fa..07853b85d5 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -28,6 +28,7 @@
 #include "partitioning/partbounds.h"
 #include "rewrite/rewriteManip.h"
 #include "utils/fmgroids.h"
+#include "utils/lsyscache.h"
 #include "utils/partcache.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
@@ -126,6 +127,14 @@ get_partition_ancestors(Oid relid)
 	return result;
 }
 
+/* Is given relation a leaf partition? */
+bool
+is_leaf_partition(Oid relid)
+{
+	return	get_rel_relkind(relid) != RELKIND_PARTITIONED_TABLE &&
+			get_rel_relispartition(relid);
+}
+
 /*
  * get_partition_ancestors_worker
  *		recursive worker for get_partition_ancestors
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 500a5ae1ee..0c534a29c0 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -220,13 +220,30 @@ publication_add_relation(Oid pubid, Relation targetrel,
 /*
  * Gets list of publication oids for a relation, plus those of ancestors,
  * if any, if the relation is a partition.
+ *
+ * *published_rels, if asked for, will contain the OID of the relation for
+ * each publication returned, that is, of the relation that is actually
+ * published.  Examining this list allows the caller, for instance, to
+ * distinguish publications that it is directly part from those that it is
+ * indirectly part of via an ancestor.
  */
 List *
-GetRelationPublications(Oid relid)
+GetRelationPublications(Oid relid, List **published_rels)
 {
 	List	   *result = NIL;
+	int			i,
+				num;
+
+	if (published_rels)
+		*published_rels = NIL;
 
 	result = get_rel_publications(relid);
+	if (published_rels)
+	{
+		num = list_length(result);
+		for (i = 0; i < num; i++)
+			*published_rels = lappend_oid(*published_rels, relid);
+	}
 	if (get_rel_relispartition(relid))
 	{
 		List	   *ancestors = get_partition_ancestors(relid);
@@ -238,6 +255,12 @@ GetRelationPublications(Oid relid)
 			List	   *ancestor_pubs = get_rel_publications(ancestor);
 
 			result = list_concat(result, ancestor_pubs);
+			if (published_rels)
+			{
+				num = list_length(ancestor_pubs);
+				for (i = 0; i < num; i++)
+					*published_rels = lappend_oid(*published_rels, ancestor);
+			}
 		}
 	}
 
@@ -373,9 +396,13 @@ GetAllTablesPublications(void)
 
 /*
  * Gets list of all relation published by FOR ALL TABLES publication(s).
+ *
+ * If the publication publishes partition changes via their respective root
+ * partitioned tables, we must exclude partitions in favor of including the
+ * root partitioned tables.
  */
 List *
-GetAllTablesPublicationRelations(void)
+GetAllTablesPublicationRelations(bool pubasroot)
 {
 	Relation	classRel;
 	ScanKeyData key[1];
@@ -397,12 +424,35 @@ GetAllTablesPublicationRelations(void)
 		Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
 		Oid			relid = relForm->oid;
 
-		if (is_publishable_class(relid, relForm))
+		if (is_publishable_class(relid, relForm) &&
+			!(relForm->relispartition && pubasroot))
 			result = lappend_oid(result, relid);
 	}
 
 	table_endscan(scan);
-	table_close(classRel, AccessShareLock);
+
+	if (pubasroot)
+	{
+		ScanKeyInit(&key[0],
+					Anum_pg_class_relkind,
+					BTEqualStrategyNumber, F_CHAREQ,
+					CharGetDatum(RELKIND_PARTITIONED_TABLE));
+
+		scan = table_beginscan_catalog(classRel, 1, key);
+
+		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+		{
+			Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+			Oid			relid = relForm->oid;
+
+			if (is_publishable_class(relid, relForm) &&
+				!relForm->relispartition)
+				result = lappend_oid(result, relid);
+		}
+
+		table_endscan(scan);
+		table_close(classRel, AccessShareLock);
+	}
 
 	return result;
 }
@@ -433,6 +483,7 @@ GetPublication(Oid pubid)
 	pub->pubactions.pubupdate = pubform->pubupdate;
 	pub->pubactions.pubdelete = pubform->pubdelete;
 	pub->pubactions.pubtruncate = pubform->pubtruncate;
+	pub->pubasroot = pubform->pubasroot;
 
 	ReleaseSysCache(tup);
 
@@ -533,9 +584,11 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		 * need those.
 		 */
 		if (publication->alltables)
-			tables = GetAllTablesPublicationRelations();
+			tables = GetAllTablesPublicationRelations(publication->pubasroot);
 		else
 			tables = GetPublicationRelations(publication->oid,
+											 publication->pubasroot ?
+											 PUBLICATION_PART_ROOT :
 											 PUBLICATION_PART_LEAF);
 		funcctx->user_fctx = (void *) tables;
 
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 494c0bdc28..9e102a4b78 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -23,6 +23,7 @@
 #include "catalog/namespace.h"
 #include "catalog/objectaccess.h"
 #include "catalog/objectaddress.h"
+#include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
@@ -56,20 +57,23 @@ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
 static void
 parse_publication_options(List *options,
 						  bool *publish_given,
-						  bool *publish_insert,
-						  bool *publish_update,
-						  bool *publish_delete,
-						  bool *publish_truncate)
+						  PublicationActions *pubactions,
+						  bool *publish_using_root_schema_given,
+						  bool *publish_using_root_schema)
 {
 	ListCell   *lc;
 
+	*publish_using_root_schema_given = false;
 	*publish_given = false;
 
 	/* Defaults are true */
-	*publish_insert = true;
-	*publish_update = true;
-	*publish_delete = true;
-	*publish_truncate = true;
+	pubactions->pubinsert = true;
+	pubactions->pubupdate = true;
+	pubactions->pubdelete = true;
+	pubactions->pubtruncate = true;
+
+	/* Relation changes published as of itself by default. */
+	*publish_using_root_schema = false;
 
 	/* Parse options */
 	foreach(lc, options)
@@ -91,10 +95,10 @@ parse_publication_options(List *options,
 			 * If publish option was given only the explicitly listed actions
 			 * should be published.
 			 */
-			*publish_insert = false;
-			*publish_update = false;
-			*publish_delete = false;
-			*publish_truncate = false;
+			pubactions->pubinsert = false;
+			pubactions->pubupdate = false;
+			pubactions->pubdelete = false;
+			pubactions->pubtruncate = false;
 
 			*publish_given = true;
 			publish = defGetString(defel);
@@ -110,19 +114,28 @@ parse_publication_options(List *options,
 				char	   *publish_opt = (char *) lfirst(lc);
 
 				if (strcmp(publish_opt, "insert") == 0)
-					*publish_insert = true;
+					pubactions->pubinsert = true;
 				else if (strcmp(publish_opt, "update") == 0)
-					*publish_update = true;
+					pubactions->pubupdate = true;
 				else if (strcmp(publish_opt, "delete") == 0)
-					*publish_delete = true;
+					pubactions->pubdelete = true;
 				else if (strcmp(publish_opt, "truncate") == 0)
-					*publish_truncate = true;
+					pubactions->pubtruncate = true;
 				else
 					ereport(ERROR,
 							(errcode(ERRCODE_SYNTAX_ERROR),
 							 errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
 			}
 		}
+		else if (strcmp(defel->defname, "publish_using_root_schema") == 0)
+		{
+			if (*publish_using_root_schema_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			*publish_using_root_schema_given = true;
+			*publish_using_root_schema = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -143,10 +156,9 @@ CreatePublication(CreatePublicationStmt *stmt)
 	Datum		values[Natts_pg_publication];
 	HeapTuple	tup;
 	bool		publish_given;
-	bool		publish_insert;
-	bool		publish_update;
-	bool		publish_delete;
-	bool		publish_truncate;
+	PublicationActions pubactions;
+	bool		publish_using_root_schema_given;
+	bool		publish_using_root_schema;
 	AclResult	aclresult;
 
 	/* must have CREATE privilege on database */
@@ -183,9 +195,9 @@ CreatePublication(CreatePublicationStmt *stmt)
 	values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
 
 	parse_publication_options(stmt->options,
-							  &publish_given, &publish_insert,
-							  &publish_update, &publish_delete,
-							  &publish_truncate);
+							  &publish_given, &pubactions,
+							  &publish_using_root_schema_given,
+							  &publish_using_root_schema);
 
 	puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
 								Anum_pg_publication_oid);
@@ -193,13 +205,15 @@ CreatePublication(CreatePublicationStmt *stmt)
 	values[Anum_pg_publication_puballtables - 1] =
 		BoolGetDatum(stmt->for_all_tables);
 	values[Anum_pg_publication_pubinsert - 1] =
-		BoolGetDatum(publish_insert);
+		BoolGetDatum(pubactions.pubinsert);
 	values[Anum_pg_publication_pubupdate - 1] =
-		BoolGetDatum(publish_update);
+		BoolGetDatum(pubactions.pubupdate);
 	values[Anum_pg_publication_pubdelete - 1] =
-		BoolGetDatum(publish_delete);
+		BoolGetDatum(pubactions.pubdelete);
 	values[Anum_pg_publication_pubtruncate - 1] =
-		BoolGetDatum(publish_truncate);
+		BoolGetDatum(pubactions.pubtruncate);
+	values[Anum_pg_publication_pubasroot - 1] =
+		BoolGetDatum(publish_using_root_schema);
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -251,17 +265,16 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
 	bool		replaces[Natts_pg_publication];
 	Datum		values[Natts_pg_publication];
 	bool		publish_given;
-	bool		publish_insert;
-	bool		publish_update;
-	bool		publish_delete;
-	bool		publish_truncate;
+	PublicationActions pubactions;
+	bool		publish_using_root_schema_given;
+	bool		publish_using_root_schema;
 	ObjectAddress obj;
 	Form_pg_publication pubform;
 
 	parse_publication_options(stmt->options,
-							  &publish_given, &publish_insert,
-							  &publish_update, &publish_delete,
-							  &publish_truncate);
+							  &publish_given, &pubactions,
+							  &publish_using_root_schema_given,
+							  &publish_using_root_schema);
 
 	/* Everything ok, form a new tuple. */
 	memset(values, 0, sizeof(values));
@@ -270,19 +283,25 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
 
 	if (publish_given)
 	{
-		values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
+		values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
 		replaces[Anum_pg_publication_pubinsert - 1] = true;
 
-		values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
+		values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
 		replaces[Anum_pg_publication_pubupdate - 1] = true;
 
-		values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
+		values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
 		replaces[Anum_pg_publication_pubdelete - 1] = true;
 
-		values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
+		values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
 		replaces[Anum_pg_publication_pubtruncate - 1] = true;
 	}
 
+	if (publish_using_root_schema_given)
+	{
+		values[Anum_pg_publication_pubasroot - 1] = BoolGetDatum(publish_using_root_schema);
+		replaces[Anum_pg_publication_pubasroot - 1] = true;
+	}
+
 	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
 							replaces);
 
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 729025470d..c9e4214c73 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -14692,7 +14692,7 @@ ATPrepChangePersistence(Relation rel, bool toLogged)
 	 * UNLOGGED as UNLOGGED tables can't be published.
 	 */
 	if (!toLogged &&
-		list_length(GetRelationPublications(RelationGetRelid(rel))) > 0)
+		list_length(GetRelationPublications(RelationGetRelid(rel), NULL)) > 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("cannot change table \"%s\" to unlogged because it is part of a publication",
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index d71c0a4322..f71fd98be2 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -2320,8 +2320,12 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 
 	/* If modifying a partitioned table, initialize the root table info */
 	if (node->rootResultRelIndex >= 0)
+	{
 		mtstate->rootResultRelInfo = estate->es_root_result_relations +
 			node->rootResultRelIndex;
+		/* Only necessary to check replication identity. */
+		CheckValidResultRel(mtstate->rootResultRelInfo, operation);
+	}
 
 	mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans);
 	mtstate->mt_nplans = nplans;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 552a70cffa..f48a8fbb58 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -12,6 +12,8 @@
  */
 #include "postgres.h"
 
+#include "access/tupconvert.h"
+#include "catalog/partition.h"
 #include "catalog/pg_publication.h"
 #include "fmgr.h"
 #include "replication/logical.h"
@@ -20,6 +22,7 @@
 #include "replication/pgoutput.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
@@ -49,6 +52,7 @@ static bool publications_valid;
 static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
+static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
 
 /*
  * Entry in the map used to remember which relation schemas we sent.
@@ -59,9 +63,33 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
 typedef struct RelationSyncEntry
 {
 	Oid			relid;			/* relation oid */
-	bool		schema_sent;	/* did we send the schema? */
+
+	/*
+	 * Did we send the schema?  If ancestor relid is set, its schema must also
+	 * have been sent for this to be true.
+	 */
+	bool		schema_sent;
 	bool		replicate_valid;
 	PublicationActions pubactions;
+
+	/*
+	 * True when publication that is matched by get_rel_sync_entry for this
+	 * relation is configured as such.
+	 */
+	bool		pubasroot;
+
+	/*
+	 * OID of the ancestor whose schema will be used when replicating changes
+	 * to a partition; InvalidOid if pubasroot is false.
+	 */
+	Oid			replicate_as_relid;
+
+	/*
+	 * Map, if any, used when replicating using an ancestor's schema to
+	 * convert the tuples from partition's type to the ancestor's; NULL if
+	 * pubasroot is false.
+	 */
+	TupleConversionMap *map;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -259,47 +287,72 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 }
 
 /*
- * Write the relation schema if the current schema hasn't been sent yet.
+ * Write the current schema of the relation and its ancestor (if any) if not
+ * done yet.
  */
 static void
 maybe_send_schema(LogicalDecodingContext *ctx,
 				  Relation relation, RelationSyncEntry *relentry)
 {
-	if (!relentry->schema_sent)
+	if (relentry->schema_sent)
+		return;
+
+	/* If needed, send the ancestor's schema first. */
+	if (OidIsValid(relentry->replicate_as_relid))
 	{
-		TupleDesc	desc;
-		int			i;
+		Relation	ancestor =
+		RelationIdGetRelation(relentry->replicate_as_relid);
+		TupleDesc	indesc = RelationGetDescr(relation);
+		TupleDesc	outdesc = RelationGetDescr(ancestor);
+		MemoryContext oldctx;
 
-		desc = RelationGetDescr(relation);
+		/* Map must live as long as the session does. */
+		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+		relentry->map = convert_tuples_by_name(indesc, outdesc);
+		MemoryContextSwitchTo(oldctx);
+		send_relation_and_attrs(ancestor, ctx);
+		RelationClose(ancestor);
+	}
 
-		/*
-		 * Write out type info if needed.  We do that only for user-created
-		 * types.  We use FirstGenbkiObjectId as the cutoff, so that we only
-		 * consider objects with hand-assigned OIDs to be "built in", not for
-		 * instance any function or type defined in the information_schema.
-		 * This is important because only hand-assigned OIDs can be expected
-		 * to remain stable across major versions.
-		 */
-		for (i = 0; i < desc->natts; i++)
-		{
-			Form_pg_attribute att = TupleDescAttr(desc, i);
+	send_relation_and_attrs(relation, ctx);
+	relentry->schema_sent = true;
+}
 
-			if (att->attisdropped || att->attgenerated)
-				continue;
+/*
+ * Sends a relation
+ */
+static void
+send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
+{
+	TupleDesc	desc = RelationGetDescr(relation);
+	int			i;
 
-			if (att->atttypid < FirstGenbkiObjectId)
-				continue;
+	/*
+	 * Write out type info if needed.  We do that only for user-created types.
+	 * We use FirstGenbkiObjectId as the cutoff, so that we only consider
+	 * objects with hand-assigned OIDs to be "built in", not for instance any
+	 * function or type defined in the information_schema. This is important
+	 * because only hand-assigned OIDs can be expected to remain stable across
+	 * major versions.
+	 */
+	for (i = 0; i < desc->natts; i++)
+	{
+		Form_pg_attribute att = TupleDescAttr(desc, i);
 
-			OutputPluginPrepareWrite(ctx, false);
-			logicalrep_write_typ(ctx->out, att->atttypid);
-			OutputPluginWrite(ctx, false);
-		}
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		if (att->atttypid < FirstGenbkiObjectId)
+			continue;
 
 		OutputPluginPrepareWrite(ctx, false);
-		logicalrep_write_rel(ctx->out, relation);
+		logicalrep_write_typ(ctx->out, att->atttypid);
 		OutputPluginWrite(ctx, false);
-		relentry->schema_sent = true;
 	}
+
+	OutputPluginPrepareWrite(ctx, false);
+	logicalrep_write_rel(ctx->out, relation);
+	OutputPluginWrite(ctx, false);
 }
 
 /*
@@ -346,28 +399,68 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	switch (change->action)
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
-			OutputPluginPrepareWrite(ctx, true);
-			logicalrep_write_insert(ctx->out, relation,
-									&change->data.tp.newtuple->tuple);
-			OutputPluginWrite(ctx, true);
-			break;
+			{
+				HeapTuple	tuple = &change->data.tp.newtuple->tuple;
+
+				/* Publish as root relation change if requested. */
+				if (OidIsValid(relentry->replicate_as_relid))
+				{
+					Assert(relentry->pubasroot);
+					Assert(relation->rd_rel->relispartition);
+					relation = RelationIdGetRelation(relentry->replicate_as_relid);
+					/* Convert tuple if needed. */
+					if (relentry->map)
+						tuple = execute_attr_map_tuple(tuple, relentry->map);
+				}
+
+				OutputPluginPrepareWrite(ctx, true);
+				logicalrep_write_insert(ctx->out, relation, tuple);
+				OutputPluginWrite(ctx, true);
+				break;
+			}
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			{
 				HeapTuple	oldtuple = change->data.tp.oldtuple ?
 				&change->data.tp.oldtuple->tuple : NULL;
+				HeapTuple	newtuple = &change->data.tp.newtuple->tuple;
+
+				/* Publish as root relation change if requested. */
+				if (OidIsValid(relentry->replicate_as_relid))
+				{
+					Assert(relentry->pubasroot);
+					Assert(relation->rd_rel->relispartition);
+					relation = RelationIdGetRelation(relentry->replicate_as_relid);
+					/* Convert tuples if needed. */
+					if (relentry->map)
+					{
+						oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+						newtuple = execute_attr_map_tuple(newtuple, relentry->map);
+					}
+				}
 
 				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_update(ctx->out, relation, oldtuple,
-										&change->data.tp.newtuple->tuple);
+				logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (change->data.tp.oldtuple)
 			{
+				HeapTuple	oldtuple = &change->data.tp.oldtuple->tuple;
+
+				/* Publish as root relation change if requested. */
+				if (OidIsValid(relentry->replicate_as_relid))
+				{
+					Assert(relentry->pubasroot);
+					Assert(relation->rd_rel->relispartition);
+					relation = RelationIdGetRelation(relentry->replicate_as_relid);
+					/* Convert tuple if needed. */
+					if (relentry->map)
+						oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+				}
+
 				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_delete(ctx->out, relation,
-										&change->data.tp.oldtuple->tuple);
+				logicalrep_write_delete(ctx->out, relation, oldtuple);
 				OutputPluginWrite(ctx, true);
 			}
 			else
@@ -413,9 +506,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 		/*
 		 * Don't send partitioned tables, because partitions should be sent
-		 * instead.
+		 * sent instead, unless user specified to send the former.
 		 */
-		if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
+			!relentry->pubasroot)
 			continue;
 
 		relids[nrelids++] = relid;
@@ -540,7 +634,8 @@ init_rel_sync_cache(MemoryContext cachectx)
  * This looks up publications that the given relation is directly or
  * indirectly part of (the latter if it's really the relation's ancestor that
  * is part of a publication) and fills up the found entry with the information
- * about which operations to publish.
+ * about which operations to publish and whether to use an ancestor's schema
+ * when publishing.
  */
 static RelationSyncEntry *
 get_rel_sync_entry(PGOutputData *data, Oid relid)
@@ -562,8 +657,10 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	/* Not found means schema wasn't sent */
 	if (!found || !entry->replicate_valid)
 	{
-		List	   *pubids = GetRelationPublications(relid);
+		List	   *published_rels = NIL;
+		List	   *pubids = GetRelationPublications(relid, &published_rels);
 		ListCell   *lc;
+		Oid			ancestor = InvalidOid;
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -588,13 +685,42 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		foreach(lc, data->publications)
 		{
 			Publication *pub = lfirst(lc);
+			bool		publish = false;
 
-			if (pub->alltables || list_member_oid(pubids, pub->oid))
+			if (pub->alltables)
+			{
+				publish = true;
+				if (pub->pubasroot && get_rel_relispartition(relid))
+					ancestor = llast_oid(get_partition_ancestors(relid));
+			}
+
+			if (!publish)
+			{
+				ListCell *lc1,
+						 *lc2;
+
+				forboth(lc1, pubids, lc2, published_rels)
+				{
+					Oid		pubid = lfirst_oid(lc1);
+					Oid		pub_relid = lfirst_oid(lc2);
+					if (pubid == pub->oid)
+					{
+						publish = true;
+						if (pub->pubasroot && pub_relid != relid)
+							ancestor = pub_relid;
+						break;
+					}
+				}
+			}
+
+			if (publish)
 			{
 				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
-				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+				if (!OidIsValid(ancestor))
+					entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+				entry->pubasroot = pub->pubasroot;
 			}
 
 			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
@@ -604,6 +730,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 
 		list_free(pubids);
 
+		entry->replicate_as_relid = ancestor;
 		entry->replicate_valid = true;
 	}
 
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 76f41dbe36..8792217a26 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -43,6 +43,7 @@
 #include "catalog/catalog.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
+#include "catalog/partition.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_amproc.h"
 #include "catalog/pg_attrdef.h"
@@ -5138,7 +5139,7 @@ GetRelationPublicationActions(Relation relation)
 					  sizeof(PublicationActions));
 
 	/* Fetch the publication membership info. */
-	puboids = GetRelationPublications(RelationGetRelid(relation));
+	puboids = GetRelationPublications(RelationGetRelid(relation), NULL);
 	puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
 
 	foreach(lc, puboids)
@@ -5157,7 +5158,9 @@ GetRelationPublicationActions(Relation relation)
 		pubactions->pubinsert |= pubform->pubinsert;
 		pubactions->pubupdate |= pubform->pubupdate;
 		pubactions->pubdelete |= pubform->pubdelete;
-		pubactions->pubtruncate |= pubform->pubtruncate;
+		if (!pubform->pubasroot ||
+			!is_leaf_partition(RelationGetRelid(relation)))
+			pubactions->pubtruncate |= pubform->pubtruncate;
 
 		ReleaseSysCache(tup);
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 959b36a95c..d703f17dc6 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3792,6 +3792,7 @@ getPublications(Archive *fout)
 	int			i_pubupdate;
 	int			i_pubdelete;
 	int			i_pubtruncate;
+	int			i_pubasroot;
 	int			i,
 				ntups;
 
@@ -3803,11 +3804,18 @@ getPublications(Archive *fout)
 	resetPQExpBuffer(query);
 
 	/* Get the publications. */
-	if (fout->remoteVersion >= 110000)
+	if (fout->remoteVersion >= 130000)
 		appendPQExpBuffer(query,
 						  "SELECT p.tableoid, p.oid, p.pubname, "
 						  "(%s p.pubowner) AS rolname, "
-						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate "
+						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubasroot "
+						  "FROM pg_publication p",
+						  username_subquery);
+	else if (fout->remoteVersion >= 110000)
+		appendPQExpBuffer(query,
+						  "SELECT p.tableoid, p.oid, p.pubname, "
+						  "(%s p.pubowner) AS rolname, "
+						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false as pubasroot "
 						  "FROM pg_publication p",
 						  username_subquery);
 	else
@@ -3831,6 +3839,7 @@ getPublications(Archive *fout)
 	i_pubupdate = PQfnumber(res, "pubupdate");
 	i_pubdelete = PQfnumber(res, "pubdelete");
 	i_pubtruncate = PQfnumber(res, "pubtruncate");
+	i_pubasroot = PQfnumber(res, "pubasroot");
 
 	pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
 
@@ -3853,6 +3862,8 @@ getPublications(Archive *fout)
 			(strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
 		pubinfo[i].pubtruncate =
 			(strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
+		pubinfo[i].pubasroot =
+			(strcmp(PQgetvalue(res, i, i_pubasroot), "t") == 0);
 
 		if (strlen(pubinfo[i].rolname) == 0)
 			pg_log_warning("owner of publication \"%s\" appears to be invalid",
@@ -3929,7 +3940,12 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo)
 		first = false;
 	}
 
-	appendPQExpBufferStr(query, "');\n");
+	appendPQExpBufferStr(query, "'");
+
+	if (pubinfo->pubasroot)
+		appendPQExpBufferStr(query, ", publish_using_root_schema = true");
+
+	appendPQExpBufferStr(query, ");\n");
 
 	ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
 				 ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index e0c6444ef6..44e964fa24 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -601,6 +601,7 @@ typedef struct _PublicationInfo
 	bool		pubupdate;
 	bool		pubdelete;
 	bool		pubtruncate;
+	bool		pubasroot;
 } PublicationInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 109245fea7..cbd69942f4 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5707,7 +5707,7 @@ listPublications(const char *pattern)
 	PQExpBufferData buf;
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
-	static const bool translate_columns[] = {false, false, false, false, false, false, false};
+	static const bool translate_columns[] = {false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -5738,6 +5738,10 @@ listPublications(const char *pattern)
 		appendPQExpBuffer(&buf,
 						  ",\n  pubtruncate AS \"%s\"",
 						  gettext_noop("Truncates"));
+	if (pset.sversion >= 130000)
+		appendPQExpBuffer(&buf,
+						  ",\n  pubasroot AS \"%s\"",
+						  gettext_noop("Publishes Using Root Schema"));
 
 	appendPQExpBufferStr(&buf,
 						 "\nFROM pg_catalog.pg_publication\n");
@@ -5779,6 +5783,7 @@ describePublications(const char *pattern)
 	int			i;
 	PGresult   *res;
 	bool		has_pubtruncate;
+	bool		has_pubasroot;
 
 	if (pset.sversion < 100000)
 	{
@@ -5791,6 +5796,7 @@ describePublications(const char *pattern)
 	}
 
 	has_pubtruncate = (pset.sversion >= 110000);
+	has_pubasroot = (pset.sversion >= 130000);
 
 	initPQExpBuffer(&buf);
 
@@ -5801,6 +5807,9 @@ describePublications(const char *pattern)
 	if (has_pubtruncate)
 		appendPQExpBufferStr(&buf,
 							 ", pubtruncate");
+	if (has_pubasroot)
+		appendPQExpBufferStr(&buf,
+							 ", pubasroot");
 	appendPQExpBufferStr(&buf,
 						 "\nFROM pg_catalog.pg_publication\n");
 
@@ -5850,6 +5859,8 @@ describePublications(const char *pattern)
 
 		if (has_pubtruncate)
 			ncols++;
+		if (has_pubasroot)
+			ncols++;
 
 		initPQExpBuffer(&title);
 		printfPQExpBuffer(&title, _("Publication %s"), pubname);
@@ -5862,6 +5873,8 @@ describePublications(const char *pattern)
 		printTableAddHeader(&cont, gettext_noop("Deletes"), true, align);
 		if (has_pubtruncate)
 			printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
+		if (has_pubasroot)
+			printTableAddHeader(&cont, gettext_noop("Publishes Using Root Schema"), true, align);
 
 		printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
 		printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
@@ -5870,6 +5883,8 @@ describePublications(const char *pattern)
 		printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false);
 		if (has_pubtruncate)
 			printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
+		if (has_pubasroot)
+			printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false);
 
 		if (!puballtables)
 		{
diff --git a/src/include/catalog/partition.h b/src/include/catalog/partition.h
index 27873aff6e..c6c19119ca 100644
--- a/src/include/catalog/partition.h
+++ b/src/include/catalog/partition.h
@@ -21,6 +21,7 @@
 
 extern Oid	get_partition_parent(Oid relid);
 extern List *get_partition_ancestors(Oid relid);
+extern bool is_leaf_partition(Oid relid);
 extern Oid	index_get_partition(Relation partition, Oid indexId);
 extern List *map_partition_varattnos(List *expr, int fromrel_varno,
 									 Relation to_rel, Relation from_rel);
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index bb52e8c5e0..a85a6c8991 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -52,6 +52,8 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 	/* true if truncates are published */
 	bool		pubtruncate;
 
+	/* true if partition changes are published using root schema */
+	bool		pubasroot;
 } FormData_pg_publication;
 
 /* ----------------
@@ -74,12 +76,13 @@ typedef struct Publication
 	Oid			oid;
 	char	   *name;
 	bool		alltables;
+	bool		pubasroot;
 	PublicationActions pubactions;
 } Publication;
 
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
-extern List *GetRelationPublications(Oid relid);
+extern List *GetRelationPublications(Oid relid, List **published_rels);
 
 /*---------
  * Expected values for pub_partopt parameter of GetRelationPublications(),
@@ -99,7 +102,7 @@ typedef enum PublicationPartOpt
 
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
 extern List *GetAllTablesPublications(void);
-extern List *GetAllTablesPublicationRelations(void);
+extern List *GetAllTablesPublicationRelations(bool pubasroot);
 
 extern bool is_publishable_relation(Relation rel);
 extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 2634d2c1e1..d2d269b11b 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -25,21 +25,23 @@ CREATE PUBLICATION testpub_xxx WITH (foo);
 ERROR:  unrecognized publication parameter: "foo"
 CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum');
 ERROR:  unrecognized "publish" value: "cluster"
+CREATE PUBLICATION testpub_xxx WITH (publish_using_root_schema = 'true', publish_using_root_schema = '0');
+ERROR:  conflicting or redundant options
 \dRp
-                                         List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------+--------------------------+------------+---------+---------+---------+-----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f
- testpub_default    | regress_publication_user | f          | f       | t       | f       | f
+                                                        List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+-----------------------------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
+ testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f
 (2 rows)
 
 ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
 \dRp
-                                         List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------+--------------------------+------------+---------+---------+---------+-----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f
- testpub_default    | regress_publication_user | f          | t       | t       | t       | f
+                                                        List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+-----------------------------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
+ testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f
 (2 rows)
 
 --- adding tables
@@ -83,10 +85,10 @@ Publications:
     "testpub_foralltables"
 
 \dRp+ testpub_foralltables
-                        Publication testpub_foralltables
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | t          | t       | t       | f       | f
+                                       Publication testpub_foralltables
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema 
+--------------------------+------------+---------+---------+---------+-----------+-----------------------------
+ regress_publication_user | t          | t       | t       | f       | f         | f
 (1 row)
 
 DROP TABLE testpub_tbl2;
@@ -98,19 +100,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
 CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
 RESET client_min_messages;
 \dRp+ testpub3
-                              Publication testpub3
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | t
+                                             Publication testpub3
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema 
+--------------------------+------------+---------+---------+---------+-----------+-----------------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f
 Tables:
     "public.testpub_tbl3"
     "public.testpub_tbl3a"
 
 \dRp+ testpub4
-                              Publication testpub4
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | t
+                                             Publication testpub4
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema 
+--------------------------+------------+---------+---------+---------+-----------+-----------------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f
 Tables:
     "public.testpub_tbl3"
 
@@ -129,10 +131,10 @@ ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
 -- only parent is listed as being in publication, not the partition
 ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
 \dRp+ testpub_forparted
-                          Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | t
+                                         Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema 
+--------------------------+------------+---------+---------+---------+-----------+-----------------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f
 Tables:
     "public.testpub_parted"
 
@@ -143,6 +145,15 @@ HINT:  To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
 ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 -- works again, because parent's publication is no longer considered
 UPDATE testpub_parted1 SET a = 1;
+ALTER PUBLICATION testpub_forparted SET (publish_using_root_schema = true);
+\dRp+ testpub_forparted
+                                         Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema 
+--------------------------+------------+---------+---------+---------+-----------+-----------------------------
+ regress_publication_user | f          | t       | t       | t       | t         | t
+Tables:
+    "public.testpub_parted"
+
 DROP TABLE testpub_parted1;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 -- fail - view
@@ -159,10 +170,10 @@ ERROR:  relation "testpub_tbl1" is already member of publication "testpub_fortbl
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 ERROR:  publication "testpub_fortbl" already exists
 \dRp+ testpub_fortbl
-                           Publication testpub_fortbl
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | t
+                                          Publication testpub_fortbl
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema 
+--------------------------+------------+---------+---------+---------+-----------+-----------------------------
+ regress_publication_user | f          | t       | t       | t       | t         | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -200,10 +211,10 @@ Publications:
     "testpub_fortbl"
 
 \dRp+ testpub_default
-                           Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | f
+                                          Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema 
+--------------------------+------------+---------+---------+---------+-----------+-----------------------------
+ regress_publication_user | f          | t       | t       | t       | f         | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -247,10 +258,10 @@ DROP TABLE testpub_parted;
 DROP VIEW testpub_view;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
-                           Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | f
+                                          Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema 
+--------------------------+------------+---------+---------+---------+-----------+-----------------------------
+ regress_publication_user | f          | t       | t       | t       | f         | f
 (1 row)
 
 -- fail - must be owner of publication
@@ -260,20 +271,20 @@ ERROR:  must be owner of publication testpub_default
 RESET ROLE;
 ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
 \dRp testpub_foo
-                                     List of publications
-    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
--------------+--------------------------+------------+---------+---------+---------+-----------
- testpub_foo | regress_publication_user | f          | t       | t       | t       | f
+                                                    List of publications
+    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema 
+-------------+--------------------------+------------+---------+---------+---------+-----------+-----------------------------
+ testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f
 (1 row)
 
 -- rename back to keep the rest simple
 ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
 ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
 \dRp testpub_default
-                                        List of publications
-      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates 
------------------+---------------------------+------------+---------+---------+---------+-----------
- testpub_default | regress_publication_user2 | f          | t       | t       | t       | f
+                                                       List of publications
+      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema 
+-----------------+---------------------------+------------+---------+---------+---------+-----------+-----------------------------
+ testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f
 (1 row)
 
 DROP PUBLICATION testpub_default;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 219e04129d..9742aef802 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -23,6 +23,7 @@ ALTER PUBLICATION testpub_default SET (publish = update);
 -- error cases
 CREATE PUBLICATION testpub_xxx WITH (foo);
 CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum');
+CREATE PUBLICATION testpub_xxx WITH (publish_using_root_schema = 'true', publish_using_root_schema = '0');
 
 \dRp
 
@@ -87,6 +88,8 @@ UPDATE testpub_parted1 SET a = 1;
 ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 -- works again, because parent's publication is no longer considered
 UPDATE testpub_parted1 SET a = 1;
+ALTER PUBLICATION testpub_forparted SET (publish_using_root_schema = true);
+\dRp+ testpub_forparted
 DROP TABLE testpub_parted1;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 8ac55102ec..2d033047a0 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 17;
+use Test::More tests => 36;
 
 # setup
 
@@ -25,7 +25,11 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
 $node_publisher->safe_psql('postgres',
 	"CREATE PUBLICATION pub1");
 $node_publisher->safe_psql('postgres',
-	"CREATE PUBLICATION pub_all FOR ALL TABLES");
+	"CREATE PUBLICATION pub_all FOR ALL TABLES WITH (publish_using_root_schema = true)");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub2");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub3 WITH (publish_using_root_schema = true)");
 $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
 $node_publisher->safe_psql('postgres',
@@ -34,8 +38,24 @@ $node_publisher->safe_psql('postgres',
 	"ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
 $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab2_1 (b text, a int NOT NULL)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES IN (1, 2, 3)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab2_2 PARTITION OF tab2 FOR VALUES IN (5, 6)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab3 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab3_1 PARTITION OF tab3 FOR VALUES IN (1, 2, 3, 5, 6)");
 $node_publisher->safe_psql('postgres',
 	"ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1");
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub2 ADD TABLE tab1_1, tab1_2");
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub3 ADD TABLE tab2, tab3_1");
 
 # subscriber1
 $node_subscriber1->safe_psql('postgres',
@@ -51,18 +71,42 @@ $node_subscriber1->safe_psql('postgres',
 	"CREATE TABLE tab1_2_1 PARTITION OF tab1_2 FOR VALUES IN (5)");
 $node_subscriber1->safe_psql('postgres',
 	"CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (6)");
+$node_subscriber1->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub1_tab2', b text) PARTITION BY RANGE (a)");
+$node_subscriber1->safe_psql('postgres',
+	"CREATE TABLE tab2_1 (c text DEFAULT 'sub1_tab2', b text, a int NOT NULL)");
+$node_subscriber1->safe_psql('postgres',
+	"CREATE TABLE tab3_1 (c text DEFAULT 'sub1_tab3_1', b text, a int NOT NULL PRIMARY KEY)");
+$node_subscriber1->safe_psql('postgres',
+	"ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES FROM (1) TO (10)");
 $node_subscriber1->safe_psql('postgres',
 	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1");
+$node_subscriber1->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub4 CONNECTION '$publisher_connstr' PUBLICATION pub3");
 
 # subscriber 2
 $node_subscriber2->safe_psql('postgres',
-	"CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text)");
+	"CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text) PARTITION BY HASH (a)");
+$node_subscriber2->safe_psql('postgres',
+	"CREATE TABLE tab1_part1 (b text, c text, a int NOT NULL)");
+$node_subscriber2->safe_psql('postgres',
+	"ALTER TABLE tab1 ATTACH PARTITION tab1_part1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)");
+$node_subscriber2->safe_psql('postgres',
+	"CREATE TABLE tab1_part2 PARTITION OF tab1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)");
 $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)");
 $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)");
+$node_subscriber2->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab2', b text)");
+$node_subscriber2->safe_psql('postgres',
+	"CREATE TABLE tab3 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)");
+$node_subscriber2->safe_psql('postgres',
+	"CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)");
 $node_subscriber2->safe_psql('postgres',
 	"CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all");
+$node_subscriber2->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub2");
 
 # Wait for initial sync of all subscriptions
 my $synced_query =
@@ -79,9 +123,15 @@ $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab1_1 (a) VALUES (3)");
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab1_2 VALUES (5)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab2 VALUES (1), (3), (5)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab3 VALUES (1), (3), (5)");
 
 $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
+$node_publisher->wait_for_catchup('sub3');
+$node_publisher->wait_for_catchup('sub4');
 
 my $result = $node_subscriber1->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
@@ -91,6 +141,14 @@ $result = $node_subscriber1->safe_psql('postgres',
 	"SELECT tableoid::regclass FROM tab1 WHERE a = 5");
 is($result, qq(tab1_2_1), 'inserts into tab1_2 replicated into correct partition');
 
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT c, count(*), min(a), max(a) FROM tab2 GROUP BY 1");
+is($result, qq(sub1_tab2|3|1|5), 'insert into tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT c, count(*), min(a), max(a) FROM tab3_1 GROUP BY 1");
+is($result, qq(sub1_tab3_1|3|1|5), 'insert into tab3_1 replicated');
+
 $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
 is($result, qq(sub2_tab1_1|2|1|3), 'inserts into tab1_1 replicated');
@@ -99,27 +157,55 @@ $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
 is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated');
 
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
+is($result, qq(sub2_tab1|3|1|5), 'inserts into tab1 replicated');
+
 # update (no partition change)
 $node_publisher->safe_psql('postgres',
 	"UPDATE tab1 SET a = 2 WHERE a = 1");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab2 SET a = 2 WHERE a = 1");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab3 SET a = 2 WHERE a = 1");
 
 $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
+$node_publisher->wait_for_catchup('sub3');
+$node_publisher->wait_for_catchup('sub4');
 
 $result = $node_subscriber1->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
 is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated');
 
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT c, count(*), min(a), max(a) FROM tab2 GROUP BY 1");
+is($result, qq(sub1_tab2|3|2|5), 'update of tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT c, count(*), min(a), max(a) FROM tab3_1 GROUP BY 1");
+is($result, qq(sub1_tab3_1|3|2|5), 'update of tab3_1 replicated');
+
 $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
 is($result, qq(sub2_tab1_1|2|2|3), 'update of tab1_1 replicated');
 
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
+is($result, qq(sub2_tab1|3|2|5), 'update of tab1 replicated');
+
 # update (partition changes)
 $node_publisher->safe_psql('postgres',
 	"UPDATE tab1 SET a = 6 WHERE a = 2");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab2 SET a = 6 WHERE a = 2");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab3 SET a = 6 WHERE a = 2");
 
 $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
+$node_publisher->wait_for_catchup('sub3');
+$node_publisher->wait_for_catchup('sub4');
 
 $result = $node_subscriber1->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
@@ -129,6 +215,14 @@ $result = $node_subscriber1->safe_psql('postgres',
 	"SELECT tableoid::regclass FROM tab1 WHERE a = 6");
 is($result, qq(tab1_2_2), 'update of tab1_2 correctly replicated as cross-partition update');
 
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT c, count(*), min(a), max(a) FROM tab2 GROUP BY 1");
+is($result, qq(sub1_tab2|3|3|6), 'update of tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT c, count(*), min(a), max(a) FROM tab3_1 GROUP BY 1");
+is($result, qq(sub1_tab3_1|3|3|6), 'update of tab3_1 replicated');
+
 $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
 is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated');
@@ -137,19 +231,41 @@ $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
 is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated');
 
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
+is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
+is($result, qq(sub2_tab1|3|3|6), 'update of tab1 replicated');
+
 # delete
 $node_publisher->safe_psql('postgres',
 	"DELETE FROM tab1 WHERE a IN (3, 5)");
 $node_publisher->safe_psql('postgres',
 	"DELETE FROM tab1_2");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM tab2 WHERE a IN (3, 5)");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM tab3 WHERE a IN (3, 5)");
 
 $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
+$node_publisher->wait_for_catchup('sub3');
+$node_publisher->wait_for_catchup('sub4');
 
 $result = $node_subscriber1->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab1");
 is($result, qq(0||), 'delete from tab1_1, tab1_2 replicated');
 
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(1|6|6), 'delete from tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab3_1");
+is($result, qq(1|6|6), 'delete from tab3_1 replicated');
+
 $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab1_1");
 is($result, qq(0||), 'delete from tab1_1 replicated');
@@ -158,34 +274,80 @@ $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab1_2");
 is($result, qq(0||), 'delete from tab1_2 replicated');
 
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab1_1");
+is($result, qq(0||), 'delete from tab1_1, tab_2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||), 'delete from tab1 replicated');
+
 # truncate
 $node_subscriber1->safe_psql('postgres',
 	"INSERT INTO tab1 VALUES (1), (2), (5)");
+$node_subscriber1->safe_psql('postgres',
+	"INSERT INTO tab2 VALUES (1), (2), (5)");
+$node_subscriber1->safe_psql('postgres',
+	"INSERT INTO tab3_1 (a) VALUES (1), (2), (5)");
 $node_subscriber2->safe_psql('postgres',
 	"INSERT INTO tab1_2 VALUES (2)");
+$node_subscriber2->safe_psql('postgres',
+	"INSERT INTO tab1_1 VALUES (1)");
+$node_subscriber2->safe_psql('postgres',
+	"INSERT INTO tab1 VALUES (1), (2), (5)");
+
 $node_publisher->safe_psql('postgres',
 	"TRUNCATE tab1_2");
+$node_publisher->safe_psql('postgres',
+	"TRUNCATE tab2_1");
 
 $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
+$node_publisher->wait_for_catchup('sub3');
+$node_publisher->wait_for_catchup('sub4');
 
 $result = $node_subscriber1->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab1");
 is($result, qq(2|1|2), 'truncate of tab1_2 replicated');
 
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(4|1|6), 'truncate of tab2_2 NOT replicated');
+
 $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab1_2");
 is($result, qq(0||), 'truncate of tab1_2 replicated');
 
+$node_subscriber2->safe_psql('postgres',
+	"DROP SUBSCRIPTION sub3");
+$node_subscriber2->safe_psql('postgres',
+	"INSERT INTO tab1_2 VALUES (2)");
 $node_publisher->safe_psql('postgres',
 	"TRUNCATE tab1");
+$node_publisher->safe_psql('postgres',
+	"TRUNCATE tab2");
+$node_publisher->safe_psql('postgres',
+	"TRUNCATE tab3");
 
 $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
+$node_publisher->wait_for_catchup('sub4');
 
 $result = $node_subscriber1->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab1");
 is($result, qq(0||), 'truncate of tab1_1 replicated');
 $result = $node_subscriber2->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab1");
-is($result, qq(0||), 'truncate of tab1_1 replicated');
+is($result, qq(0||), 'truncate of tab1 replicated');
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab1_1");
+is($result, qq(1|1|1), 'tab1_1 unchanged');
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(0||), 'truncate of tab2 replicated');
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab3_1");
+is($result, qq(0||), 'truncate of tab3_1 replicated');
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab1_2");
+is($result, qq(1|2|2), 'tab1_2 unchanged');
-- 
2.20.1 (Apple Git-117)

