Hi,

Creating or altering a subscription currently succeeds even when we
specify a publication that does not exist on the publisher. I think we
should throw an error in this case; that would help the user notice a
typo in the CREATE/ALTER SUBSCRIPTION command, or remind them to create
the publication before the subscription is created.
If the above analysis looks correct, please find attached a patch that
checks whether the specified publications are present on the publisher
and throws an error if any of them is missing.
Thoughts?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
From 2e7a6e41f789f7f1717058e9c78441ae8d5faf9e Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@enterprisedb.com>
Date: Thu, 21 Jan 2021 18:38:54 +0530
Subject: [PATCH v1] Identify missing publications from publisher while
 create/alter subscription.

Creating or altering a subscription succeeds even when we specify a publication
that does not exist on the publisher. This patch checks whether the specified
publications are present on the publisher and throws an error if any of them is
missing.
---
 src/backend/commands/subscriptioncmds.c | 87 +++++++++++++++++++++++++++++++++
 src/test/subscription/t/100_bugs.pl     | 46 ++++++++++++++++-
 2 files changed, 132 insertions(+), 1 deletion(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 082f785..16c89cb 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -46,6 +46,7 @@
 #include "utils/syscache.h"
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void check_publications(WalReceiverConn *wrconn, List *publications);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -490,6 +491,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 
 		PG_TRY();
 		{
+			check_publications(wrconn, publications);
 			/*
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
@@ -576,6 +578,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		ereport(ERROR,
 				(errmsg("could not connect to the publisher: %s", err)));
 
+	check_publications(wrconn, sub->publications);
+
 	/* Get the table list from publisher. */
 	pubrel_names = fetch_table_list(wrconn, sub->publications);
 
@@ -1211,6 +1215,89 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
 }
 
 /*
+ * Verify that every specified publication exists on the publisher, else ERROR.
+ */
+static void
+check_publications(WalReceiverConn *wrconn, List *publications)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	StringInfoData nonExistentPublications;
+	bool		first;
+	TupleTableSlot *slot;
+	ListCell   *lc;
+	List	   *publicationsCopy = NIL;
+	Oid			tableRow[1] = {TEXTOID};	/* result has one text column: pubname */
+
+	Assert(list_length(publications) > 0);
+	initStringInfo(&cmd);
+	appendStringInfoString(&cmd, "SELECT t.pubname\n"
+						   "  FROM pg_catalog.pg_publication t\n"
+						   " WHERE t.pubname IN (");
+	first = true;
+	foreach(lc, publications)
+	{
+		char	   *pubname = strVal(lfirst(lc));
+
+		if (first)
+			first = false;
+		else
+			appendStringInfoString(&cmd, ", ");
+
+		appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+	}
+	appendStringInfoChar(&cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errmsg("could not receive list of publications from the publisher: %s",
+						res->err)));
+
+	publicationsCopy = list_copy(publications);	/* start with every requested name */
+
+	/* Strike each publication returned by the publisher off the copy. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *pubname;
+		bool		isnull;
+
+		pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		/* This publication exists on the publisher; remove it from the list. */
+		publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
+		ExecClearTuple(slot);
+	}
+
+	walrcv_clear_result(res);
+	if (list_length(publicationsCopy))	/* whatever is left was not found */
+	{
+		first = true;
+
+		/* Build a comma-separated string of the publications that do not exist. */
+		initStringInfo(&nonExistentPublications);
+		foreach(lc, publicationsCopy)
+		{
+			char	   *pubname = strVal(lfirst(lc));
+
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&nonExistentPublications, ", ");
+			appendStringInfoString(&nonExistentPublications, pubname);
+		}
+
+		ereport(ERROR,
+				(errmsg("publication(s) %s does not exist in the publisher",
+						nonExistentPublications.data)));
+	}
+}
+
+/*
  * Get the list of tables which belong to specified publications on the
  * publisher connection.
  */
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index d1e407a..54e082d 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 5;
+use Test::More tests => 9;
 
 # Bug #15114
 
@@ -153,3 +153,47 @@ is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
 	$rows * 2, "2x$rows rows in t");
 is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"),
 	$rows * 2, "2x$rows rows in t2");
+
+# Create a subscription for a publication which does not exist.
+$node_publisher = get_new_node('testpublisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+$node_subscriber = get_new_node('testsubscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+        "CREATE PUBLICATION testpub1 FOR ALL TABLES");
+
+$node_subscriber->safe_psql('postgres',
+        "CREATE SUBSCRIPTION testsub1 CONNECTION '$publisher_connstr' PUBLICATION testpub1");
+
+# CREATE SUBSCRIPTION: the specified publication does not exist.
+my ($ret, $stdout, $stderr) = $node_subscriber->psql(
+        'postgres', "CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION pub_doesnt_exist");
+ok($stderr =~ /ERROR:  publication\(s\) pub_doesnt_exist does not exist in the publisher/,
+        "Create subscription for non existent publication fails");
+
+# CREATE SUBSCRIPTION: only one of the specified publications exists.
+($ret, $stdout, $stderr) = $node_subscriber->psql(
+        'postgres', "CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION testpub1, pub_doesnt_exist");
+ok($stderr =~ /ERROR:  publication\(s\) pub_doesnt_exist does not exist in the publisher/,
+        "Create subscription for non existent publication fails");
+
+# CREATE SUBSCRIPTION: multiple specified publications do not exist.
+($ret, $stdout, $stderr) = $node_subscriber->psql(
+        'postgres', "CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher_connstr' PUBLICATION pub_doesnt_exist, pub_doesnt_exist1");
+ok($stderr =~ /ERROR:  publication\(s\) pub_doesnt_exist, pub_doesnt_exist1 does not exist in the publisher/,
+        "Create subscription for non existent publication fails");
+
+# ALTER SUBSCRIPTION: the specified publication does not exist.
+($ret, $stdout, $stderr) = $node_subscriber->psql(
+        'postgres', "ALTER SUBSCRIPTION testsub1 SET PUBLICATION pub_doesnt_exist");
+ok($stderr =~ /ERROR:  publication\(s\) pub_doesnt_exist does not exist in the publisher/,
+        "Alter subscription for non existent publication fails");
+
+$node_publisher->stop('fast');
+$node_subscriber->stop('fast');
-- 
1.8.3.1

Reply via email to