Hi,

The subscription worker was not getting invalidated when the
subscription owner changed from superuser to non-superuser. Here is a
test case for the same:
Publisher:
   CREATE USER repl REPLICATION PASSWORD 'secret';
   CREATE TABLE t(i INT);
   INSERT INTO t VALUES(1);
   GRANT SELECT ON t TO repl;
   CREATE PUBLICATION p1 FOR TABLE t;

Subscriber (has a PGPASSFILE for user "repl"):
   CREATE USER u1 SUPERUSER;
   CREATE TABLE t(i INT);
   ALTER TABLE t OWNER TO u1;
   -- no password specified
   CREATE SUBSCRIPTION s1
     CONNECTION 'dbname=postgres host=127.0.0.1 port=5432 user=repl'
     PUBLICATION p1;

   ALTER USER u1 NOSUPERUSER: -- Change u1 user to non-superuser

Publisher:
INSERT INTO t VALUES(1);

Subscriber:
SELECT COUNT(*) FROM t; -- should have been 1 but is 2, the apply
worker has not exited after changing from superuser to non-superuser.

Fixed this issue by checking if the subscription owner has changed
from superuser to non-superuser in case the pg_authid rows changes.
The attached patch has the changes for the same.
Thanks to Jeff Davis for identifying this issue and reporting it at [1].

[1] - 
https://www.postgresql.org/message-id/5dff4caf26f45ce224a33a5e18e110b93a351b2f.camel%40j-davis.com

Regards,
Vignesh
From 9347e863cc78d328f7556db0e357787d14feae75 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Fri, 22 Sep 2023 15:12:23 +0530
Subject: [PATCH v1] Restart the apply worker if the subscription owner has
 changed from superuser to non-superuser.

Restart the apply worker if the subscription owner has changed from
superuser to non-superuser. This is required so that the subscription
connection string gets revalidated to identify cases where the
password option is not specified as part of the connection string for
non-superuser.
---
 src/backend/replication/logical/worker.c   | 23 +++++++++++++++++++---
 src/include/catalog/pg_subscription.h      |  1 +
 src/test/subscription/t/027_nosuperuser.pl | 21 ++++++++++++++++++++
 3 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 597947410f..29197d86cb 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3942,7 +3942,8 @@ maybe_reread_subscription(void)
 	 * The launcher will start a new worker but note that the parallel apply
 	 * worker won't restart if the streaming option's value is changed from
 	 * 'parallel' to any other value or the server decides not to stream the
-	 * in-progress transaction.
+	 * in-progress transaction. Exit if the owner of the subscription has
+	 * changed from superuser to a non-superuser.
 	 */
 	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
 		strcmp(newsub->name, MySubscription->name) != 0 ||
@@ -3952,7 +3953,9 @@ maybe_reread_subscription(void)
 		newsub->passwordrequired != MySubscription->passwordrequired ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
-		!equal(newsub->publications, MySubscription->publications))
+		!equal(newsub->publications, MySubscription->publications) ||
+		(!superuser_arg(MySubscription->owner) &&
+		 MySubscription->isownersuperuser))
 	{
 		if (am_parallel_apply_worker())
 			ereport(LOG,
@@ -4605,6 +4608,13 @@ InitializeLogRepWorker(void)
 		proc_exit(0);
 	}
 
+	/*
+	 * Fetch subscription owner is a superuser. This value will be later
+	 * checked to see when there is any change with this role and the worker
+	 * will be restarted if required.
+	 */
+	MySubscription->isownersuperuser = superuser_arg(MySubscription->owner);
+
 	MySubscriptionValid = true;
 	MemoryContextSwitchTo(oldctx);
 
@@ -4621,11 +4631,18 @@ InitializeLogRepWorker(void)
 	SetConfigOption("synchronous_commit", MySubscription->synccommit,
 					PGC_BACKEND, PGC_S_OVERRIDE);
 
-	/* Keep us informed about subscription changes. */
+	/*
+	 * Keep us informed about subscription changes or pg_authid rows.
+	 * (superuser can become non-superuser.)
+	 */
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
 								  (Datum) 0);
 
+	CacheRegisterSyscacheCallback(AUTHOID,
+								  subscription_change_cb,
+								  (Datum) 0);
+
 	if (am_tablesync_worker())
 		ereport(LOG,
 				(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index be36c4a820..87f6f644a9 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -144,6 +144,7 @@ typedef struct Subscription
 	List	   *publications;	/* List of publication names to subscribe to */
 	char	   *origin;			/* Only publish data originating from the
 								 * specified origin */
+	bool		isownersuperuser;	/* Is subscription owner superuser? */
 } Subscription;
 
 /* Disallow streaming in-progress transactions. */
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index d7a7e3ef5b..27f85537f0 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -104,6 +104,7 @@ for my $node ($node_publisher, $node_subscriber)
   CREATE ROLE regress_admin SUPERUSER LOGIN;
   CREATE ROLE regress_alice NOSUPERUSER LOGIN;
   GRANT CREATE ON DATABASE postgres TO regress_alice;
+  GRANT PG_CREATE_SUBSCRIPTION TO regress_alice;
   SET SESSION AUTHORIZATION regress_alice;
   CREATE SCHEMA alice;
   GRANT USAGE ON SCHEMA alice TO regress_admin;
@@ -303,4 +304,24 @@ GRANT SELECT ON alice.unpartitioned TO regress_alice;
 expect_replication("alice.unpartitioned", 3, 17, 21,
 	"restoring SELECT permission permits replication to continue");
 
+# The apply worker should get restarted after the superuser prvileges are
+# revoked for subscription owner alice.
+grant_superuser("regress_alice");
+$node_subscriber->safe_psql(
+	'postgres', qq(
+SET SESSION AUTHORIZATION regress_alice;
+CREATE SUBSCRIPTION regression_sub1 CONNECTION '$publisher_connstr' PUBLICATION alice;
+));
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+revoke_superuser("regress_alice");
+
+# After the user becomes non-superuser the apply worker should be restarted and
+# it should fail with 'password is required' error as password option is not
+# part of the connection string.
+$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? password is required/,
+	$offset);
+
 done_testing();
-- 
2.34.1

Reply via email to