From 482dcd54aa6f2d46c96036ce325e75a4750d9e7a Mon Sep 17 00:00:00 2001
From: rahila <rahilasyed.90@gmail.com>
Date: Mon, 7 Jun 2021 16:27:21 +0530
Subject: [PATCH] Add column filtering to logical replication

Add capability to specifiy column names at while linking
the table to a publication at the time of CREATE or ALTER
publication. This will allows replicating only the specified
columns. Rest of the columns on the subscriber will be populated
locally. If column names are not specified all columns are
replicated. REPLICA IDENTITY columns are always replicated
irrespective of column names specification.
Add a tap test for the same in subscription folder.
---
 src/backend/catalog/pg_publication.c         | 20 ++++--
 src/backend/commands/publicationcmds.c       | 25 +++++--
 src/backend/parser/gram.y                    | 23 ++++--
 src/backend/replication/logical/proto.c      | 22 +++---
 src/backend/replication/pgoutput/pgoutput.c  | 62 +++++++++++++---
 src/include/catalog/pg_publication.h         |  9 ++-
 src/include/catalog/pg_publication_rel.h     |  4 ++
 src/include/nodes/nodes.h                    |  1 +
 src/include/nodes/parsenodes.h               |  6 ++
 src/include/replication/logicalproto.h       |  4 +-
 src/test/subscription/t/021_column_filter.pl | 76 ++++++++++++++++++++
 11 files changed, 216 insertions(+), 36 deletions(-)
 create mode 100644 src/test/subscription/t/021_column_filter.pl

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 86e415af89..0948998f5e 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -141,18 +141,20 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
  * Insert new publication / relation mapping.
  */
 ObjectAddress
-publication_add_relation(Oid pubid, Relation targetrel,
+publication_add_relation(Oid pubid, PublicationRelationInfo *targetrel,
 						 bool if_not_exists)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	Datum		values[Natts_pg_publication_rel];
 	bool		nulls[Natts_pg_publication_rel];
-	Oid			relid = RelationGetRelid(targetrel);
+	Oid			relid = RelationGetRelid(targetrel->relation);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
+	ListCell *lc;
+	List *target_cols = NIL;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -172,10 +174,10 @@ publication_add_relation(Oid pubid, Relation targetrel,
 		ereport(ERROR,
 				(errcode(ERRCODE_DUPLICATE_OBJECT),
 				 errmsg("relation \"%s\" is already member of publication \"%s\"",
-						RelationGetRelationName(targetrel), pub->name)));
+						RelationGetRelationName(targetrel->relation), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel);
+	check_publication_add_relation(targetrel->relation);
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -188,6 +190,14 @@ publication_add_relation(Oid pubid, Relation targetrel,
 		ObjectIdGetDatum(pubid);
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
+	foreach(lc, targetrel->columns)
+	{
+		char *colname;
+		colname = strVal(lfirst(lc));
+		target_cols = lappend(target_cols, colname);
+	}
+	values[Anum_pg_publication_rel_prattrs - 1] =
+		PointerGetDatum(strlist_to_textarray(target_cols));
 
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -209,7 +219,7 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	table_close(rel, RowExclusiveLock);
 
 	/* Invalidate relcache so that publication info is rebuilt. */
-	CacheInvalidateRelcache(targetrel);
+	CacheInvalidateRelcache(targetrel->relation);
 
 	return myself;
 }
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 95c253c8e0..c8abdbe1d6 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -515,10 +515,12 @@ OpenTableList(List *tables)
 	 */
 	foreach(lc, tables)
 	{
-		RangeVar   *rv = castNode(RangeVar, lfirst(lc));
+		PublicationTable *t = lfirst(lc);
+		RangeVar   *rv = castNode(RangeVar, t->relation);
 		bool		recurse = rv->inh;
 		Relation	rel;
 		Oid			myrelid;
+		PublicationRelationInfo	*pub_rel;
 
 		/* Allow query cancel in case this takes a long time */
 		CHECK_FOR_INTERRUPTS();
@@ -539,7 +541,11 @@ OpenTableList(List *tables)
 			continue;
 		}
 
-		rels = lappend(rels, rel);
+		pub_rel = palloc(sizeof(PublicationRelationInfo));
+		pub_rel->relation = rel;
+		pub_rel->relid = myrelid;
+		pub_rel->columns = t->columns;
+		rels = lappend(rels, pub_rel);
 		relids = lappend_oid(relids, myrelid);
 
 		/*
@@ -572,7 +578,11 @@ OpenTableList(List *tables)
 
 				/* find_all_inheritors already got lock */
 				rel = table_open(childrelid, NoLock);
-				rels = lappend(rels, rel);
+				pub_rel = palloc(sizeof(PublicationRelationInfo));
+				pub_rel->relation = rel;
+				pub_rel->relid = childrelid;
+				pub_rel->columns = t->columns;
+				rels = lappend(rels, pub_rel);
 				relids = lappend_oid(relids, childrelid);
 			}
 		}
@@ -593,9 +603,9 @@ CloseTableList(List *rels)
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationInfo *pub_rel = (PublicationRelationInfo *)lfirst(lc);
 
-		table_close(rel, NoLock);
+		table_close(pub_rel->relation, NoLock);
 	}
 }
 
@@ -612,7 +622,8 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 
 	foreach(lc, rels)
 	{
-		Relation	rel = (Relation) lfirst(lc);
+		PublicationRelationInfo *pub_rel = (PublicationRelationInfo *)lfirst(lc);
+		Relation	rel = pub_rel->relation;
 		ObjectAddress obj;
 
 		/* Must be owner of the table or superuser. */
@@ -620,7 +631,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 			aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
 						   RelationGetRelationName(rel));
 
-		obj = publication_add_relation(pubid, rel, if_not_exists);
+		obj = publication_add_relation(pubid, pub_rel, if_not_exists);
 		if (stmt)
 		{
 			EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index eb24195438..3615ef4a46 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -426,14 +426,14 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				transform_element_list transform_type_list
 				TriggerTransitions TriggerReferencing
 				vacuum_relation_list opt_vacuum_relation_list
-				drop_option_list
+				drop_option_list publication_table_list
 
 %type <node>	opt_routine_body
 %type <groupclause> group_clause
 %type <list>	group_by_list
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
-%type <node>	opt_publication_for_tables publication_for_tables
+%type <node>	opt_publication_for_tables publication_for_tables publication_table
 
 %type <list>	opt_fdw_options fdw_options
 %type <defelt>	fdw_option
@@ -9612,7 +9612,7 @@ opt_publication_for_tables:
 		;
 
 publication_for_tables:
-			FOR TABLE relation_expr_list
+			FOR TABLE publication_table_list
 				{
 					$$ = (Node *) $3;
 				}
@@ -9622,6 +9622,21 @@ publication_for_tables:
 				}
 		;
 
+publication_table_list:
+			publication_table
+					{ $$ = list_make1($1); }
+		| publication_table_list ',' publication_table
+				{ $$ = lappend($1, $3); }
+		;
+
+publication_table: relation_expr opt_column_list
+		{
+			PublicationTable *n = makeNode(PublicationTable);
+			n->relation = $1;
+			n->columns = $2;
+			$$ = (Node *) n;
+		}
+	;
 
 /*****************************************************************************
  *
@@ -9643,7 +9658,7 @@ AlterPublicationStmt:
 					n->options = $5;
 					$$ = (Node *)n;
 				}
-			| ALTER PUBLICATION name ADD_P TABLE relation_expr_list
+			| ALTER PUBLICATION name ADD_P TABLE publication_table_list
 				{
 					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
 					n->pubname = $3;
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1cf59e0fb0..d783d8e7c3 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -31,7 +31,7 @@
 
 static void logicalrep_write_attrs(StringInfo out, Relation rel);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
-								   HeapTuple tuple, bool binary);
+								   HeapTuple tuple, bool binary, Bitmapset *att_list);
 
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
@@ -140,7 +140,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple newtuple, bool binary)
+						HeapTuple newtuple, bool binary, Bitmapset *att_list)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -152,7 +152,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, att_list);
 }
 
 /*
@@ -184,7 +184,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  */
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
-						HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+						HeapTuple oldtuple, HeapTuple newtuple, bool binary, Bitmapset *att_list)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -205,11 +205,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldtuple, binary);
+		logicalrep_write_tuple(out, rel, oldtuple, binary, att_list);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newtuple, binary);
+	logicalrep_write_tuple(out, rel, newtuple, binary, att_list);
 }
 
 /*
@@ -278,7 +278,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldtuple, binary);
+	logicalrep_write_tuple(out, rel, oldtuple, binary, NULL);
 }
 
 /*
@@ -491,7 +491,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  * Write a tuple to the outputstream, in the most efficient format possible.
  */
 static void
-logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary, Bitmapset *att_list)
 {
 	TupleDesc	desc;
 	Datum		values[MaxTupleAttributeNumber];
@@ -542,6 +542,12 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar
 			continue;
 		}
 
+		if (att_list != NULL && !(bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, att_list)))
+		{
+			pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
+			continue;
+		}
+
 		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
 		if (!HeapTupleIsValid(typtup))
 			elog(ERROR, "cache lookup failed for type %u", att->atttypid);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index abd5217ab1..a04e307f4d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,12 +15,14 @@
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel_d.h"
 #include "commands/defrem.h"
 #include "fmgr.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
@@ -70,6 +72,7 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
 										uint32 hashvalue);
 static void send_relation_and_attrs(Relation relation, TransactionId xid,
 									LogicalDecodingContext *ctx);
+static Bitmapset* get_table_columnset(Oid relid, List *columns, Bitmapset *att_list);
 
 /*
  * Entry in the map used to remember which relation schemas we sent.
@@ -115,6 +118,7 @@ typedef struct RelationSyncEntry
 	 * having identical TupleDesc.
 	 */
 	TupleConversionMap *map;
+	Bitmapset *att_list;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -590,10 +594,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					if (relentry->map)
 						tuple = execute_attr_map_tuple(tuple, relentry->map);
 				}
-
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_insert(ctx->out, xid, relation, tuple,
-										data->binary);
+										data->binary, relentry->att_list);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -619,10 +622,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 														  relentry->map);
 					}
 				}
-
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_update(ctx->out, xid, relation, oldtuple,
-										newtuple, data->binary);
+										newtuple, data->binary, relentry->att_list);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
@@ -1031,8 +1033,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->publish_as_relid = InvalidOid;
-		entry->map = NULL;		/* will be set by maybe_send_schema() if
-								 * needed */
+		entry->att_list = NULL;
+		entry->map = NULL;	/* will be set by maybe_send_schema() if needed */
 	}
 
 	/* Validate the entry */
@@ -1116,15 +1118,38 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 			if (publish &&
 				(relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
 			{
+				int                             nelems, i;
+				bool isnull;
+				Datum *elems;
+				HeapTuple pub_rel_tuple;
+				Datum pub_rel_cols;
+				List *columns = NIL;
+
+				pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid),
+							ObjectIdGetDatum(pub->oid));
+				if (HeapTupleIsValid(pub_rel_tuple))
+				{
+					pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP, pub_rel_tuple, Anum_pg_publication_rel_prattrs, &isnull);
+					if (!isnull)
+					{
+						oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+						deconstruct_array(DatumGetArrayTypePCopy(pub_rel_cols),
+									TEXTOID, -1, false, 'i',
+								&elems, NULL, &nelems);
+						for (i = 0; i < nelems; i++)
+							columns = lappend(columns, TextDatumGetCString(elems[i]));
+						entry->att_list = get_table_columnset(publish_as_relid, columns, entry->att_list);
+						MemoryContextSwitchTo(oldctx);
+					}
+					ReleaseSysCache(pub_rel_tuple);
+				}
 				entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
 				entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
 				entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
+
 			}
 
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
 		}
 
 		list_free(pubids);
@@ -1136,6 +1161,23 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 	return entry;
 }
 
+static Bitmapset*
+get_table_columnset(Oid relid, List *columns, Bitmapset *att_list)
+{
+	ListCell *cell;
+	foreach(cell, columns)
+	{
+		const char *attname = lfirst(cell);
+		int attnum = get_attnum(relid, attname);
+
+		if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, att_list))
+			att_list = bms_add_member(att_list,
+					attnum - FirstLowInvalidHeapAttributeNumber);
+
+	}
+	return att_list;
+}
+
 /*
  * Cleanup list of streamed transactions and update the schema_sent flag.
  *
@@ -1220,6 +1262,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 		entry->schema_sent = false;
 		list_free(entry->streamed_txns);
 		entry->streamed_txns = NIL;
+		bms_free(entry->att_list);
+		entry->att_list = NULL;
 		if (entry->map)
 		{
 			/*
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index f332bad4d4..7bdc9bb9b8 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -83,6 +83,13 @@ typedef struct Publication
 	PublicationActions pubactions;
 } Publication;
 
+typedef struct PublicationRelationInfo
+{
+	Oid			relid;
+	Relation	relation;
+	List	*columns;
+} PublicationRelationInfo;
+
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
 extern List *GetRelationPublications(Oid relid);
@@ -108,7 +115,7 @@ extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(bool pubviaroot);
 
 extern bool is_publishable_relation(Relation rel);
-extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
+extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelationInfo *targetrel,
 											  bool if_not_exists);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..d1d4eec2c0 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
 	Oid			prrelid BKI_LOOKUP(pg_class);	/* Oid of the relation */
+#ifdef CATALOG_VARLEN
+	text			prattrs[1]; /* Variable length field starts here */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
@@ -40,6 +43,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
  */
 typedef FormData_pg_publication_rel *Form_pg_publication_rel;
 
+DECLARE_TOAST(pg_publication_rel, 8895, 8896);
 DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, PublicationRelObjectIndexId, on pg_publication_rel using btree(oid oid_ops));
 DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, PublicationRelPrrelidPrpubidIndexId, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops));
 
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index d9e417bcd7..2037705f45 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -491,6 +491,7 @@ typedef enum NodeTag
 	T_PartitionRangeDatum,
 	T_PartitionCmd,
 	T_VacuumRelation,
+	T_PublicationTable,
 
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index def9651b34..a17c1aa9f7 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3623,6 +3623,12 @@ typedef struct AlterTSConfigurationStmt
 	bool		missing_ok;		/* for DROP - skip error if missing? */
 } AlterTSConfigurationStmt;
 
+typedef struct PublicationTable
+{
+	NodeTag		type;
+	RangeVar	*relation;   /* relation to be published */
+	List	*columns;	/* List of columns in a publication table */
+} PublicationTable;
 
 typedef struct CreatePublicationStmt
 {
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 55b90c03ea..879c58c497 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -134,11 +134,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin,
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple newtuple,
-									bool binary);
+									bool binary, Bitmapset *att_list);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel, HeapTuple oldtuple,
-									HeapTuple newtuple, bool binary);
+									HeapTuple newtuple, bool binary, Bitmapset *att_list);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
diff --git a/src/test/subscription/t/021_column_filter.pl b/src/test/subscription/t/021_column_filter.pl
new file mode 100644
index 0000000000..b5c73bfc7d
--- /dev/null
+++ b/src/test/subscription/t/021_column_filter.pl
@@ -0,0 +1,76 @@
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test TRUNCATE
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# setup
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_logical_replication_workers = 6));
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, b int, c int)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, b int, c int)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int)");
+
+#Test create publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub1 FOR TABLE tab1(a, b)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+#Initial sync
+$node_publisher->wait_for_catchup('sub1');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab1 VALUES (1,2,3)");
+
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'insert on column c is not replicated');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab1 SET c = 5 where a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'update on column c is not replicated');
+
+#Test alter publication with column filtering
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"
+);
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab2 VALUES (1,'abc',3)");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab2");
+is($result, qq(1|abc|), 'insert on column c is not replicated');
-- 
2.17.2 (Apple Git-113)

