From cd232d91618010f332dfd8e6b265c769204d70da Mon Sep 17 00:00:00 2001
From: amit <amitlangote09@gmail.com>
Date: Thu, 7 Nov 2019 18:10:44 +0900
Subject: [PATCH v4 2/2] Support adding partitioned tables to publication

---
 doc/src/sgml/logical-replication.sgml       | 23 +++++++++----
 doc/src/sgml/ref/create_publication.sgml    | 13 ++++----
 src/backend/catalog/pg_publication.c        | 51 ++++++++++++++++-------------
 src/backend/commands/publicationcmds.c      | 12 +++++--
 src/backend/commands/subscriptioncmds.c     | 27 +++++++++++++--
 src/backend/executor/execReplication.c      | 19 +++++------
 src/backend/replication/logical/relation.c  |  1 +
 src/backend/replication/logical/tablesync.c | 24 +++++++++++---
 src/bin/pg_dump/pg_dump.c                   |  5 +--
 src/include/replication/logicalproto.h      |  1 +
 src/test/regress/expected/publication.out   |  3 --
 11 files changed, 118 insertions(+), 61 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index f657d1d06e..d67015e160 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -402,13 +402,22 @@
 
    <listitem>
     <para>
-     Replication is only possible from base tables to base tables.  That is,
-     the tables on the publication and on the subscription side must be normal
-     tables, not views, materialized views, partition root tables, or foreign
-     tables.  In the case of partitions, you can therefore replicate a
-     partition hierarchy one-to-one, but you cannot currently replicate to a
-     differently partitioned setup.  Attempts to replicate tables other than
-     base tables will result in an error.
+     Replication is only supported for regular and partitioned tables.
+     Attempts to replicate relations other than regular and partitioned tables,
+     such as views, materialized views, or foreign tables, will result in an
+     error.  Note also that replicating from a regular table to a partitioned
+     table, or vice versa, is not supported.
+    </para>
+
+    <para>
+     When a partitioned table is added to a publication, all of its existing
+     and future partitions are implicitly considered to be part of the
+     publication.  Any changes made to the leaf partitions are sent to the
+     subscriber, which must contain a partitioned table whose partition
+     hierarchy matches one-to-one that of the publication-side partitioned
+     table.  For partitioned tables on the two sides to match one-to-one, each
+     partition with a given partition constraint must have the same name on
+     both sides.
     </para>
    </listitem>
   </itemizedlist>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index 99f87ca393..5e11868989 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -68,15 +68,16 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       that table is added to the publication.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are added.
       Optionally, <literal>*</literal> can be specified after the table name to
-      explicitly indicate that descendant tables are included.
+      explicitly indicate that descendant tables are included.  However, adding
+      a partitioned table to a publication never explicitly adds its partitions,
+      because partitions are implicitly published due to the partitioned table
+      being added to the publication.
      </para>
 
      <para>
-      Only persistent base tables can be part of a publication.  Temporary
-      tables, unlogged tables, foreign tables, materialized views, regular
-      views, and partitioned tables cannot be part of a publication.  To
-      replicate a partitioned table, add the individual partitions to the
-      publication.
+      Only persistent base tables and partitioned tables can be part of a
+      publication.  Temporary tables, unlogged tables, foreign tables,
+      materialized views, and regular views cannot be part of a publication.
      </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 80b98e2c3c..d4cc805499 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -30,6 +30,7 @@
 #include "catalog/namespace.h"
 #include "catalog/objectaccess.h"
 #include "catalog/objectaddress.h"
+#include "catalog/partition.h"
 #include "catalog/pg_type.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
@@ -50,17 +51,9 @@
 static void
 check_publication_add_relation(Relation targetrel)
 {
-	/* Give more specific error for partitioned tables */
-	if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("\"%s\" is a partitioned table",
-						RelationGetRelationName(targetrel)),
-				 errdetail("Adding partitioned tables to publications is not supported."),
-				 errhint("You can add the table partitions individually.")));
-
-	/* Must be table */
-	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
+	/* Must be a regular or partitioned table */
+	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
+		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 				 errmsg("\"%s\" is not a table",
@@ -106,7 +99,8 @@ check_publication_add_relation(Relation targetrel)
 static bool
 is_publishable_class(Oid relid, Form_pg_class reltuple)
 {
-	return reltuple->relkind == RELKIND_RELATION &&
+	return (reltuple->relkind == RELKIND_RELATION ||
+			reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
 		!IsCatalogRelationOid(relid) &&
 		reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
 		relid >= FirstNormalObjectId;
@@ -224,7 +218,8 @@ publication_add_relation(Oid pubid, Relation targetrel,
 
 
 /*
- * Finds all publications associated with the relation.
+ * Finds all publications associated with the relation and, if it's a
+ * partition, those associated with any of its ancestors.
  */
 List *
 GetRelationPublications(Relation rel)
@@ -233,20 +228,32 @@ GetRelationPublications(Relation rel)
 	List	   *result = NIL;
 	CatCList   *pubrellist;
 	int			i;
+	ListCell   *lc;
+	List	   *target_rels = NIL;
 
-	/* Find all publications associated with the relation. */
-	pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
-									 ObjectIdGetDatum(relid));
-	for (i = 0; i < pubrellist->n_members; i++)
+	/* For a partition, include its ancestors' publications, if any. */
+	if (rel->rd_rel->relispartition)
+		target_rels = get_partition_ancestors(RelationGetRelid(rel));
+
+	target_rels = lappend_oid(target_rels, relid);
+
+	foreach(lc, target_rels)
 	{
-		HeapTuple	tup = &pubrellist->members[i]->tuple;
-		Oid			pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
+		Oid		relid = lfirst_oid(lc);
 
-		result = lappend_oid(result, pubid);
+		pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
+										 ObjectIdGetDatum(relid));
+		for (i = 0; i < pubrellist->n_members; i++)
+		{
+			HeapTuple	tup = &pubrellist->members[i]->tuple;
+			Oid			pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
+
+			result = lappend_oid(result, pubid);
+		}
+
+		ReleaseSysCacheList(pubrellist);
 	}
 
-	ReleaseSysCacheList(pubrellist);
-
 	return result;
 }
 
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index f115d4bf80..db17c47495 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -502,7 +502,8 @@ RemovePublicationRelById(Oid proid)
 
 /*
  * Open relations specified by a RangeVar list.
- * The returned tables are locked in ShareUpdateExclusiveLock mode.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
  */
 static List *
 OpenTableList(List *tables)
@@ -543,8 +544,13 @@ OpenTableList(List *tables)
 		rels = lappend(rels, rel);
 		relids = lappend_oid(relids, myrelid);
 
-		/* Add children of this rel, if requested */
-		if (recurse)
+		/*
+		 * Add children of this rel, if requested, so that they too are added
+		 * to the publication.  A partitioned table can't have any inheritance
+		 * children other than its partitions, which need not be explicitly
+		 * added to the publication.
+		 */
+		if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
 		{
 			List	   *children;
 			ListCell   *child;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 11c0f305ff..eb555f1d64 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1149,11 +1149,20 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 	Assert(list_length(publications) > 0);
 
 	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, c.relkind\n"
+	appendStringInfoString(&cmd, "SELECT s.schemaname, s.tablename, s.relkind FROM (\n"
+						   "  SELECT DISTINCT t.pubname, t.schemaname, t.tablename, c.relkind\n"
 						   "  FROM pg_catalog.pg_publication_tables t\n"
 						   "  JOIN pg_class c ON t.schemaname = c.relnamespace::regnamespace::name\n"
 						   "  AND t.tablename = c.relname\n"
-						   " WHERE t.pubname IN (");
+						   "  UNION\n"
+						   "  SELECT DISTINCT t.pubname, s.schemaname, s.tablename, s.relkind\n"
+						   "  FROM pg_catalog.pg_publication_tables t,\n"
+						   "  LATERAL (SELECT c.relnamespace::regnamespace::name, c.relname, c.relkind\n"
+						   "		   FROM pg_class c\n"
+						   "		   JOIN pg_partition_tree(t.schemaname || '.' || t.tablename) p\n"
+						   "		   ON p.relid = c.oid\n"
+						   "		   WHERE p.level > 0) AS s(schemaname, tablename, relkind)) s\n"
+						   " WHERE s.pubname IN (");
 
 	first = true;
 	foreach(lc, publications)
@@ -1224,5 +1233,19 @@ ValidateSubscriptionRel(PublicationTable *pt)
 	local_relkind = get_rel_relkind(relid);
 	CheckSubscriptionRelkind(local_relkind, rv->schemaname, rv->relname);
 
+	/*
+	 * Cannot replicate from a regular to a partitioned table or vice
+	 * versa.
+	 */
+	if (local_relkind != pt->relkind)
+		ereport(ERROR,
+				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
+						rv->schemaname, rv->relname),
+				 errdetail("\"%s.%s\" is a %s on subscription side whereas a %s on publication side",
+						   rv->schemaname, rv->relname,
+						   local_relkind == RELKIND_RELATION ? "regular table" : "partitioned table",
+						   pt->relkind == RELKIND_RELATION ? "regular table" : "partitioned table")));
+
 	return relid;
 }
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 95e027c970..f05f44c99f 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -591,17 +591,10 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
 						 const char *relname)
 {
 	/*
-	 * We currently only support writing to regular tables.  However, give a
-	 * more specific error for partitioned and foreign tables.
+	 * We currently only support writing to regular and partitioned tables.
+	 * However, give a more specific error for foreign tables.
 	 */
-	if (relkind == RELKIND_PARTITIONED_TABLE)
-		ereport(ERROR,
-				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
-						nspname, relname),
-				 errdetail("\"%s.%s\" is a partitioned table.",
-						   nspname, relname)));
-	else if (relkind == RELKIND_FOREIGN_TABLE)
+	if (relkind == RELKIND_FOREIGN_TABLE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
@@ -609,7 +602,11 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
 				 errdetail("\"%s.%s\" is a foreign table.",
 						   nspname, relname)));
 
-	if (relkind != RELKIND_RELATION)
+	/*
+	 * Subscriptions for partitioned tables are really placeholder objects, as
+	 * replication itself occurs on the individual partition level.
+	 */
+	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index b386f8460d..aac2af71b7 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -177,6 +177,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel)
 	entry->remoterel.remoteid = remoterel->remoteid;
 	entry->remoterel.nspname = pstrdup(remoterel->nspname);
 	entry->remoterel.relname = pstrdup(remoterel->relname);
+	entry->remoterel.relkind = remoterel->relkind;
 	entry->remoterel.natts = remoterel->natts;
 	entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
 	entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 7881079e96..fa469c8c67 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -637,7 +637,8 @@ copy_read_data(void *outbuf, int minread, int maxread)
 
 /*
  * Get information about remote relation in similar fashion the RELATION
- * message provides during replication.
+ * message provides during replication.  XXX - while we fetch relkind too
+ * here, the RELATION message doesn't provide it
  */
 static void
 fetch_remote_table_info(char *nspname, char *relname,
@@ -646,7 +647,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[2] = {OIDOID, CHAROID};
+	Oid			tableRow[3] = {OIDOID, CHAROID, CHAROID};
 	Oid			attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
 	bool		isnull;
 	int			natt;
@@ -656,16 +657,16 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 	/* First fetch Oid and replica identity. */
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
+	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
 					 "  FROM pg_catalog.pg_class c"
 					 "  INNER JOIN pg_catalog.pg_namespace n"
 					 "        ON (c.relnamespace = n.oid)"
 					 " WHERE n.nspname = %s"
 					 "   AND c.relname = %s"
-					 "   AND c.relkind = 'r'",
+					 "   AND pg_relation_is_publishable(c.oid)",
 					 quote_literal_cstr(nspname),
 					 quote_literal_cstr(relname));
-	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+	res = walrcv_exec(wrconn, cmd.data, 3, tableRow);
 
 	if (res->status != WALRCV_OK_TUPLES)
 		ereport(ERROR,
@@ -682,6 +683,8 @@ fetch_remote_table_info(char *nspname, char *relname,
 	Assert(!isnull);
 	lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
 	Assert(!isnull);
+	lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
+	Assert(!isnull);
 
 	ExecDropSingleTupleTableSlot(slot);
 	walrcv_clear_result(res);
@@ -769,6 +772,17 @@ copy_table(Relation rel)
 	relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
 	Assert(rel == relmapentry->localrel);
 
+	/*
+	 * If either table is partitioned, skip copying.  Individual partitions
+	 * will be copied instead.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ||
+		lrel.relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		logicalrep_rel_close(relmapentry, NoLock);
+		return;
+	}
+
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
 	appendStringInfo(&cmd, "COPY %s TO STDOUT",
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index bf69adc2f4..b0e87b9075 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -3969,8 +3969,9 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	{
 		TableInfo  *tbinfo = &tblinfo[i];
 
-		/* Only plain tables can be aded to publications. */
-		if (tbinfo->relkind != RELKIND_RELATION)
+		/* Only plain and partitioned tables can be added to publications. */
+		if (tbinfo->relkind != RELKIND_RELATION &&
+			tbinfo->relkind != RELKIND_PARTITIONED_TABLE)
 			continue;
 
 		/*
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 3fc430af01..0fea368d99 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -45,6 +45,7 @@ typedef struct LogicalRepRelation
 	LogicalRepRelId remoteid;	/* unique id of the relation */
 	char	   *nspname;		/* schema name */
 	char	   *relname;		/* relation name */
+	char		relkind;		/* relation kind */
 	int			natts;			/* number of columns */
 	char	  **attnames;		/* column names */
 	Oid		   *atttyps;		/* column types */
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index feb51e4add..ee0db9b07b 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -144,9 +144,6 @@ ERROR:  "testpub_view" is not a table
 DETAIL:  Only tables can be added to publications.
 -- fail - partitioned table
 ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted;
-ERROR:  "testpub_parted" is a partitioned table
-DETAIL:  Adding partitioned tables to publications is not supported.
-HINT:  You can add the table partitions individually.
 ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
 ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
 ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk;
-- 
2.11.0

