Hi, The subscription worker was not getting invalidated when the subscription owner changed from superuser to non-superuser. Here is a test case for the same: Publisher: CREATE USER repl REPLICATION PASSWORD 'secret'; CREATE TABLE t(i INT); INSERT INTO t VALUES(1); GRANT SELECT ON t TO repl; CREATE PUBLICATION p1 FOR TABLE t;
Subscriber (has a PGPASSFILE for user "repl"): CREATE USER u1 SUPERUSER; CREATE TABLE t(i INT); ALTER TABLE t OWNER TO u1; -- no password specified CREATE SUBSCRIPTION s1 CONNECTION 'dbname=postgres host=127.0.0.1 port=5432 user=repl' PUBLICATION p1; ALTER USER u1 NOSUPERUSER; -- Change u1 user to non-superuser Publisher: INSERT INTO t VALUES(1); Subscriber: SELECT COUNT(*) FROM t; -- should have been 1 but is 2: the apply worker did not exit after the subscription owner was changed from superuser to non-superuser. I fixed this issue by checking, whenever a pg_authid row changes, whether the subscription owner has changed from superuser to non-superuser. The attached patch contains these changes. Thanks to Jeff Davis for identifying this issue and reporting it at [1]. [1] - https://www.postgresql.org/message-id/5dff4caf26f45ce224a33a5e18e110b93a351b2f.camel%40j-davis.com Regards, Vignesh
From 9347e863cc78d328f7556db0e357787d14feae75 Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@gmail.com> Date: Fri, 22 Sep 2023 15:12:23 +0530 Subject: [PATCH v1] Restart the apply worker if the subscription owner has changed from superuser to non-superuser. Restart the apply worker if the subscription owner has changed from superuser to non-superuser. This is required so that the subscription connection string gets revalidated to identify cases where the password option is not specified as part of the connection string for non-superuser. --- src/backend/replication/logical/worker.c | 23 +++++++++++++++++++--- src/include/catalog/pg_subscription.h | 1 + src/test/subscription/t/027_nosuperuser.pl | 21 ++++++++++++++++++++ 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 597947410f..29197d86cb 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3942,7 +3942,8 @@ maybe_reread_subscription(void) * The launcher will start a new worker but note that the parallel apply * worker won't restart if the streaming option's value is changed from * 'parallel' to any other value or the server decides not to stream the - * in-progress transaction. + * in-progress transaction. Exit if the owner of the subscription has + * changed from superuser to a non-superuser. 
*/ if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 || strcmp(newsub->name, MySubscription->name) != 0 || @@ -3952,7 +3953,9 @@ maybe_reread_subscription(void) newsub->passwordrequired != MySubscription->passwordrequired || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || - !equal(newsub->publications, MySubscription->publications)) + !equal(newsub->publications, MySubscription->publications) || + (!superuser_arg(MySubscription->owner) && + MySubscription->isownersuperuser)) { if (am_parallel_apply_worker()) ereport(LOG, @@ -4605,6 +4608,13 @@ InitializeLogRepWorker(void) proc_exit(0); } + /* + * Fetch subscription owner is a superuser. This value will be later + * checked to see when there is any change with this role and the worker + * will be restarted if required. + */ + MySubscription->isownersuperuser = superuser_arg(MySubscription->owner); + MySubscriptionValid = true; MemoryContextSwitchTo(oldctx); @@ -4621,11 +4631,18 @@ InitializeLogRepWorker(void) SetConfigOption("synchronous_commit", MySubscription->synccommit, PGC_BACKEND, PGC_S_OVERRIDE); - /* Keep us informed about subscription changes. */ + /* + * Keep us informed about subscription changes or pg_authid rows. + * (superuser can become non-superuser.) 
+ */ CacheRegisterSyscacheCallback(SUBSCRIPTIONOID, subscription_change_cb, (Datum) 0); + CacheRegisterSyscacheCallback(AUTHOID, + subscription_change_cb, + (Datum) 0); + if (am_tablesync_worker()) ereport(LOG, (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index be36c4a820..87f6f644a9 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -144,6 +144,7 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + bool isownersuperuser; /* Is subscription owner superuser? */ } Subscription; /* Disallow streaming in-progress transactions. */ diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl index d7a7e3ef5b..27f85537f0 100644 --- a/src/test/subscription/t/027_nosuperuser.pl +++ b/src/test/subscription/t/027_nosuperuser.pl @@ -104,6 +104,7 @@ for my $node ($node_publisher, $node_subscriber) CREATE ROLE regress_admin SUPERUSER LOGIN; CREATE ROLE regress_alice NOSUPERUSER LOGIN; GRANT CREATE ON DATABASE postgres TO regress_alice; + GRANT PG_CREATE_SUBSCRIPTION TO regress_alice; SET SESSION AUTHORIZATION regress_alice; CREATE SCHEMA alice; GRANT USAGE ON SCHEMA alice TO regress_admin; @@ -303,4 +304,24 @@ GRANT SELECT ON alice.unpartitioned TO regress_alice; expect_replication("alice.unpartitioned", 3, 17, 21, "restoring SELECT permission permits replication to continue"); +# The apply worker should get restarted after the superuser prvileges are +# revoked for subscription owner alice. 
+grant_superuser("regress_alice"); +$node_subscriber->safe_psql( + 'postgres', qq( +SET SESSION AUTHORIZATION regress_alice; +CREATE SUBSCRIPTION regression_sub1 CONNECTION '$publisher_connstr' PUBLICATION alice; +)); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +revoke_superuser("regress_alice"); + +# After the user becomes non-superuser the apply worker should be restarted and +# it should fail with 'password is required' error as password option is not +# part of the connection string. +$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? password is required/, + $offset); + done_testing(); -- 2.34.1