On 2021-Dec-31, Justin Pryzby wrote:

> > @@ -5963,8 +5967,20 @@ describePublications(const char *pattern)
> >             {
> > +                                                                    "      
> >  CASE WHEN pr.prattrs IS NOT NULL THEN\n"
> > +                                                                    "      
> >  pg_catalog.array_to_string"
> > +                                                                    
> > "(ARRAY(SELECT attname\n"
> > +                                                                    "      
> >    FROM pg_catalog.generate_series(0, 
> > pg_catalog.array_upper(pr.prattrs::int[], 1)) s,\n"
> > +                                                                    "      
> >         pg_catalog.pg_attribute\n"
> > +                                                                    "      
> >   WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n"
> > +                                                                    "      
> >  ELSE NULL END AS columns");

> I suppose this should use pr.prattrs::pg_catalog.int2[] ?

True.  Changed that.

Another change in this v15 is that I renamed the test file from ".patch"
to ".pl". I suppose I mistyped the extension when renumbering from 021
to 028.

> Did the DatumGetBool issue expose a deficiency in testing ?
> I think the !am_partition path was never being hit.

Hmm, the TAP test creates a subscription that contains both types of
tables.  I tried adding an assert for each case, and they were both hit
on running the test.

-- 
Álvaro Herrera           39°49'30"S 73°17'W  —  https://www.EnterpriseDB.com/
"La persona que no quería pecar / estaba obligada a sentarse
 en duras y empinadas sillas    / desprovistas, por cierto
 de blandos atenuantes"                          (Patricio Vogel)
>From ad2766290e7011481813ce24c1947bff70415211 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 6 Sep 2021 10:34:29 -0300
Subject: [PATCH v15] Support column lists for logical replication of tables

Add the capability of specifying a column list for individual tables as
part of a publication.  Columns not in the list are not published.  This
enables replicating to a table with only a subset of the columns.

If no column list is specified, all the columns are replicated, as
previously

Author: Rahila Syed <rahilasye...@gmail.com>
Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektnrh8nj1jf...@mail.gmail.com
---
 doc/src/sgml/protocol.sgml                  |   4 +-
 doc/src/sgml/ref/alter_publication.sgml     |  20 +-
 doc/src/sgml/ref/create_publication.sgml    |  11 +-
 src/backend/catalog/pg_publication.c        | 306 +++++++++++++++++++-
 src/backend/commands/publicationcmds.c      |  67 ++++-
 src/backend/commands/tablecmds.c            |  79 ++++-
 src/backend/nodes/copyfuncs.c               |   1 +
 src/backend/nodes/equalfuncs.c              |   1 +
 src/backend/parser/gram.y                   |  60 +++-
 src/backend/replication/logical/proto.c     |  66 +++--
 src/backend/replication/logical/tablesync.c | 120 +++++++-
 src/backend/replication/pgoutput/pgoutput.c |  78 ++++-
 src/bin/pg_dump/pg_dump.c                   |  41 ++-
 src/bin/pg_dump/pg_dump.h                   |   1 +
 src/bin/psql/describe.c                     |  26 +-
 src/bin/psql/tab-complete.c                 |   2 +
 src/include/catalog/pg_publication.h        |   5 +
 src/include/catalog/pg_publication_rel.h    |   3 +
 src/include/nodes/parsenodes.h              |   4 +-
 src/include/replication/logicalproto.h      |   6 +-
 src/test/regress/expected/publication.out   |  33 ++-
 src/test/regress/sql/publication.sql        |  20 +-
 src/test/subscription/t/028_column_list.pl  | 164 +++++++++++
 23 files changed, 1034 insertions(+), 84 deletions(-)
 create mode 100644 src/test/subscription/t/028_column_list.pl

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 34a7034282..5bc2e7a591 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -6877,7 +6877,9 @@ Relation
 </listitem>
 </varlistentry>
 </variablelist>
-        Next, the following message part appears for each column (except generated columns):
+        Next, the following message part appears for each column (except
+        generated columns and other columns that don't appear in the column
+        filter list, for tables that have one):
 <variablelist>
 <varlistentry>
 <term>
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index bb4ef5e5e2..16a12b44b9 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -25,12 +25,13 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD <replace
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET <replaceable class="parameter">publication_object</replaceable> [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP <replaceable class="parameter">publication_object</replaceable> [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ALTER TABLE <replaceable class="parameter">publication_object</replaceable> SET COLUMNS { ( <replaceable class="parameter">name</replaceable> [, ...] ) | ALL }
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable>, [, ... ] ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -62,6 +63,11 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
    command retain their previous settings.
   </para>
 
+  <para>
+   The <literal>ALTER TABLE ... SET COLUMNS</literal> variant allows to change
+   the set of columns that are included in the publication.
+  </para>
+
   <para>
    The remaining variants change the owner and the name of the publication.
   </para>
@@ -110,6 +116,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
       name to explicitly indicate that descendant tables are included.
+      Optionally, a column list can be specified.  See <xref
+      linkend="sql-createpublication"/> for details.
      </para>
     </listitem>
    </varlistentry>
@@ -164,9 +172,15 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete');
   </para>
 
   <para>
-   Add some tables to the publication:
+   Add tables to the publication:
 <programlisting>
-ALTER PUBLICATION mypublication ADD TABLE users, departments;
+ALTER PUBLICATION mypublication ADD TABLE users (user_id, firstname), departments;
+</programlisting></para>
+
+  <para>
+   Change the set of columns published for a table:
+<programlisting>
+ALTER PUBLICATION mypublication ALTER TABLE users SET COLUMNS (user_id, firstname, lastname);
 </programlisting></para>
 
   <para>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index d805e8e77a..73a23cbb02 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable>, [, ... ] ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -78,6 +78,15 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       publication, so they are never explicitly added to the publication.
      </para>
 
+     <para>
+      When a column list is specified, only the listed columns are replicated;
+      any other columns are ignored for the purpose of replication through
+      this publication.  If no column list is specified, all columns of the
+      table are replicated through this publication, including any columns
+      added later.  If a column list is specified, it must include the replica
+      identity columns.
+     </para>
+
      <para>
       Only persistent base tables and partitioned tables can be part of a
       publication.  Temporary tables, unlogged tables, foreign tables,
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index b307bc2ed5..af5d1a281f 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -45,13 +45,23 @@
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
+
+static void check_publication_columns(Relation targetrel, Bitmapset *columns);
+static AttrNumber *publication_translate_columns(Relation targetrel, List *columns,
+												 int *natts, Bitmapset **attset);
+
 /*
- * Check if relation can be in given publication and throws appropriate
- * error if not.
+ * Check if relation can be in given publication and that the column
+ * filter is sensible, and throws appropriate error if not.
+ *
+ * targetcols is the bitmapset of column specified as column filter, or NULL if
+ * no column filter was specified.
  */
 static void
-check_publication_add_relation(Relation targetrel)
+check_publication_add_relation(Relation targetrel, Bitmapset *columns)
 {
+	bool		replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+
 	/* Must be a regular or partitioned table */
 	if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
 		RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
@@ -82,6 +92,63 @@ check_publication_add_relation(Relation targetrel)
 				 errmsg("cannot add relation \"%s\" to publication",
 						RelationGetRelationName(targetrel)),
 				 errdetail("This operation is not supported for unlogged tables.")));
+
+	/* Make sure the column list checks out */
+	if (columns != NULL)
+	{
+		/*
+		 * Even if the user listed all columns in the column list, we cannot
+		 * allow a column list to be specified when REPLICA IDENTITY is FULL;
+		 * that would cause problems if a new column is added later, because
+		 * the new column would have to be included (because of being part of
+		 * the replica identity) but it's technically not allowed (because of
+		 * not being in the publication's column list yet).  So reject this
+		 * case altogether.
+		 */
+		if (replidentfull)
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("invalid column list for publishing relation \"%s\"",
+						   RelationGetRelationName(targetrel)),
+					errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL."));
+
+		check_publication_columns(targetrel, columns);
+	}
+}
+
+/*
+ * Enforce that the column filter can only leave out columns that aren't
+ * forced to be sent.
+ *
+ * No column can be excluded if REPLICA IDENTITY is FULL (since all the
+ * columns need to be sent regardless); and in other cases, the columns in
+ * the REPLICA IDENTITY cannot be left out.
+ */
+static void
+check_publication_columns(Relation targetrel, Bitmapset *columns)
+{
+	Bitmapset  *idattrs;
+	int			x;
+
+	idattrs = RelationGetIndexAttrBitmap(targetrel,
+										 INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+	/*
+	 * We have to test membership the hard way, because the values returned by
+	 * RelationGetIndexAttrBitmap are offset.
+	 */
+	x = -1;
+	while ((x = bms_next_member(idattrs, x)) >= 0)
+	{
+		if (!bms_is_member(x + FirstLowInvalidHeapAttributeNumber, columns))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+					errmsg("invalid column list for publishing relation \"%s\"",
+						   RelationGetRelationName(targetrel)),
+					errdetail("All columns in REPLICA IDENTITY must be present in the column list."));
+	}
+
+	bms_free(idattrs);
 }
 
 /*
@@ -289,6 +356,9 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	Oid			relid = RelationGetRelid(targetrel->relation);
 	Oid			pubreloid;
 	Publication *pub = GetPublication(pubid);
+	Bitmapset  *attset = NULL;
+	AttrNumber *attarray;
+	int			natts = 0;
 	ObjectAddress myself,
 				referenced;
 	List	   *relids = NIL;
@@ -314,7 +384,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel->relation);
+	/* Translate column names to numbers and verify suitability */
+	attarray = publication_translate_columns(targetrel->relation,
+											 targetrel->columns,
+											 &natts, &attset);
+
+	check_publication_add_relation(targetrel->relation, attset);
+
+	bms_free(attset);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -327,6 +404,15 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 		ObjectIdGetDatum(pubid);
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
+	if (targetrel->columns)
+	{
+		int2vector *prattrs;
+
+		prattrs = buildint2vector(attarray, natts);
+		values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs);
+	}
+	else
+		nulls[Anum_pg_publication_rel_prattrs - 1] = true;
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -337,6 +423,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	/* Register dependencies as needed */
 	ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
 
+	/* Add dependency on the columns, if any are listed */
+	for (int i = 0; i < natts; i++)
+	{
+		ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+	}
+	pfree(attarray);
 	/* Add dependency on the publication */
 	ObjectAddressSet(referenced, PublicationRelationId, pubid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
@@ -364,6 +457,143 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	return myself;
 }
 
+/*
+ * Update the column list for a relation in a publication.
+ */
+void
+publication_set_table_columns(Relation pubrel, HeapTuple pubreltup,
+							  Relation targetrel, List *columns)
+{
+	Bitmapset  *attset;
+	AttrNumber *attarray;
+	HeapTuple	copytup;
+	int			natts;
+	bool		nulls[Natts_pg_publication_rel];
+	bool		replaces[Natts_pg_publication_rel];
+	Datum		values[Natts_pg_publication_rel];
+
+	memset(values, 0, sizeof(values));
+	memset(nulls, 0, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_publication_rel_prattrs - 1] = true;
+
+	deleteDependencyRecordsForClass(PublicationRelationId,
+									((Form_pg_publication_rel) GETSTRUCT(pubreltup))->oid,
+									RelationRelationId,
+									DEPENDENCY_AUTO);
+
+	if (columns == NULL)
+	{
+		nulls[Anum_pg_publication_rel_prattrs - 1] = true;
+	}
+	else
+	{
+		ObjectAddress myself,
+					referenced;
+		int2vector *prattrs;
+
+		if (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+			ereport(ERROR,
+					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("cannot change column set for relation \"%s\"",
+						   RelationGetRelationName(targetrel)),
+					errdetail("Cannot have column filter on relations with REPLICA IDENTITY FULL."));
+
+		attarray = publication_translate_columns(targetrel, columns,
+												 &natts, &attset);
+
+		/*
+		 * Make sure the column list checks out.  XXX this should occur at
+		 * caller in publicationcmds.c, not here.
+		 */
+		check_publication_columns(targetrel, attset);
+		bms_free(attset);
+
+		prattrs = buildint2vector(attarray, natts);
+		values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs);
+
+		/* Add dependencies on the new list of columns */
+		ObjectAddressSet(myself, PublicationRelRelationId,
+						 ((Form_pg_publication_rel) GETSTRUCT(pubreltup))->oid);
+		for (int i = 0; i < natts; i++)
+		{
+			ObjectAddressSubSet(referenced, RelationRelationId,
+								RelationGetRelid(targetrel), attarray[i]);
+			recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+		}
+	}
+
+	copytup = heap_modify_tuple(pubreltup, RelationGetDescr(pubrel),
+								values, nulls, replaces);
+
+	CatalogTupleUpdate(pubrel, &pubreltup->t_self, copytup);
+
+	heap_freetuple(copytup);
+}
+
+/*
+ * Translate a list of column names to an array of attribute numbers
+ * and a Bitmapset with them; verify that each attribute is appropriate
+ * to have in a publication column list.  Other checks are done later;
+ * see check_publication_columns.
+ *
+ * Note that the attribute numbers are *not* offset by
+ * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
+ * is okay.
+ */
+static AttrNumber *
+publication_translate_columns(Relation targetrel, List *columns, int *natts,
+							  Bitmapset **attset)
+{
+	AttrNumber *attarray;
+	Bitmapset  *set = NULL;
+	ListCell   *lc;
+	int			n = 0;
+
+	/*
+	 * Translate list of columns to attnums. We prohibit system attributes and
+	 * make sure there are no duplicate columns.
+	 */
+	attarray = palloc(sizeof(AttrNumber) * list_length(columns));
+	foreach(lc, columns)
+	{
+		char	   *colname = strVal(lfirst(lc));
+		AttrNumber	attnum = get_attnum(RelationGetRelid(targetrel), colname);
+
+		if (attnum == InvalidAttrNumber)
+			ereport(ERROR,
+					errcode(ERRCODE_UNDEFINED_COLUMN),
+					errmsg("column \"%s\" of relation \"%s\" does not exist",
+						   colname, RelationGetRelationName(targetrel)));
+
+		if (!AttrNumberIsForUserDefinedAttr(attnum))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+					errmsg("cannot reference system column \"%s\" in publication column list",
+						   colname));
+
+		if (bms_is_member(attnum, set))
+			ereport(ERROR,
+					errcode(ERRCODE_DUPLICATE_OBJECT),
+					errmsg("duplicate column \"%s\" in publication column list",
+						   colname));
+
+		set = bms_add_member(set, attnum);
+		attarray[n++] = attnum;
+	}
+
+	/*
+	 * XXX qsort the array here, or maybe build just the bitmapset above and
+	 * then scan that in order to produce the array?  Do we care about the
+	 * array being unsorted?
+	 */
+
+	*natts = n;
+	*attset = set;
+	return attarray;
+}
+
 /*
  * Insert new publication / schema mapping.
  */
@@ -471,6 +701,74 @@ GetRelationPublications(Oid relid)
 	return result;
 }
 
+/*
+ * Gets a list of OIDs of all partial-column publications of the given
+ * relation, that is, those that specify a column list.
+ */
+List *
+GetRelationColumnPartialPublications(Oid relid)
+{
+	CatCList   *pubrellist;
+	List	   *pubs = NIL;
+
+	pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
+									 ObjectIdGetDatum(relid));
+	for (int i = 0; i < pubrellist->n_members; i++)
+	{
+		HeapTuple	tup = &pubrellist->members[i]->tuple;
+		bool		isnull;
+
+		(void) SysCacheGetAttr(PUBLICATIONRELMAP, tup,
+							   Anum_pg_publication_rel_prattrs,
+							   &isnull);
+		if (isnull)
+			continue;
+
+		pubs = lappend_oid(pubs,
+						   ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid);
+	}
+
+	ReleaseSysCacheList(pubrellist);
+
+	return pubs;
+}
+
+/*
+ * For a relation in a publication that is known to have a non-null column
+ * list, return the list of attribute numbers that are in it.
+ */
+List *
+GetRelationColumnListInPublication(Oid relid, Oid pubid)
+{
+	HeapTuple	tup;
+	Datum		adatum;
+	bool		isnull;
+	ArrayType  *arr;
+	int			nelems;
+	int16	   *elems;
+	List	   *attnos = NIL;
+
+	tup = SearchSysCache2(PUBLICATIONRELMAP,
+						  ObjectIdGetDatum(relid),
+						  ObjectIdGetDatum(pubid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for rel %u of publication %u", relid, pubid);
+	adatum = SysCacheGetAttr(PUBLICATIONRELMAP, tup,
+							 Anum_pg_publication_rel_prattrs, &isnull);
+	if (isnull)
+		elog(ERROR, "found unexpected null in pg_publication_rel.prattrs");
+	arr = DatumGetArrayTypeP(adatum);
+	nelems = ARR_DIMS(arr)[0];
+	elems = (int16 *) ARR_DATA_PTR(arr);
+
+	for (int i = 0; i < nelems; i++)
+		attnos = lappend_oid(attnos, elems[i]);
+
+	ReleaseSysCache(tup);
+
+	return attnos;
+}
+
 /*
  * Gets list of relation oids for a publication.
  *
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 0f04969fd6..657374c0d1 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -376,6 +376,46 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 	return myself;
 }
 
+/*
+ * Change the column list of a relation in a publication
+ */
+static void
+PublicationSetColumns(AlterPublicationStmt *stmt,
+					  Form_pg_publication pubform, PublicationTable *table)
+{
+	Relation	rel,
+				urel;
+	HeapTuple	tup;
+	ObjectAddress obj,
+				secondary;
+
+	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
+	urel = table_openrv(table->relation, ShareUpdateExclusiveLock);
+
+	tup = SearchSysCache2(PUBLICATIONRELMAP,
+						  ObjectIdGetDatum(RelationGetRelid(urel)),
+						  ObjectIdGetDatum(pubform->oid));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errmsg("relation \"%s\" is not already in publication \"%s\"",
+					   table->relation->relname,
+					   NameStr(pubform->pubname)));
+
+	publication_set_table_columns(rel, tup, urel, table->columns);
+
+	ObjectAddressSet(obj, PublicationRelationId,
+					 ((Form_pg_publication_rel) GETSTRUCT(tup))->oid);
+	ObjectAddressSet(secondary, RelationRelationId, RelationGetRelid(urel));
+	EventTriggerCollectSimpleCommand(obj, secondary, (Node *) stmt);
+
+	ReleaseSysCache(tup);
+
+	table_close(rel, RowExclusiveLock);
+	table_close(urel, NoLock);
+
+	InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
+}
+
 /*
  * Change options of a publication.
  */
@@ -523,6 +563,14 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 	}
 	else if (stmt->action == AP_DropObjects)
 		PublicationDropTables(pubid, rels, false);
+	else if (stmt->action == AP_SetColumns)
+	{
+		Assert(schemaidlist == NIL);
+		Assert(list_length(tables) == 1);
+
+		PublicationSetColumns(stmt, pubform,
+							  linitial_node(PublicationTable, tables));
+	}
 	else						/* AP_SetObjects */
 	{
 		List	   *oldrelids = GetPublicationRelations(pubid,
@@ -562,7 +610,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 
 				pubrel = palloc(sizeof(PublicationRelInfo));
 				pubrel->relation = oldrel;
-
+				/* This is not needed to delete a table */
+				pubrel->columns = NIL;
 				delrels = lappend(delrels, pubrel);
 			}
 		}
@@ -622,7 +671,7 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
 	}
 	else if (stmt->action == AP_DropObjects)
 		PublicationDropSchemas(pubform->oid, schemaidlist, false);
-	else						/* AP_SetObjects */
+	else if (stmt->action == AP_SetObjects)
 	{
 		List	   *oldschemaids = GetPublicationSchemas(pubform->oid);
 		List	   *delschemas = NIL;
@@ -645,6 +694,10 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt,
 		 */
 		PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
 	}
+	else
+	{
+		/* Nothing to do for AP_SetColumns */
+	}
 }
 
 /*
@@ -934,6 +987,8 @@ OpenTableList(List *tables)
 
 		pub_rel = palloc(sizeof(PublicationRelInfo));
 		pub_rel->relation = rel;
+		pub_rel->columns = t->columns;
+
 		rels = lappend(rels, pub_rel);
 		relids = lappend_oid(relids, myrelid);
 
@@ -967,8 +1022,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = table_open(childrelid, NoLock);
+
 				pub_rel = palloc(sizeof(PublicationRelInfo));
 				pub_rel->relation = rel;
+				pub_rel->columns = t->columns;
+
 				rels = lappend(rels, pub_rel);
 				relids = lappend_oid(relids, childrelid);
 			}
@@ -1076,6 +1134,11 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 		Relation	rel = pubrel->relation;
 		Oid			relid = RelationGetRelid(rel);
 
+		if (pubrel->columns)
+			ereport(ERROR,
+					errcode(ERRCODE_SYNTAX_ERROR),
+					errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
+
 		prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
 							   ObjectIdGetDatum(relid),
 							   ObjectIdGetDatum(pubid));
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3631b8a929..232a068613 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -8347,6 +8347,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 				 bool missing_ok, LOCKMODE lockmode,
 				 ObjectAddresses *addrs)
 {
+	Oid			relid = RelationGetRelid(rel);
 	HeapTuple	tuple;
 	Form_pg_attribute targetatt;
 	AttrNumber	attnum;
@@ -8366,7 +8367,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 	/*
 	 * get the number of the attribute
 	 */
-	tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName);
+	tuple = SearchSysCacheAttName(relid, colName);
 	if (!HeapTupleIsValid(tuple))
 	{
 		if (!missing_ok)
@@ -8420,13 +8421,42 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 
 	ReleaseSysCache(tuple);
 
+	/*
+	 * Also, if the column is used in the column list of a publication,
+	 * disallow the drop if the DROP is RESTRICT.  We don't do anything if the
+	 * DROP is CASCADE, which means that the dependency mechanism will remove
+	 * the relation from the publication.
+	 */
+	if (behavior == DROP_RESTRICT)
+	{
+		List	   *pubs;
+		ListCell   *lc;
+
+		pubs = GetRelationColumnPartialPublications(relid);
+		foreach(lc, pubs)
+		{
+			Oid			pubid = lfirst_oid(lc);
+			List	   *published_cols;
+
+			published_cols =
+				GetRelationColumnListInPublication(relid, pubid);
+
+			if (list_member_oid(published_cols, attnum))
+				ereport(ERROR,
+						errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+						errmsg("cannot drop column \"%s\" because it is part of publication \"%s\"",
+							   colName, get_publication_name(pubid, false)),
+						errhint("Specify CASCADE or use ALTER PUBLICATION to remove the column from the publication."));
+		}
+	}
+
 	/*
 	 * Propagate to children as appropriate.  Unlike most other ALTER
 	 * routines, we have to do this one level of recursion at a time; we can't
 	 * use find_all_inheritors to do it in one pass.
 	 */
 	children =
-		find_inheritance_children(RelationGetRelid(rel), lockmode);
+		find_inheritance_children(relid, lockmode);
 
 	if (children)
 	{
@@ -8514,7 +8544,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 
 	/* Add object to delete */
 	object.classId = RelationRelationId;
-	object.objectId = RelationGetRelid(rel);
+	object.objectId = relid;
 	object.objectSubId = attnum;
 	add_exact_object_address(&object, addrs);
 
@@ -15581,6 +15611,7 @@ relation_mark_replica_identity(Relation rel, char ri_type, Oid indexOid,
 			CatalogTupleUpdate(pg_index, &pg_index_tuple->t_self, pg_index_tuple);
 			InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0,
 										 InvalidOid, is_internal);
+
 			/*
 			 * Invalidate the relcache for the table, so that after we commit
 			 * all sessions will refresh the table's replica identity index
@@ -15603,6 +15634,11 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
 	Oid			indexOid;
 	Relation	indexRel;
 	int			key;
+	List	   *pubs;
+	Bitmapset  *indexed_cols = NULL;
+	ListCell   *lc;
+
+	pubs = GetRelationColumnPartialPublications(RelationGetRelid(rel));
 
 	if (stmt->identity_type == REPLICA_IDENTITY_DEFAULT)
 	{
@@ -15611,11 +15647,16 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
 	}
 	else if (stmt->identity_type == REPLICA_IDENTITY_FULL)
 	{
+		if (pubs != NIL)
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("cannot set REPLICA IDENTITY FULL when publications contain relations that specify column lists"));
 		relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
 		return;
 	}
 	else if (stmt->identity_type == REPLICA_IDENTITY_NOTHING)
 	{
+		/* XXX not sure what's the right check for publications here */
 		relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true);
 		return;
 	}
@@ -15700,6 +15741,38 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
 					 errmsg("index \"%s\" cannot be used as replica identity because column \"%s\" is nullable",
 							RelationGetRelationName(indexRel),
 							NameStr(attr->attname))));
+
+		/*
+		 * Collect columns used, in case we have any publications that we need
+		 * to vet.  System attributes are disallowed so no need to subtract
+		 * FirstLowInvalidHeapAttributeNumber.
+		 */
+		indexed_cols = bms_add_member(indexed_cols, attno);
+	}
+
+	/*
+	 * Check partial-column publications.  All publications have to include
+	 * all key columns of the new index.
+	 */
+	foreach(lc, pubs)
+	{
+		Oid			pubid = lfirst_oid(lc);
+		List	   *published_cols;
+
+		published_cols =
+			GetRelationColumnListInPublication(RelationGetRelid(rel), pubid);
+
+		for (key = 0; key < IndexRelationGetNumberOfKeyAttributes(indexRel); key++)
+		{
+			int16	attno = indexRel->rd_index->indkey.values[key];
+
+			if (!list_member_oid(published_cols, attno))
+				ereport(ERROR,
+						errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("index \"%s\" cannot be used because publication \"%s\" does not include all indexed columns",
+							   RelationGetRelationName(indexRel),
+							   get_publication_name(pubid, false)));
+		}
 	}
 
 	/* This index is suitable for use as a replica identity. Mark it. */
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index df0b747883..0ff4c1ceac 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4833,6 +4833,7 @@ _copyPublicationTable(const PublicationTable *from)
 	PublicationTable *newnode = makeNode(PublicationTable);
 
 	COPY_NODE_FIELD(relation);
+	COPY_NODE_FIELD(columns);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index cb7ddd463c..d786a688ac 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2312,6 +2312,7 @@ static bool
 _equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
 {
 	COMPARE_NODE_FIELD(relation);
+	COMPARE_NODE_FIELD(columns);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 6dddc07947..068f67998c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9740,12 +9740,13 @@ CreatePublicationStmt:
  * relation_expr here.
  */
 PublicationObjSpec:
-			TABLE relation_expr
+			TABLE relation_expr opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $2;
+					$$->pubtable->columns = $3;
 				}
 			| ALL TABLES IN_P SCHEMA ColId
 				{
@@ -9760,28 +9761,38 @@ PublicationObjSpec:
 					$$->pubobjtype = PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA;
 					$$->location = @5;
 				}
-			| ColId
+			| ColId opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
-					$$->name = $1;
+					if ($2 != NULL)
+					{
+						$$->pubtable = makeNode(PublicationTable);
+						$$->pubtable->relation = makeRangeVar(NULL, $1, @1);
+						$$->pubtable->columns = $2;
+						$$->name = NULL;
+					}
+					else
+						$$->name = $1;
 					$$->location = @1;
 				}
-			| ColId indirection
+			| ColId indirection opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
+					$$->pubtable->columns = $3;
 					$$->location = @1;
 				}
 			/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
-			| extended_relation_expr
+			| extended_relation_expr opt_column_list
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $1;
+					$$->pubtable->columns = $2;
 				}
 			| CURRENT_SCHEMA
 				{
@@ -9807,6 +9818,9 @@ pub_obj_list: 	PublicationObjSpec
  *
  * ALTER PUBLICATION name SET pub_obj [, ...]
  *
+ * ALTER PUBLICATION name SET COLUMNS table_name (column[, ...])
+ * ALTER PUBLICATION name SET COLUMNS table_name ALL
+ *
  * pub_obj is one of:
  *
  *		TABLE table_name [, ...]
@@ -9840,6 +9854,32 @@ AlterPublicationStmt:
 					n->action = AP_SetObjects;
 					$$ = (Node *)n;
 				}
+			| ALTER PUBLICATION name ALTER TABLE relation_expr SET COLUMNS '(' columnList ')'
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					PublicationObjSpec *obj = makeNode(PublicationObjSpec);
+					obj->pubobjtype = PUBLICATIONOBJ_TABLE;
+					obj->pubtable = makeNode(PublicationTable);
+					obj->pubtable->relation = $6;
+					obj->pubtable->columns = $10;
+					n->pubname = $3;
+					n->pubobjects = list_make1(obj);
+					n->action = AP_SetColumns;
+					$$ = (Node *) n;
+				}
+			| ALTER PUBLICATION name ALTER TABLE relation_expr SET COLUMNS ALL
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					PublicationObjSpec *obj = makeNode(PublicationObjSpec);
+					obj->pubobjtype = PUBLICATIONOBJ_TABLE;
+					obj->pubtable = makeNode(PublicationTable);
+					obj->pubtable->relation = $6;
+					obj->pubtable->columns = NIL;
+					n->pubname = $3;
+					n->pubobjects = list_make1(obj);
+					n->action = AP_SetColumns;
+					$$ = (Node *) n;
+				}
 			| ALTER PUBLICATION name DROP pub_obj_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
@@ -17444,6 +17484,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 		else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA ||
 				 pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA)
 		{
+			/*
+			 * This can happen if a column list is specified in a continuation
+			 * for a schema entry; reject it.
+			 */
+			if (pubobj->pubtable)
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("column specification not allowed for schemas"),
+						parser_errposition(pubobj->location));
+
 			/*
 			 * We can distinguish between the different type of schema
 			 * objects based on whether name and pubtable is set.
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9f5bf4b639..3428984130 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -29,9 +29,11 @@
 #define TRUNCATE_CASCADE		(1<<0)
 #define TRUNCATE_RESTART_SEQS	(1<<1)
 
-static void logicalrep_write_attrs(StringInfo out, Relation rel);
+static void logicalrep_write_attrs(StringInfo out, Relation rel,
+								   Bitmapset *columns);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
-								   HeapTuple tuple, bool binary);
+								   HeapTuple tuple, bool binary,
+								   Bitmapset *columns);
 
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -398,7 +400,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple newtuple, bool binary)
+						HeapTuple newtuple, bool binary, Bitmapset *columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -410,7 +412,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, columns);
 }
 
 /*
@@ -442,7 +444,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  */
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+						HeapTuple oldtuple, HeapTuple newtuple, bool binary,
+						Bitmapset *columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -463,11 +466,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldtuple, binary);
+		logicalrep_write_tuple(out, rel, oldtuple, binary, columns);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, columns);
 }
 
 /*
@@ -536,7 +539,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldtuple, binary);
+	logicalrep_write_tuple(out, rel, oldtuple, binary, NULL);
 }
 
 /*
@@ -651,7 +654,8 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
  * Write relation description to the output stream.
  */
 void
-logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
+					 Bitmapset *columns)
 {
 	char	   *relname;
 
@@ -673,7 +677,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
 	pq_sendbyte(out, rel->rd_rel->relreplident);
 
 	/* send the attribute info */
-	logicalrep_write_attrs(out, rel);
+	logicalrep_write_attrs(out, rel, columns);
 }
 
 /*
@@ -749,7 +753,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  * Write a tuple to the outputstream, in the most efficient format possible.
  */
 static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple,
+					   bool binary, Bitmapset *columns)
 {
 	TupleDesc	desc;
 	Datum		values[MaxTupleAttributeNumber];
@@ -761,7 +766,13 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
 
 	for (i = 0; i < desc->natts; i++)
 	{
-		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
+		Form_pg_attribute att = TupleDescAttr(desc, i);
+
+		if (att->attisdropped || att->attgenerated)
+			continue;
+
+		/* Don't count attributes that are not to be sent. */
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
 			continue;
 		nliveatts++;
 	}
@@ -783,6 +794,10 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
+		/* Ignore attributes that are not to be sent. */
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
+			continue;
+
 		if (isnull[i])
 		{
 			pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
@@ -904,7 +919,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
  * Write relation attribute metadata to the stream.
  */
 static void
-logicalrep_write_attrs(StringInfo out, Relation rel)
+logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns)
 {
 	TupleDesc	desc;
 	int			i;
@@ -914,20 +929,24 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 
 	desc = RelationGetDescr(rel);
 
-	/* send number of live attributes */
-	for (i = 0; i < desc->natts; i++)
-	{
-		if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
-			continue;
-		nliveatts++;
-	}
-	pq_sendint16(out, nliveatts);
-
 	/* fetch bitmap of REPLICATION IDENTITY attributes */
 	replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
 	if (!replidentfull)
 		idattrs = RelationGetIdentityKeyBitmap(rel);
 
+	/* send number of live attributes */
+	for (i = 0; i < desc->natts; i++)
+	{
+		Form_pg_attribute att = TupleDescAttr(desc, i);
+
+		if (att->attisdropped || att->attgenerated)
+			continue;
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
+			continue;
+		nliveatts++;
+	}
+	pq_sendint16(out, nliveatts);
+
 	/* send the attributes */
 	for (i = 0; i < desc->natts; i++)
 	{
@@ -936,7 +955,8 @@ logicalrep_write_attrs(StringInfo out, Relation rel)
 
 		if (att->attisdropped || att->attgenerated)
 			continue;
-
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
+			continue;
 		/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
 		if (replidentfull ||
 			bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..35f1294ae4 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -111,6 +111,7 @@
 #include "replication/origin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -697,17 +698,20 @@ fetch_remote_table_info(char *nspname, char *relname,
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
-	Oid			attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID};
+	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
 	bool		isnull;
 	int			natt;
+	ListCell   *lc;
+	bool		am_partition;
+	Bitmapset  *included_cols = NULL;
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
 
 	/* First fetch Oid and replica identity. */
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
+	appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition"
 					 "  FROM pg_catalog.pg_class c"
 					 "  INNER JOIN pg_catalog.pg_namespace n"
 					 "        ON (c.relnamespace = n.oid)"
@@ -737,14 +741,19 @@ fetch_remote_table_info(char *nspname, char *relname,
 	Assert(!isnull);
 	lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
 	Assert(!isnull);
+	am_partition = DatumGetBool(slot_getattr(slot, 4, &isnull));
+	Assert(!isnull);
 
 	ExecDropSingleTupleTableSlot(slot);
 	walrcv_clear_result(res);
 
-	/* Now fetch columns. */
+	/*
+	 * Now fetch column names and types.
+	 */
 	resetStringInfo(&cmd);
 	appendStringInfo(&cmd,
-					 "SELECT a.attname,"
+					 "SELECT a.attnum,"
+					 "       a.attname,"
 					 "       a.atttypid,"
 					 "       a.attnum = ANY(i.indkey)"
 					 "  FROM pg_catalog.pg_attribute a"
@@ -772,16 +781,92 @@ fetch_remote_table_info(char *nspname, char *relname,
 	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
 	lrel->attkeys = NULL;
 
+	/*
+	 * In server versions 15 and higher, obtain the applicable column filter,
+	 * if any.
+	 */
+	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+	{
+		WalRcvExecResult *pubres;
+		TupleTableSlot *slot;
+		Oid			attrsRow[] = {INT2OID};
+		StringInfoData publications;
+		bool		first = true;
+
+		initStringInfo(&publications);
+		foreach(lc, MySubscription->publications)
+		{
+			if (!first)
+				appendStringInfo(&publications, ", ");
+			appendStringInfoString(&publications, quote_literal_cstr(strVal(lfirst(lc))));
+			first = false;
+		}
+
+		resetStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "  SELECT pg_catalog.unnest(prattrs)\n"
+						 "    FROM pg_catalog.pg_publication p JOIN\n"
+						 "         pg_catalog.pg_publication_rel pr ON (p.oid = pr.prpubid)\n"
+						 "   WHERE p.pubname IN (%s) AND\n",
+						 publications.data);
+		if (!am_partition)
+			appendStringInfo(&cmd, "prrelid = %u", lrel->remoteid);
+		else
+			appendStringInfo(&cmd,
+							 "prrelid IN (SELECT relid\n"
+							 "    FROM pg_catalog.pg_partition_tree(pg_catalog.pg_partition_root(%u)))",
+							 lrel->remoteid);
+
+		pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+							 lengthof(attrsRow), attrsRow);
+
+		if (pubres->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("could not fetch attribute info for table \"%s.%s\" from publisher: %s",
+							nspname, relname, pubres->err)));
+
+		slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot))
+		{
+			AttrNumber	attnum;
+
+			attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
+			if (isnull)
+				continue;
+			included_cols = bms_add_member(included_cols, attnum);
+		}
+		ExecDropSingleTupleTableSlot(slot);
+		pfree(publications.data);
+		walrcv_clear_result(pubres);
+	}
+
+	/*
+	 * Store the column names only if they are contained in column filter.
+	 * LogicalRepRelation will only contain attributes corresponding to those
+	 * specified in column filters.
+	 */
 	natt = 0;
 	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 	{
-		lrel->attnames[natt] =
-			TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		char	   *rel_colname;
+		AttrNumber	attnum;
+
+		attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
 		Assert(!isnull);
-		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull));
+
+		if (included_cols != NULL && !bms_is_member(attnum, included_cols))
+			continue;
+
+		rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
 		Assert(!isnull);
-		if (DatumGetBool(slot_getattr(slot, 3, &isnull)))
+
+		lrel->attnames[natt] = rel_colname;
+		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
+		Assert(!isnull);
+
+		if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
 			lrel->attkeys = bms_add_member(lrel->attkeys, natt);
 
 		/* Should never happen. */
@@ -791,12 +876,12 @@ fetch_remote_table_info(char *nspname, char *relname,
 
 		ExecClearTuple(slot);
 	}
+
 	ExecDropSingleTupleTableSlot(slot);
-
-	lrel->natts = natt;
-
 	walrcv_clear_result(res);
 	pfree(cmd.data);
+
+	lrel->natts = natt;
 }
 
 /*
@@ -829,8 +914,17 @@ copy_table(Relation rel)
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
 	if (lrel.relkind == RELKIND_RELATION)
-		appendStringInfo(&cmd, "COPY %s TO STDOUT",
+	{
+		appendStringInfo(&cmd, "COPY %s (",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		for (int i = 0; i < lrel.natts; i++)
+		{
+			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			if (i < lrel.natts - 1)
+				appendStringInfoString(&cmd, ", ");
+		}
+		appendStringInfo(&cmd, ") TO STDOUT");
+	}
 	else
 	{
 		/*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6f6a203dea..34df5d4956 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,16 +15,19 @@
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel_d.h"
 #include "commands/defrem.h"
 #include "fmgr.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
@@ -81,7 +84,8 @@ static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
-									LogicalDecodingContext *ctx);
+									LogicalDecodingContext *ctx,
+									Bitmapset *columns);
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
@@ -130,6 +134,13 @@ typedef struct RelationSyncEntry
 	 * having identical TupleDesc.
 	 */
 	TupleConversionMap *map;
+
+	/*
+	 * Set of columns included in the publication, or NULL if all columns are
+	 * included implicitly.  Note that the attnums in this list are not
+	 * shifted by FirstLowInvalidHeapAttributeNumber.
+	 */
+	Bitmapset  *columns;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -570,11 +581,11 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 		}
 
 		MemoryContextSwitchTo(oldctx);
-		send_relation_and_attrs(ancestor, xid, ctx);
+		send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
 		RelationClose(ancestor);
 	}
 
-	send_relation_and_attrs(relation, xid, ctx);
+	send_relation_and_attrs(relation, xid, ctx, relentry->columns);
 
 	if (in_streaming)
 		set_schema_sent_in_streamed_txn(relentry, topxid);
@@ -587,7 +598,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
  */
 static void
 send_relation_and_attrs(Relation relation, TransactionId xid,
-						LogicalDecodingContext *ctx)
+						LogicalDecodingContext *ctx,
+						Bitmapset *columns)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	int			i;
@@ -610,13 +622,17 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 		if (att->atttypid < FirstGenbkiObjectId)
 			continue;
 
+		/* Skip if attribute is not present in column filter. */
+		if (columns != NULL && !bms_is_member(att->attnum, columns))
+			continue;
+
 		OutputPluginPrepareWrite(ctx, false);
 		logicalrep_write_typ(ctx->out, xid, att->atttypid);
 		OutputPluginWrite(ctx, false);
 	}
 
 	OutputPluginPrepareWrite(ctx, false);
-	logicalrep_write_rel(ctx->out, xid, relation);
+	logicalrep_write_rel(ctx->out, xid, relation, columns);
 	OutputPluginWrite(ctx, false);
 }
 
@@ -693,7 +709,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_insert(ctx->out, xid, relation, tuple,
-										data->binary);
+										data->binary, relentry->columns);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -722,7 +738,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_update(ctx->out, xid, relation, oldtuple,
-										newtuple, data->binary);
+										newtuple, data->binary, relentry->columns);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -1122,6 +1138,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	bool		am_partition = get_rel_relispartition(relid);
 	char		relkind = get_rel_relkind(relid);
 	bool		found;
+	Oid			ancestor_id;
 	MemoryContext oldctx;
 
 	Assert(RelationSyncCache != NULL);
@@ -1142,6 +1159,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->publish_as_relid = InvalidOid;
+		entry->columns = NULL;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
 	}
@@ -1182,6 +1200,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		{
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
+			bool		ancestor_published = false;
 
 			if (pub->alltables)
 			{
@@ -1192,8 +1211,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 
 			if (!publish)
 			{
-				bool		ancestor_published = false;
-
 				/*
 				 * For a partition, check if any of the ancestors are
 				 * published.  If so, note down the topmost ancestor that is
@@ -1219,6 +1236,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 											pub->oid))
 						{
 							ancestor_published = true;
+							ancestor_id = ancestor;
 							if (pub->pubviaroot)
 								publish_as_relid = ancestor;
 						}
@@ -1239,15 +1257,47 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 			if (publish &&
 				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
 			{
+				Oid			relid;
+				HeapTuple	pub_rel_tuple;
+
+				relid = ancestor_published ? ancestor_id : publish_as_relid;
+				pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP,
+												ObjectIdGetDatum(relid),
+												ObjectIdGetDatum(pub->oid));
+
+				if (HeapTupleIsValid(pub_rel_tuple))
+				{
+					Datum		pub_rel_cols;
+					bool		isnull;
+
+					pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP,
+												   pub_rel_tuple,
+												   Anum_pg_publication_rel_prattrs,
+												   &isnull);
+					if (!isnull)
+					{
+						ArrayType  *arr;
+						int			nelems;
+						int16	   *elems;
+
+						arr = DatumGetArrayTypeP(pub_rel_cols);
+						nelems = ARR_DIMS(arr)[0];
+						elems = (int16 *) ARR_DATA_PTR(arr);
+
+						/* XXX is there a danger of memory leak here? beware */
+						oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+						for (int i = 0; i < nelems; i++)
+							entry->columns = bms_add_member(entry->columns,
+															elems[i]);
+						MemoryContextSwitchTo(oldctx);
+					}
+					ReleaseSysCache(pub_rel_tuple);
+				}
 				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
-
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
 		}
 
 		list_free(pubids);
@@ -1343,6 +1393,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		entry->schema_sent = false;
 		list_free(entry->streamed_txns);
 		entry->streamed_txns = NIL;
+		bms_free(entry->columns);
+		entry->columns = NULL;
 		if (entry->map)
 		{
 			/*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7af6dfa575..c7c5f3de66 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4055,6 +4055,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	int			i_oid;
 	int			i_prpubid;
 	int			i_prrelid;
+	int			i_prattrs;
 	int			i,
 				j,
 				ntups;
@@ -4066,8 +4067,13 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 
 	/* Collect all publication membership info. */
 	appendPQExpBufferStr(query,
-						 "SELECT tableoid, oid, prpubid, prrelid "
-						 "FROM pg_catalog.pg_publication_rel");
+						 "SELECT tableoid, oid, prpubid, prrelid");
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBufferStr(query, ", prattrs");
+	else
+		appendPQExpBufferStr(query, ", NULL as prattrs");
+	appendPQExpBufferStr(query,
+						 " FROM pg_catalog.pg_publication_rel");
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
 	ntups = PQntuples(res);
@@ -4076,6 +4082,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	i_oid = PQfnumber(res, "oid");
 	i_prpubid = PQfnumber(res, "prpubid");
 	i_prrelid = PQfnumber(res, "prrelid");
+	i_prattrs = PQfnumber(res, "prattrs");
 
 	/* this allocation may be more than we need */
 	pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
@@ -4117,6 +4124,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 		pubrinfo[j].publication = pubinfo;
 		pubrinfo[j].pubtable = tbinfo;
 
+		if (!PQgetisnull(res, i, i_prattrs))
+		{
+			char	  **attnames;
+			int			nattnames;
+			PQExpBuffer attribs;
+
+			if (!parsePGArray(PQgetvalue(res, i, i_prattrs),
+							  &attnames, &nattnames))
+				fatal("could not parse %s array", "prattrs");
+			attribs = createPQExpBuffer();
+			for (int k = 0; k < nattnames; k++)
+			{
+				if (k > 0)
+					appendPQExpBufferStr(attribs, ", ");
+
+				appendPQExpBufferStr(attribs, fmtId(attnames[k]));
+			}
+			pubrinfo[i].pubrattrs = attribs->data;
+		}
+		else
+			pubrinfo[j].pubrattrs = NULL;
+
 		/* Decide whether we want to dump it */
 		selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
 
@@ -4191,10 +4220,12 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo)
 
 	query = createPQExpBuffer();
 
-	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
+	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY ",
 					  fmtId(pubinfo->dobj.name));
-	appendPQExpBuffer(query, " %s;\n",
-					  fmtQualifiedDumpable(tbinfo));
+	appendPQExpBufferStr(query, fmtQualifiedDumpable(tbinfo));
+	if (pubrinfo->pubrattrs)
+		appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs);
+	appendPQExpBufferStr(query, ";\n");
 
 	/*
 	 * There is no point in creating a drop query as the drop is done by table
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index f9deb321ac..f3d3689ac9 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -630,6 +630,7 @@ typedef struct _PublicationRelInfo
 	DumpableObject dobj;
 	PublicationInfo *publication;
 	TableInfo  *pubtable;
+	char	   *pubrattrs;
 } PublicationRelInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c28788e84f..e9c2650b49 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -5815,7 +5815,7 @@ listPublications(const char *pattern)
  */
 static bool
 addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
-						   bool singlecol, printTableContent *cont)
+						   bool as_schema, printTableContent *cont)
 {
 	PGresult   *res;
 	int			count = 0;
@@ -5832,10 +5832,14 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
 
 	for (i = 0; i < count; i++)
 	{
-		if (!singlecol)
+		if (!as_schema)			/* as table */
+		{
 			printfPQExpBuffer(buf, "    \"%s.%s\"", PQgetvalue(res, i, 0),
 							  PQgetvalue(res, i, 1));
-		else
+			if (!PQgetisnull(res, i, 2))
+				appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 2));
+		}
+		else					/* as schema */
 			printfPQExpBuffer(buf, "    \"%s\"", PQgetvalue(res, i, 0));
 
 		printTableAddFooter(cont, buf->data);
@@ -5963,8 +5967,20 @@ describePublications(const char *pattern)
 		{
 			/* Get the tables for the specified publication */
 			printfPQExpBuffer(&buf,
-							  "SELECT n.nspname, c.relname\n"
-							  "FROM pg_catalog.pg_class c,\n"
+							  "SELECT n.nspname, c.relname, \n");
+			if (pset.sversion >= 150000)
+				appendPQExpBufferStr(&buf,
+									 "       CASE WHEN pr.prattrs IS NOT NULL THEN\n"
+									 "       pg_catalog.array_to_string"
+									 "(ARRAY(SELECT attname\n"
+									 "         FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n"
+									 "              pg_catalog.pg_attribute\n"
+									 "        WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n"
+									 "       ELSE NULL END AS columns");
+			else
+				appendPQExpBufferStr(&buf, "NULL as columns");
+			appendPQExpBuffer(&buf,
+							  "\nFROM pg_catalog.pg_class c,\n"
 							  "     pg_catalog.pg_namespace n,\n"
 							  "     pg_catalog.pg_publication_rel pr\n"
 							  "WHERE c.relnamespace = n.oid\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index cf30239f6d..25c7c08040 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1648,6 +1648,8 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER PUBLICATION <name> ADD */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD"))
 		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
+	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "TABLE"))
+		COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL);
 	/* ALTER PUBLICATION <name> DROP */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP"))
 		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 902f2f2f0d..edd4f0c63c 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -86,6 +86,7 @@ typedef struct Publication
 typedef struct PublicationRelInfo
 {
 	Relation	relation;
+	List	   *columns;
 } PublicationRelInfo;
 
 extern Publication *GetPublication(Oid pubid);
@@ -109,6 +110,8 @@ typedef enum PublicationPartOpt
 } PublicationPartOpt;
 
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
+extern List *GetRelationColumnPartialPublications(Oid relid);
+extern List *GetRelationColumnListInPublication(Oid relid, Oid pubid);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(bool pubviaroot);
 extern List *GetPublicationSchemas(Oid pubid);
@@ -127,6 +130,8 @@ extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *tar
 											  bool if_not_exists);
 extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
 											bool if_not_exists);
+extern void publication_set_table_columns(Relation pubrel, HeapTuple pubreltup,
+										  Relation targetrel, List *columns);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
 extern char *get_publication_name(Oid pubid, bool missing_ok);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..51946cce59 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
 	Oid			prrelid BKI_LOOKUP(pg_class);	/* Oid of the relation */
+#ifdef CATALOG_VARLEN			/* variable-length fields start here */
+	int2vector	prattrs;
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 593e301f7a..f98a78c016 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3642,6 +3642,7 @@ typedef struct PublicationTable
 {
 	NodeTag		type;
 	RangeVar   *relation;		/* relation to be published */
+	List	   *columns;		/* List of columns in a publication table */
 } PublicationTable;
 
 /*
@@ -3678,7 +3679,8 @@ typedef enum AlterPublicationAction
 {
 	AP_AddObjects,				/* add objects to publication */
 	AP_DropObjects,				/* remove objects from publication */
-	AP_SetObjects				/* set list of objects */
+	AP_SetObjects,				/* set list of objects */
+	AP_SetColumns				/* change list of columns for a table */
 } AlterPublicationAction;
 
 typedef struct AlterPublicationStmt
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 83741dcf42..7a5cb9871d 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple newtuple,
-									bool binary);
+									bool binary, Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple oldtuple,
-									HeapTuple newtuple, bool binary);
+									HeapTuple newtuple, bool binary, Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
@@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in,
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
-								 Relation rel);
+								 Relation rel, Bitmapset *columns);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
 extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
 								 Oid typoid);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 12c5f67080..44de5fa8a2 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -165,7 +165,29 @@ Publications:
  regress_publication_user | t          | t       | t       | f       | f         | f
 (1 row)
 
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error
+ERROR:  column "x" of relation "testpub_tbl5" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);  -- error
+ERROR:  invalid column list for publishing relation "testpub_tbl5"
+DETAIL:  All columns in REPLICA IDENTITY must be present in the column list.
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);  -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+ERROR:  cannot drop column "c" because it is part of publication "testpub_fortable"
+HINT:  Specify CASCADE or use ALTER PUBLICATION to remove the column from the publication.
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5 (a);
+ERROR:  column list must not be specified in ALTER PUBLICATION ... DROP
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);  -- error
+ERROR:  invalid column list for publishing relation "testpub_tbl6"
+DETAIL:  Cannot have column filter on relations with REPLICA IDENTITY FULL.
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok
+ALTER PUBLICATION testpub_fortable
+  ALTER TABLE testpub_tbl6 SET COLUMNS (a, b, c);	-- error
+ERROR:  cannot change column set for relation "testpub_tbl6"
+DETAIL:  Cannot have column filter on relations with REPLICA IDENTITY FULL.
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
 DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
 CREATE TABLE testpub_tbl3 (a int);
 CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3);
@@ -670,6 +692,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes
 Tables from schemas:
     "pub_test1"
 
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ERROR:  syntax error at or near "("
+LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+                                                                ^
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+ERROR:  column specification not allowed for schemas
+LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)...
+                                                             ^
 -- cleanup pub_test1 schema for invalidation tests
 ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
 DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 56dd358554..b625d161cb 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -89,7 +89,21 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall
 \d+ testpub_tbl2
 \dRp+ testpub_foralltables
 
-DROP TABLE testpub_tbl2;
+CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);  -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5 (a);
+
+CREATE TABLE testpub_tbl6 (a int, b text, c text);
+ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);  -- error
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok
+ALTER PUBLICATION testpub_fortable
+  ALTER TABLE testpub_tbl6 SET COLUMNS (a, b, c);	-- error
+
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
 DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
 
 CREATE TABLE testpub_tbl3 (a int);
@@ -362,6 +376,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem
 ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1;
 \dRp+ testpub1_forschema
 
+-- Verify that it fails to add a schema with a column specification
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b);
+ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b);
+
 -- cleanup pub_test1 schema for invalidation tests
 ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1;
 DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/subscription/t/028_column_list.pl b/src/test/subscription/t/028_column_list.pl
new file mode 100644
index 0000000000..f2b26176ed
--- /dev/null
+++ b/src/test/subscription/t/028_column_list.pl
@@ -0,0 +1,164 @@
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+# Test partial-column publication of tables
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 9;
+
+# setup
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_logical_replication_workers = 6));
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int);
+	INSERT INTO tab2 VALUES (2, 'foo', 2);");
+# Test with weird column names
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, B varchar, \"c'\" int)");
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+#Test replication with multi-level partition
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, \"c'\" int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+#Test create publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub1 FOR TABLE tab1(a, \"B\"), tab3(\"a'\",\"c'\"), test_part(a,b)");
+
+my $result = $node_publisher->safe_psql('postgres',
+	"select relname, prattrs from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid;");
+is($result, qq(tab1|1 2
+tab3|1 3
+test_part|1 2), 'publication relation updated');
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+#Initial sync
+$node_publisher->wait_for_catchup('sub1');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab1 VALUES (1,2,3)");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab3 VALUES (1,2,3)");
+#Test for replication of partition data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (1,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (2,'bcd', '2021-07-03 11:12:13')");
+#Test for replication of multi-level partition data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (4,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (5,'bcd', '2021-07-03 11:12:13')");
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'insert on column tab1.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab3");
+is($result, qq(1|3), 'insert on column tab3.b is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_part");
+is($result, qq(1|abc\n2|bcd\n4|abc\n5|bcd), 'insert on all columns is replicated');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab1 SET c = 5 where a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'update on column tab1.c is not replicated');
+
+# Verify user-defined types
+$node_publisher->safe_psql('postgres',
+	qq{CREATE TYPE test_typ AS ENUM ('blue', 'red');
+	CREATE TABLE test_tab4 (a INT PRIMARY KEY, b test_typ, c int, d text);
+	ALTER PUBLICATION pub1 ADD TABLE test_tab4 (a, b, d);
+	});
+$node_subscriber->safe_psql('postgres',
+	qq{CREATE TYPE test_typ AS ENUM ('blue', 'red');
+	CREATE TABLE test_tab4 (a INT PRIMARY KEY, b test_typ, d text);
+	});
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab4 VALUES (1, 'red', 3, 'oh my');");
+
+#Test alter publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"
+);
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab2 VALUES (1,'abc',3)");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab2 SET c = 5 where a = 2");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab2 WHERE a = 1");
+is($result, qq(1|abc), 'insert on column tab2.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab2 WHERE a = 2");
+is($result, qq(2|foo), 'update on column tab2.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_tab4");
+is($result, qq(1|red|oh my), 'insert on table with user-defined type');
+
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab5 (a int PRIMARY KEY, b int, d int)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d)");
+$node_subscriber->safe_psql('postgres',    "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2, pub3");
+$node_publisher->wait_for_catchup('sub2');
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 11, 111, 1111)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (2, 22, 222, 2222)");
+$node_publisher->wait_for_catchup('sub2');
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5;"),
+   qq(1|11|1111
+2|22|2222),
+   'overlapping publications with overlapping column lists');
-- 
2.30.2

Reply via email to