From 4d7bd76bdea2552ad03846f2bf365c8492165e69 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 26 May 2023 17:19:16 +0900
Subject: [PATCH v3] Fix tablesync worker missed using run_as_owner option

Commit 482675987 introduced "run_as_owner" subscription option so that
subscription runs with either the permissions of the subscription
owner or the permission of the table owner. However, tablesync workers
did not use this option for the initial data copy.

With this change, tablesync workers run with appropriate permissions
based on "run_as_owner" option.

Ajin Cherian, with changes and regression tests added by me.

Reported-By: Amit Kapila
Author: Ajin Cherian, Masahiko Sawada
Reviewed-by: Ajin Cherian, Amit Kapila
Discussion: https://postgr.es/m/CAA4eK1KfZcRq7hUqQ7WknP+u=08+6MevVm+2W5RrAb+DTxrdww@mail.gmail.com
---
 src/backend/replication/logical/tablesync.c   | 24 ++++++++++--
 .../subscription/t/033_run_as_table_owner.pl  | 38 ++++++++++++++++++-
 2 files changed, 57 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c56d42dcd2..b8541be49a 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -120,6 +120,7 @@
 #include "utils/rls.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
+#include "utils/usercontext.h"
 
 static bool table_states_valid = false;
 static List *table_states_not_ready = NIL;
@@ -1252,7 +1253,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	WalRcvExecResult *res;
 	char		originname[NAMEDATALEN];
 	RepOriginId originid;
+	UserContext	ucxt;
+	Oid			checkowner;
 	bool		must_use_password;
+	bool		run_as_owner;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -1374,11 +1378,15 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 */
 	rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
 
+	/* get the owner for ACL and RLS checks */
+	run_as_owner = MySubscription->runasowner;
+	checkowner = run_as_owner ? MySubscription->owner : rel->rd_rel->relowner;
+
 	/*
 	 * Check that our table sync worker has permission to insert into the
 	 * target table.
 	 */
-	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
+	aclresult = pg_class_aclcheck(RelationGetRelid(rel), checkowner,
 								  ACL_INSERT);
 	if (aclresult != ACLCHECK_OK)
 		aclcheck_error(aclresult,
@@ -1392,11 +1400,11 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	 * circumvent RLS.  Disallow logical replication into RLS enabled
 	 * relations for such roles.
 	 */
-	if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
+	if (check_enable_rls(RelationGetRelid(rel), checkowner, false) == RLS_ENABLED)
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
-						GetUserNameFromId(GetUserId(), true),
+						GetUserNameFromId(checkowner, true),
 						RelationGetRelationName(rel))));
 
 	/*
@@ -1456,6 +1464,13 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 						originname)));
 	}
 
+	/*
+	 * Make sure that the copy command runs as the table owner, unless
+	 * the user has opted out of that behaviour.
+	 */
+	if (!run_as_owner)
+		SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
+
 	/* Now do the initial data copy */
 	PushActiveSnapshot(GetTransactionSnapshot());
 	copy_table(rel);
@@ -1469,6 +1484,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 						res->err)));
 	walrcv_clear_result(res);
 
+	if(!run_as_owner)
+		RestoreUserContext(&ucxt);
+
 	table_close(rel, NoLock);
 
 	/* Make the copy visible. */
diff --git a/src/test/subscription/t/033_run_as_table_owner.pl b/src/test/subscription/t/033_run_as_table_owner.pl
index 0aa8a093ef..2d0e0e78e3 100644
--- a/src/test/subscription/t/033_run_as_table_owner.pl
+++ b/src/test/subscription/t/033_run_as_table_owner.pl
@@ -70,8 +70,8 @@ sub revoke_superuser
 
 # Create publisher and subscriber nodes with schemas owned and published by
 # "regress_alice" but subscribed and replicated by different role
-# "regress_admin".  For partitioned tables, layout the partitions differently
-# on the publisher than on the subscriber.
+# "regress_admin" and "regress_admin2". For partitioned tables, layout the
+# partitions differently on the publisher than on the subscriber.
 #
 $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
@@ -86,6 +86,7 @@ for my $node ($node_publisher, $node_subscriber)
 	$node->safe_psql(
 		'postgres', qq(
   CREATE ROLE regress_admin SUPERUSER LOGIN;
+  CREATE ROLE regress_admin2 SUPERUSER LOGIN;
   CREATE ROLE regress_alice NOSUPERUSER LOGIN;
   GRANT CREATE ON DATABASE postgres TO regress_alice;
   SET SESSION AUTHORIZATION regress_alice;
@@ -192,4 +193,37 @@ GRANT regress_alice TO regress_admin WITH INHERIT TRUE, SET FALSE;
 expect_replication("alice.unpartitioned", 3, 7, 13,
 	"with INHERIT but not SET ROLE can replicate");
 
+# Remove the subscrition and truncate the table for the initial data sync
+# tests.
+$node_subscriber->safe_psql(
+	    'postgres', qq(
+DROP SUBSCRIPTION admin_sub;
+TRUNCATE alice.unpartitioned;
+));
+
+# Create a new subscription "admin_sub" owned by regress_admin2. It's
+# disabled so that we revoke superuser privilege after creation.
+$node_subscriber->safe_psql(
+    'postgres', qq(
+SET SESSION AUTHORIZATION regress_admin2;
+CREATE SUBSCRIPTION admin_sub CONNECTION '$publisher_connstr' PUBLICATION alice
+WITH (run_as_owner = false, password_required = false, copy_data = true, enabled = false);
+));
+
+# Revoke superuser privilege for "regress_admin2", and give it the
+# ability to SET ROLE. Then enable the subscription "admin_sub".
+revoke_superuser("regress_admin2");
+$node_subscriber->safe_psql(
+    'postgres', qq(
+GRANT regress_alice TO regress_admin2 WITH INHERIT FALSE, SET TRUE;
+ALTER SUBSCRIPTION admin_sub ENABLE;
+));
+
+# Because the initial data sync is working as the table owner, all
+# data should be copied.
+$node_subscriber->wait_for_subscription_sync($node_publisher,
+					     'admin_sub');
+expect_replication("alice.unpartitioned", 3, 7, 13,
+		   "table owner can do the initial data copy");
+
 done_testing();
-- 
2.31.1

