From e721cb83be93727e8e874a74708cb13b36c7d114 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Tue, 16 Feb 2021 07:16:54 +0530
Subject: [PATCH v6] Introduce a new syntax to add/drop publications

At present, if we want to update publications in subscription, we can
use SET PUBLICATION, however, it requires supply all publications that
exists and the new publications if we want to add new publications, it's
inconvenient.  The new syntax only supply the new publications.  When
the refresh is true, it only refresh the new publications.
---
 src/backend/commands/subscriptioncmds.c | 146 ++++++++++++++++++++++++
 src/backend/parser/gram.y               |  20 ++++
 src/include/nodes/parsenodes.h          |   2 +
 3 files changed, 168 insertions(+)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index e5ae4534ae..15d432278c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -47,6 +47,7 @@
 #include "utils/syscache.h"
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *merge_subpublications(HeapTuple tuple, List *newpublist, bool addpub);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 
 
@@ -964,6 +965,53 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
+		case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
+			{
+				bool    copy_data = false;
+				bool    isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
+				bool    refresh;
+				List   *publist = NIL;
+
+				publist = merge_subpublications(tup, stmt->publication, isadd);
+
+				parse_subscription_options(stmt->options,
+										   NULL,	/* no "connect" */
+										   NULL, NULL,	/* no "enabled" */
+										   NULL,	/* no "create_slot" */
+										   NULL, NULL,	/* no "slot_name" */
+										   isadd ? &copy_data : NULL,	/* for drop, no "copy_data" */
+										   NULL,	/* no "synchronous_commit" */
+										   &refresh,
+										   NULL, NULL,	/* no "binary" */
+										   NULL, NULL); /* no "streaming" */
+
+				values[Anum_pg_subscription_subpublications - 1] =
+					publicationListToArray(publist);
+				replaces[Anum_pg_subscription_subpublications - 1] = true;
+
+				update_tuple = true;
+
+				/* Refresh if user asked us to. */
+				if (refresh)
+				{
+					if (!sub->enabled)
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
+								 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
+
+					PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
+
+					/* Only refresh the added/dropped list of publications. */
+					sub->publications = stmt->publication;
+
+					AlterSubscription_refresh(sub, copy_data);
+				}
+
+				break;
+			}
+
 		case ALTER_SUBSCRIPTION_REFRESH:
 			{
 				bool		copy_data;
@@ -1551,3 +1599,101 @@ ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
 			 errhint("Use %s to disassociate the subscription from the slot.",
 					 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
 }
+
+/*
+ * Merge ccurrent subscription's publications and user specified publications
+ * by ADD/DROP PUBLICATIONS.
+ *
+ * If isadd == true, we will add the list of publications into current
+ * subscription's publications.  Otherwise, we will delete the list of
+ * publications from current subscription's publications.
+ */
+static List *
+merge_subpublications(HeapTuple tuple, List *newpublist, bool addpub)
+{
+	Datum	datum;
+	bool	isnull;
+	List   *publist = NIL;
+	StringInfoData	errstr;
+	int		errstrcnt = 0;
+	ListCell	*lc;
+
+	/* Get publications */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tuple,
+							Anum_pg_subscription_subpublications,
+							&isnull);
+	Assert(!isnull);
+
+	publist = textarray_to_stringlist(DatumGetArrayTypeP(datum));
+	initStringInfo(&errstr);
+
+	foreach(lc, newpublist)
+	{
+		char		*name = strVal(lfirst(lc));
+		ListCell	*cell = NULL;
+
+		foreach(cell, publist)
+		{
+			char	*pubname = strVal(lfirst(cell));
+
+			if (strcmp(name, pubname) == 0)
+			{
+				if (addpub)
+				{
+					errstrcnt++;
+
+					if (errstrcnt == 1)
+						appendStringInfo(&errstr, _("\"%s\""), name);
+					else
+						appendStringInfo(&errstr, _(", \"%s\""), name);
+				}
+				else
+					publist = list_delete_cell(publist, cell);
+
+				break;
+			}
+		}
+
+		if (addpub && cell == NULL)
+			publist = lappend(publist, makeString(name));
+		else if (!addpub && cell == NULL)
+		{
+			errstrcnt++;
+
+			if (errstrcnt == 1)
+				appendStringInfo(&errstr, _("\"%s\""), name);
+			else
+				appendStringInfo(&errstr, _(", \"%s\""), name);
+		}
+	}
+
+	if (errstrcnt >= 1)
+	{
+		if (addpub)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg_plural("publication %s is already present in the subscription",
+								   "publications %s are already present in the subscription",
+									errstrcnt,
+									errstr.data)));
+		}
+		else
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg_plural("publication %s doesn't exist in the subscription",
+								   "publications %s do not exist in the subscription",
+									errstrcnt,
+									errstr.data)));
+		}
+	}
+
+	if (publist == NIL)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("subscription must contain at least one publication")));
+
+	return publist;
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index dd72a9fc3c..e45f98d353 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9609,6 +9609,26 @@ AlterSubscriptionStmt:
 					n->options = $6;
 					$$ = (Node *)n;
 				}
+			| ALTER SUBSCRIPTION name ADD_P PUBLICATION name_list opt_definition
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+					n->kind = ALTER_SUBSCRIPTION_ADD_PUBLICATION;
+					n->subname = $3;
+					n->publication = $6;
+					n->options = $7;
+					$$ = (Node *)n;
+				}
+			| ALTER SUBSCRIPTION name DROP PUBLICATION name_list opt_definition
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+					n->kind = ALTER_SUBSCRIPTION_DROP_PUBLICATION;
+					n->subname = $3;
+					n->publication = $6;
+					n->options = $7;
+					$$ = (Node *)n;
+				}
 			| ALTER SUBSCRIPTION name SET PUBLICATION name_list opt_definition
 				{
 					AlterSubscriptionStmt *n =
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 236832a2ca..e109607936 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3580,6 +3580,8 @@ typedef enum AlterSubscriptionType
 	ALTER_SUBSCRIPTION_OPTIONS,
 	ALTER_SUBSCRIPTION_CONNECTION,
 	ALTER_SUBSCRIPTION_PUBLICATION,
+	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
+	ALTER_SUBSCRIPTION_DROP_PUBLICATION,
 	ALTER_SUBSCRIPTION_REFRESH,
 	ALTER_SUBSCRIPTION_ENABLED
 } AlterSubscriptionType;
-- 
2.25.1

