On 2021-Dec-10, Alvaro Herrera wrote:

> Actually it's not so easy to implement.

So I needed to add "sub object id" support for pg_publication_rel
objects in pg_depend / dependency.c.  What I have now is partial (the
describe routines need patched) but it's sufficient to show what's
needed.  In essence, we now set these depend entries with column
numbers, so that they can be dropped independently; when the drop comes,
the existing pg_publication_rel row is modified to cover the remaining
columns.  As far as I can tell, it works correctly.

There is one policy decision to make: what if ALTER TABLE drops the last
remaining column in the publication?  I opted to raise a specific error
in this case, though we could just the same opt to drop the relation
from the publication.  Are there opinions on this?

This version incorporates the fixups Peter submitted, plus some other
fixes of my own.  Notably, as Peter also mentioned, I changed
pg_publication_rel.prattrs to store int2vector rather than an array of
column names.  This makes for better behavior if columns are renamed and
things like that, and also we don't need to be so cautious about
quoting.  It does mean we need a slightly more complicated query in a
couple of spots, but that should be okay.

-- 
Álvaro Herrera           39°49'30"S 73°17'W  —  https://www.EnterpriseDB.com/
"Always assume the user will do much worse than the stupidest thing
you can imagine."                                (Julien PUYDT)
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index fe9c714257..a88d12e8ae 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -1472,7 +1472,7 @@ doDeletion(const ObjectAddress *object, int flags)
 			break;
 
 		case OCLASS_PUBLICATION_REL:
-			RemovePublicationRelById(object->objectId);
+			RemovePublicationRelById(object->objectId, object->objectSubId);
 			break;
 
 		case OCLASS_PUBLICATION:
@@ -2754,8 +2754,12 @@ free_object_addresses(ObjectAddresses *addrs)
 ObjectClass
 getObjectClass(const ObjectAddress *object)
 {
-	/* only pg_class entries can have nonzero objectSubId */
+	/*
+	 * only pg_class and pg_publication_rel entries can have nonzero
+	 * objectSubId
+	 */
 	if (object->classId != RelationRelationId &&
+		object->classId != PublicationRelRelationId &&
 		object->objectSubId != 0)
 		elog(ERROR, "invalid non-zero objectSubId for object class %u",
 			 object->classId);
diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c
index 2bae3fbb17..5eed248dcb 100644
--- a/src/backend/catalog/objectaddress.c
+++ b/src/backend/catalog/objectaddress.c
@@ -4019,6 +4019,7 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok)
 				/* translator: first %s is, e.g., "table %s" */
 				appendStringInfo(&buffer, _("publication of %s in publication %s"),
 								 rel.data, pubname);
+				/* FIXME add objectSubId support */
 				pfree(rel.data);
 				ReleaseSysCache(tup);
 				break;
@@ -5853,9 +5854,16 @@ getObjectIdentityParts(const ObjectAddress *object,
 
 				getRelationIdentity(&buffer, prform->prrelid, objname, false);
 				appendStringInfo(&buffer, " in publication %s", pubname);
+				if (object->objectSubId)	/* FIXME maybe get_attname */
+					appendStringInfo(&buffer, " column %d", object->objectSubId);
 
 				if (objargs)
+				{
 					*objargs = list_make1(pubname);
+					if (object->objectSubId)
+						*objargs = lappend(*objargs,
+										   psprintf("%d", object->objectSubId));
+				}
 
 				ReleaseSysCache(tup);
 				break;
diff --git a/src/backend/catalog/pg_depend.c b/src/backend/catalog/pg_depend.c
index 07bcdc463a..462c8efe70 100644
--- a/src/backend/catalog/pg_depend.c
+++ b/src/backend/catalog/pg_depend.c
@@ -658,6 +658,56 @@ isObjectPinned(const ObjectAddress *object)
  * Various special-purpose lookups and manipulations of pg_depend.
  */
 
+/*
+ * Find all objects of the given class that reference the specified object,
+ * and add them to the given ObjectAddresses.
+ */
+void
+findAndAddAddresses(ObjectAddresses *addrs, Oid classId,
+					Oid refclassId, Oid refobjectId, int32 refobjsubId)
+{
+	Relation	depRel;
+	ScanKeyData	key[3];
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	depRel = table_open(DependRelationId, AccessShareLock);
+
+	ScanKeyInit(&key[0],
+				Anum_pg_depend_refclassid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(refclassId));
+	ScanKeyInit(&key[1],
+				Anum_pg_depend_refobjid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(refobjectId));
+	ScanKeyInit(&key[2],
+				Anum_pg_depend_refobjsubid,
+				BTEqualStrategyNumber, F_INT4EQ,
+				Int32GetDatum(refobjsubId));
+
+	scan = systable_beginscan(depRel, DependReferenceIndexId, true,
+							  NULL, 3, key);
+
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_depend depform = (Form_pg_depend) GETSTRUCT(tup);
+		ObjectAddress	object;
+
+		if (depform->classid != classId)
+			continue;
+
+		ObjectAddressSubSet(object, depform->classid, depform->objid,
+							depform->refobjsubid);
+
+		add_exact_object_address(&object, addrs);
+	}
+
+	systable_endscan(scan);
+
+	table_close(depRel, AccessShareLock);
+}
+
 
 /*
  * Find the extension containing the specified object, if any
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index a5229aea51..b99b3748c9 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -328,6 +328,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	{
 		table_close(rel, RowExclusiveLock);
 
+		/* FIXME need to handle the case of different column list */
+
 		if (if_not_exists)
 			return InvalidObjectAddress;
 
@@ -395,12 +397,6 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 
 	ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
 
-	while ((attnum = bms_first_member(attmap)) >= 0)
-	{
-		/* Add dependency on the column */
-		ObjectAddressSubSet(referenced, RelationRelationId, relid, attnum);
-		recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
-	}
 	/* Add dependency on the publication */
 	ObjectAddressSet(referenced, PublicationRelationId, pubid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
@@ -409,6 +405,21 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	ObjectAddressSet(referenced, RelationRelationId, relid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
+	/*
+	 * If there's an explicit column list, make one dependency entry for each
+	 * column.  Note that the referencing side of the dependency is also
+	 * specific to one column, so that it can be dropped separately if the
+	 * column is dropped.
+	 */
+	while ((attnum = bms_first_member(attmap)) >= 0)
+	{
+		ObjectAddressSubSet(referenced, RelationRelationId, relid,
+							attnum + FirstLowInvalidHeapAttributeNumber);
+		myself.objectSubId = attnum + FirstLowInvalidHeapAttributeNumber;
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+	}
+	myself.objectSubId = 0;		/* need to undo this bit */
+
 	/* Close the table. */
 	table_close(rel, RowExclusiveLock);
 
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 753df44613..0078c5986c 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -561,7 +561,6 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 
 				pubrel = palloc(sizeof(PublicationRelInfo));
 				pubrel->relation = oldrel;
-				/* This is not needed to delete a table */
 				pubrel->columns = NIL;
 				delrels = lappend(delrels, pubrel);
 			}
@@ -758,10 +757,11 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
 }
 
 /*
- * Remove relation from publication by mapping OID.
+ * Remove relation from publication by mapping OID, or publication status
+ * of one column of that relation in the publication if an attnum is given.
  */
 void
-RemovePublicationRelById(Oid proid)
+RemovePublicationRelById(Oid proid, int32 attnum)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -791,7 +791,81 @@ RemovePublicationRelById(Oid proid)
 
 	InvalidatePublicationRels(relids);
 
-	CatalogTupleDelete(rel, &tup->t_self);
+	/*
+	 * If no column is given, simply delete the relation from the publication.
+	 *
+	 * If a column is given, what we do instead is to remove that column from
+	 * the column list.  The relation remains in the publication, with the
+	 * other columns.  However, dropping the last column is disallowed.
+	 */
+	if (attnum == 0)
+	{
+		CatalogTupleDelete(rel, &tup->t_self);
+	}
+	else
+	{
+		Datum		adatum;
+		ArrayType  *arr;
+		int			nelems;
+		int16	   *elems;
+		int16	   *newelems;
+		int2vector *newvec;
+		Datum		values[Natts_pg_publication_rel];
+		bool		nulls[Natts_pg_publication_rel];
+		bool		replace[Natts_pg_publication_rel];
+		HeapTuple	newtup;
+		int			i,
+					j;
+		bool		isnull;
+
+		/* Obtain the original column list */
+		adatum = SysCacheGetAttr(PUBLICATIONRELMAP,
+								 tup,
+								 Anum_pg_publication_rel_prattrs,
+								 &isnull);
+		if (isnull)			/* shouldn't happen */
+			elog(ERROR, "can't drop column from publication without a column list");
+		arr = DatumGetArrayTypeP(adatum);
+		nelems = ARR_DIMS(arr)[0];
+		elems = (int16 *) ARR_DATA_PTR(arr);
+
+		/* Construct a list excluding the given column */
+		newelems = palloc(sizeof(int16) * nelems - 1);
+		for (i = 0, j = 0; i < nelems - 1; i++)
+		{
+			if (elems[i] == attnum)
+				continue;
+			newelems[j++] = elems[i];
+		}
+
+		/*
+		 * If this is the last column used in the publication, disallow the
+		 * command. We could alternatively just drop the relation from the
+		 * publication.
+		 */
+		if (j == 0)
+		{
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("cannot drop the last column in publication \"%s\"",
+						   get_publication_name(pubrel->prpubid, false)),
+					errhint("Remove table \"%s\" from the publication first.",
+							get_rel_name(pubrel->prrelid)));
+		}
+
+		/* Build the updated tuple */
+		MemSet(values, 0, sizeof(values));
+		MemSet(nulls, false, sizeof(nulls));
+		MemSet(replace, false, sizeof(replace));
+		newvec = buildint2vector(newelems, j);
+		values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(newvec);
+		replace[Anum_pg_publication_rel_prattrs - 1] = true;
+
+		/* Execute the update */
+		newtup = heap_modify_tuple(tup, RelationGetDescr(rel),
+								   values, nulls, replace);
+		CatalogTupleUpdate(rel, &tup->t_self, newtup);
+	}
 
 	ReleaseSysCache(tup);
 
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c821271306..705bddc773 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -40,8 +40,9 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
-#include "catalog/pg_tablespace.h"
+#include "catalog/pg_publication_rel.h"
 #include "catalog/pg_statistic_ext.h"
+#include "catalog/pg_tablespace.h"
 #include "catalog/pg_trigger.h"
 #include "catalog/pg_type.h"
 #include "catalog/storage.h"
@@ -8415,6 +8416,13 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
 
 	ReleaseSysCache(tuple);
 
+	/*
+	 * If the column is part of a replication column list, arrange to get that
+	 * removed too.
+	 */
+	findAndAddAddresses(addrs, PublicationRelRelationId,
+						RelationRelationId, RelationGetRelid(rel), attnum);
+
 	/*
 	 * Propagate to children as appropriate.  Unlike most other ALTER
 	 * routines, we have to do this one level of recursion at a time; we can't
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index dfb5d0430c..f9f9ecd0c0 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -621,6 +621,8 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 		 * filter. XXX Allow sending type information for REPLICA IDENTITY
 		 * COLUMNS with user created type. even when they are not mentioned in
 		 * column filters.
+		 *
+		 * FIXME -- this code seems not verified by tests.
 		 */
 		if (att_map != NULL &&
 			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 2f412ca3db..84ee807e0b 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1648,6 +1648,8 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER PUBLICATION <name> ADD */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD"))
 		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
+	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "TABLE"))
+		COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL);
 	/* ALTER PUBLICATION <name> DROP */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP"))
 		COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE");
diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h
index 3eca295ff4..76d421e09e 100644
--- a/src/include/catalog/dependency.h
+++ b/src/include/catalog/dependency.h
@@ -214,6 +214,9 @@ extern long changeDependenciesOf(Oid classId, Oid oldObjectId,
 extern long changeDependenciesOn(Oid refClassId, Oid oldRefObjectId,
 								 Oid newRefObjectId);
 
+extern void findAndAddAddresses(ObjectAddresses *addrs, Oid classId,
+					Oid refclassId, Oid refobjectId, int32 refobjsubId);
+
 extern Oid	getExtensionOfObject(Oid classId, Oid objectId);
 extern List *getAutoExtensionsOfObject(Oid classId, Oid objectId);
 
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index 4ba68c70ee..23f037df7f 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -25,7 +25,7 @@
 extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt);
 extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt);
 extern void RemovePublicationById(Oid pubid);
-extern void RemovePublicationRelById(Oid proid);
+extern void RemovePublicationRelById(Oid proid, int32 attnum);
 extern void RemovePublicationSchemaById(Oid psoid);
 
 extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 93ef6e21eb..aef2f905a1 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -167,18 +167,32 @@ Publications:
 
 CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (x, y, z);  -- error
-ERROR:  cannot add relation "testpub_tbl5" to publication
-DETAIL:  Column filter must include REPLICA IDENTITY columns
-ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error FIXME
+ERROR:  column "x" of relation "testpub_tbl5" does not exist
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error
+ERROR:  column "x" of relation "testpub_tbl5" does not exist
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);  -- error
-ERROR:  relation "testpub_tbl5" is already member of publication "testpub_fortable"
+ERROR:  invalid column list for publishing relation "testpub_tbl5"
+DETAIL:  All columns in REPLICA IDENTITY must be present in the column list.
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);  -- ok
-ERROR:  relation "testpub_tbl5" is already member of publication "testpub_fortable"
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+\dRp+ testpub_fortable
+                                Publication testpub_fortable
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_tbl5" (a)
+Tables from schemas:
+    "pub_test"
+
+ALTER TABLE testpub_tbl5 DROP COLUMN a;
+ERROR:  cannot drop the last column in publication "testpub_fortable"
+HINT:  Remove table "testpub_tbl5" from the publication first.
 CREATE TABLE testpub_tbl6 (a int, b text, c text);
 ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c);  -- error
-ERROR:  cannot add relation "testpub_tbl6" to publication
-DETAIL:  Cannot have column filter with REPLICA IDENTITY FULL
+ERROR:  invalid column list for publishing relation "testpub_tbl6"
+DETAIL:  Cannot have column filter on relations with REPLICA IDENTITY FULL.
 DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6;
 DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema;
 CREATE TABLE testpub_tbl3 (a int);
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index eb0e71ea62..18b87803c0 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -91,9 +91,12 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall
 
 CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text);
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (x, y, z);  -- error
-ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error FIXME
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x);  -- error
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);  -- error
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);  -- ok
+ALTER TABLE testpub_tbl5 DROP COLUMN c;
+\dRp+ testpub_fortable
+ALTER TABLE testpub_tbl5 DROP COLUMN a;
 
 CREATE TABLE testpub_tbl6 (a int, b text, c text);
 ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL;
diff --git a/src/test/subscription/t/021_column_filter.pl b/src/test/subscription/t/021_column_filter.pl
index 27d0537621..354e6ac363 100644
--- a/src/test/subscription/t/021_column_filter.pl
+++ b/src/test/subscription/t/021_column_filter.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
-use Test::More tests => 8;
+use Test::More tests => 10;
 
 # setup
 
@@ -129,14 +129,23 @@ $node_publisher->safe_psql('postgres',
 	"UPDATE tab2 SET c = 5 where a = 1");
 is($result, qq(1|abc), 'update on column c is not replicated');
 
-# Test error conditions
+# Test behavior when a column is dropped
 $node_publisher->safe_psql('postgres',
 	"ALTER TABLE test_part DROP COLUMN b");
 $result = $node_publisher->safe_psql('postgres',
-	"select relname, prattrs from pg_publication_rel pb, pg_class pc where pb.prrelid = pc.oid;");
-is($result, qq(tab1|1 2
+	"select prrelid::regclass, prattrs from pg_publication_rel pb;");
+is($result,
+	q(tab1|1 2
+tab3|1 3
 tab2|1 2
-tab3|1 3), 'publication relation test_part removed');
+test_part|1), 'column test_part.b removed');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (3, '2021-12-13 12:13:14')");
+$node_publisher->wait_for_catchup('sub1');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_part WHERE a = 3");
+is($result, "3|", 'only column a is replicated');
 
 $node_publisher->safe_psql('postgres', "CREATE TABLE tab4 (a int PRIMARY KEY, b int, c int, d int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab4 (a int PRIMARY KEY, b int, d int)");

Reply via email to