From ae3cf3b4653c8e79f7a00e3c26908bbf09299dd6 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Wed, 1 Mar 2023 12:59:02 -0500
Subject: [PATCH v2] Perform logical replication actions as the table owner.

Up until now, logical replication actions have been performed as the
subscription owner, who will generally be a superuser.  Commit
cec57b1a0fbcd3833086ba686897c5883e0a2afc documented hazards
associated with that situation, namely, that any user who owns a
table on the subscriber side could assume the privileges of the
subscription owner by attaching a trigger, expression index, or
some other kind of executable code to it. As a remedy, it suggested
not creating configurations where users who are not fully trusted
own tables on the subscriber.

Although that will work, it basically precludes using logical
replication in the way that people typically want to use it,
namely, to replicate a database from one node to another
without necessarily having any restrictions on which database
users can own tables. So, instead, change logical replication to
execute INSERT, UPDATE, DELETE, and TRUNCATE operations as the
table owner when they are replicated.

Since this involves switching the active user frequently within
a session that is authenticated as the subscription user, also
impose SECURITY_RESTRICTED_OPEATION restrictions on logical
replication code. As an exception, if the table owner can SET
ROLE to the subscription owner, these restrictions have no
security value, so don't impose them in that case.

Subscription owners are now required to have the ability to
SET ROLE to every role that owns a table that the subscription
is replicating. If they don't, replication will fail. Superusers,
who normally own subscriptions, satisfy this property by default.
Non-superusers users who own subscriptions will needed to be
granted the roles that own relevant tables.
---
 src/backend/commands/tablecmds.c           |  20 ++-
 src/backend/replication/logical/worker.c   |  22 ++-
 src/backend/utils/init/Makefile            |   3 +-
 src/backend/utils/init/meson.build         |   3 +-
 src/backend/utils/init/usercontext.c       |  91 ++++++++++++
 src/include/commands/tablecmds.h           |   3 +-
 src/include/utils/usercontext.h            |  26 ++++
 src/test/subscription/t/027_nosuperuser.pl | 153 ++++++++++++++++++++-
 8 files changed, 311 insertions(+), 10 deletions(-)
 create mode 100644 src/backend/utils/init/usercontext.c
 create mode 100644 src/include/utils/usercontext.h

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 9b0a0142d3b..ec22d8eba42 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -103,6 +103,7 @@
 #include "utils/syscache.h"
 #include "utils/timestamp.h"
 #include "utils/typcache.h"
+#include "utils/usercontext.h"
 
 /*
  * ON COMMIT action list
@@ -1762,7 +1763,7 @@ ExecuteTruncate(TruncateStmt *stmt)
 	}
 
 	ExecuteTruncateGuts(rels, relids, relids_logged,
-						stmt->behavior, stmt->restart_seqs);
+						stmt->behavior, stmt->restart_seqs, false);
 
 	/* And close the rels */
 	foreach(cell, rels)
@@ -1790,7 +1791,8 @@ void
 ExecuteTruncateGuts(List *explicit_rels,
 					List *relids,
 					List *relids_logged,
-					DropBehavior behavior, bool restart_seqs)
+					DropBehavior behavior, bool restart_seqs,
+					bool run_as_table_owner)
 {
 	List	   *rels;
 	List	   *seq_relids = NIL;
@@ -1929,7 +1931,14 @@ ExecuteTruncateGuts(List *explicit_rels,
 	resultRelInfo = resultRelInfos;
 	foreach(cell, rels)
 	{
+		UserContext	ucxt;
+
+		if (run_as_table_owner)
+			SwitchToUntrustedUser(resultRelInfo->ri_RelationDesc->rd_rel->relowner,
+								  &ucxt);
 		ExecBSTruncateTriggers(estate, resultRelInfo);
+		if (run_as_table_owner)
+			RestoreUserContext(&ucxt);
 		resultRelInfo++;
 	}
 
@@ -2134,7 +2143,14 @@ ExecuteTruncateGuts(List *explicit_rels,
 	resultRelInfo = resultRelInfos;
 	foreach(cell, rels)
 	{
+		UserContext	ucxt;
+
+		if (run_as_table_owner)
+			SwitchToUntrustedUser(resultRelInfo->ri_RelationDesc->rd_rel->relowner,
+								  &ucxt);
 		ExecASTruncateTriggers(estate, resultRelInfo);
+		if (run_as_table_owner)
+			RestoreUserContext(&ucxt);
 		resultRelInfo++;
 	}
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 10f97119727..8cf84d0d5e8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -207,6 +207,7 @@
 #include "utils/rls.h"
 #include "utils/syscache.h"
 #include "utils/timeout.h"
+#include "utils/usercontext.h"
 
 #define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
 
@@ -2395,6 +2396,7 @@ apply_handle_insert(StringInfo s)
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData newtup;
 	LogicalRepRelId relid;
+	UserContext		ucxt;
 	ApplyExecutionData *edata;
 	EState	   *estate;
 	TupleTableSlot *remoteslot;
@@ -2423,6 +2425,9 @@ apply_handle_insert(StringInfo s)
 		return;
 	}
 
+	/* Make sure that any user-supplied code runs as the table owner. */
+	SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
 	/* Set relation for error callback */
 	apply_error_callback_arg.rel = rel;
 
@@ -2452,6 +2457,8 @@ apply_handle_insert(StringInfo s)
 	/* Reset relation for error callback */
 	apply_error_callback_arg.rel = NULL;
 
+	RestoreUserContext(&ucxt);
+
 	logicalrep_rel_close(rel, NoLock);
 
 	end_replication_step();
@@ -2530,6 +2537,7 @@ apply_handle_update(StringInfo s)
 {
 	LogicalRepRelMapEntry *rel;
 	LogicalRepRelId relid;
+	UserContext		ucxt;
 	ApplyExecutionData *edata;
 	EState	   *estate;
 	LogicalRepTupleData oldtup;
@@ -2569,6 +2577,9 @@ apply_handle_update(StringInfo s)
 	/* Check if we can do the update. */
 	check_relation_updatable(rel);
 
+	/* Make sure that any user-supplied code runs as the table owner. */
+	SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
 	/* Initialize the executor state. */
 	edata = create_edata_for_relation(rel);
 	estate = edata->estate;
@@ -2619,6 +2630,8 @@ apply_handle_update(StringInfo s)
 	/* Reset relation for error callback */
 	apply_error_callback_arg.rel = NULL;
 
+	RestoreUserContext(&ucxt);
+
 	logicalrep_rel_close(rel, NoLock);
 
 	end_replication_step();
@@ -2702,6 +2715,7 @@ apply_handle_delete(StringInfo s)
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData oldtup;
 	LogicalRepRelId relid;
+	UserContext		ucxt;
 	ApplyExecutionData *edata;
 	EState	   *estate;
 	TupleTableSlot *remoteslot;
@@ -2736,6 +2750,9 @@ apply_handle_delete(StringInfo s)
 	/* Check if we can do the delete. */
 	check_relation_updatable(rel);
 
+	/* Make sure that any user-supplied code runs as the table owner. */
+	SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
 	/* Initialize the executor state. */
 	edata = create_edata_for_relation(rel);
 	estate = edata->estate;
@@ -2761,6 +2778,8 @@ apply_handle_delete(StringInfo s)
 	/* Reset relation for error callback */
 	apply_error_callback_arg.rel = NULL;
 
+	RestoreUserContext(&ucxt);
+
 	logicalrep_rel_close(rel, NoLock);
 
 	end_replication_step();
@@ -3211,7 +3230,8 @@ apply_handle_truncate(StringInfo s)
 						relids,
 						relids_logged,
 						DROP_RESTRICT,
-						restart_seqs);
+						restart_seqs,
+						true);
 	foreach(lc, remote_rels)
 	{
 		LogicalRepRelMapEntry *rel = lfirst(lc);
diff --git a/src/backend/utils/init/Makefile b/src/backend/utils/init/Makefile
index 362569393b5..18c947464f4 100644
--- a/src/backend/utils/init/Makefile
+++ b/src/backend/utils/init/Makefile
@@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
 OBJS = \
 	globals.o \
 	miscinit.o \
-	postinit.o
+	postinit.o \
+	usercontext.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/utils/init/meson.build b/src/backend/utils/init/meson.build
index b5ab154055e..186be1381d8 100644
--- a/src/backend/utils/init/meson.build
+++ b/src/backend/utils/init/meson.build
@@ -3,4 +3,5 @@
 backend_sources += files(
   'globals.c',
   'miscinit.c',
-  'postinit.c')
+  'postinit.c',
+  'usercontext.c')
diff --git a/src/backend/utils/init/usercontext.c b/src/backend/utils/init/usercontext.c
new file mode 100644
index 00000000000..f4b2d4708d3
--- /dev/null
+++ b/src/backend/utils/init/usercontext.c
@@ -0,0 +1,91 @@
+/*-------------------------------------------------------------------------
+ *
+ * usercontext.c
+ *	  Convenience functions for running code as a different database user.
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/utils/init/usercontext.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "utils/acl.h"
+#include "utils/guc.h"
+#include "utils/usercontext.h"
+
+/*
+ * Temporarily switch to a new user ID.
+ *
+ * If the current user doesn't have permission to SET ROLE to the new user,
+ * an ERROR occurs.
+ *
+ * If the new user doesn't have permission to SET ROLE to the current user,
+ * SECURITY_RESTRICTED_OPERATION is imposed and a new GUC nest level is
+ * created so that any settings changes can be rolled back.
+ */
+void
+SwitchToUntrustedUser(Oid userid, UserContext *context)
+{
+	/* Get the current user ID and security context. */
+	GetUserIdAndSecContext(&context->save_userid,
+						   &context->save_sec_context);
+
+	/* Check that we have sufficient privileges to assume the target role. */
+	if (!member_can_set_role(context->save_userid, userid))
+	{
+		context->save_nestlevel = -1;
+		return;
+	}
+
+	/*
+	 * Try to prevent the user to which we're switching from assuming the
+	 * privileges of the current user, unless they can SET ROLE to that user
+	 * anyway.
+	 */
+	if (member_can_set_role(userid, context->save_userid))
+	{
+		/*
+		 * Each user can SET ROLE to the other, so there's no point in
+		 * imposing any security restrictions. Just let the user do whatever
+		 * they want.
+		 */
+		SetUserIdAndSecContext(userid, context->save_sec_context);
+		context->save_nestlevel = -1;
+	}
+	else
+	{
+		int	sec_context = context->save_sec_context;
+
+		/*
+		 * This user can SET ROLE to the target user, but not the other way
+		 * around, so protect ourselves against the target user by setting
+		 * SECURITY_RESTRICTED_OPERATION to prevent certain changes to the
+		 * session state. Also set up a new GUC nest level, so that we can roll
+		 * back any GUC changes that may be made by code running as the target
+		 * user, inasmuch as they could be malicious.
+		 */
+		sec_context |= SECURITY_RESTRICTED_OPERATION;
+		SetUserIdAndSecContext(userid, sec_context);
+		context->save_nestlevel = NewGUCNestLevel();
+	}
+}
+
+/*
+ * Switch back to the original user ID.
+ *
+ * If we created a new GUC nest level, also role back any changes that were
+ * made within it.
+ */
+void
+RestoreUserContext(UserContext *context)
+{
+	if (context->save_nestlevel != -1)
+		AtEOXact_GUC(false, context->save_nestlevel);
+	SetUserIdAndSecContext(context->save_userid, context->save_sec_context);
+}
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index e7c2b91a583..17b94049371 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -60,7 +60,8 @@ extern void ExecuteTruncateGuts(List *explicit_rels,
 								List *relids,
 								List *relids_logged,
 								DropBehavior behavior,
-								bool restart_seqs);
+								bool restart_seqs,
+								bool run_as_table_owner);
 
 extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass);
 
diff --git a/src/include/utils/usercontext.h b/src/include/utils/usercontext.h
new file mode 100644
index 00000000000..a8195c194de
--- /dev/null
+++ b/src/include/utils/usercontext.h
@@ -0,0 +1,26 @@
+/*-------------------------------------------------------------------------
+ *
+ * usercontext.h
+ *	  Convenience functions for running code as a different database user.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef USERCONTEXT_H
+#define USERCONTEXT_H
+
+/*
+ * When temporarily changing to run as a different user, this structure
+ * holds the details needed to restore the original state.
+ */
+typedef struct UserContext
+{
+	Oid			save_userid;
+	int			save_sec_context;
+	int			save_nestlevel;
+} UserContext;
+
+/* Function prototypes. */
+extern void SwitchToUntrustedUser(Oid userid, UserContext *context);
+extern void RestoreUserContext(UserContext *context);
+
+#endif							/* USERCONTEXT_H */
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index 59192dbe2f6..84fcdaa65f8 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -297,6 +297,151 @@ expect_replication("alice.unpartitioned", 2, 15, 17,
 	"admin with bypassrls replicates delete into rls enabled unpartitioned");
 grant_superuser("regress_admin");
 
+# Privileges on the target role suffice for non-superuser replication.
+$node_subscriber->safe_psql(
+	'postgres', qq(
+ALTER ROLE regress_admin NOSUPERUSER;
+ALTER TABLE alice.unpartitioned DISABLE ROW LEVEL SECURITY;
+GRANT SELECT,INSERT,UPDATE,DELETE ON
+  alice.unpartitioned,
+  alice.hashpart, alice.hashpart_a, alice.hashpart_b
+  TO regress_admin;
+GRANT regress_alice TO regress_admin;
+));
+
+publish_insert("alice.unpartitioned", 19);
+expect_replication("alice.unpartitioned", 3, 15, 19,
+	"nosuperuser admin with privileges on role can replicate INSERT into unpartitioned"
+);
+
+publish_update("alice.unpartitioned", 15 => 21);
+expect_replication("alice.unpartitioned", 3, 17, 21,
+	"nosuperuser admin with privileges on role can replicate UPDATE into unpartitioned"
+);
+
+publish_delete("alice.unpartitioned", 17);
+expect_replication("alice.unpartitioned", 2, 19, 21,
+	"nosuperuser admin with privileges on role can replicate DELETE into unpartitioned"
+);
+
+# Test partitioning
+#
+publish_insert("alice.hashpart", 91);
+publish_insert("alice.hashpart", 92);
+publish_insert("alice.hashpart", 93);
+publish_update("alice.hashpart", 92 => 130);
+publish_delete("alice.hashpart", 91);
+expect_replication("alice.hashpart", 4, 93, 130,
+	"nosuperuser admin with privileges on role can replicate into hashpart"
+);
+
+# Force RLS on the target table and check that replication fails.
+$node_subscriber->safe_psql(
+	'postgres', qq(
+SET SESSION AUTHORIZATION regress_alice;
+ALTER TABLE alice.unpartitioned ENABLE ROW LEVEL SECURITY;
+ALTER TABLE alice.unpartitioned FORCE ROW LEVEL SECURITY;
+));
+
+publish_insert("alice.unpartitioned", 23);
+expect_failure(
+	"alice.unpartitioned",
+	2,
+	19,
+	21,
+	qr/ERROR: ( [A-Z0-9]+:)? user "regress_alice" cannot replicate into relation with row-level security enabled: "unpartitioned\w*"/msi,
+	"replication of insert into table with forced rls fails");
+
+# Since replication acts as the table owner, replication will succeed if we don't force it.
+$node_subscriber->safe_psql(
+	'postgres', qq(
+ALTER TABLE alice.unpartitioned NO FORCE ROW LEVEL SECURITY;
+));
+expect_replication("alice.unpartitioned", 3, 19, 23,
+	"non-superuser admin can replicate insert if rls is not forced"
+);
+
+$node_subscriber->safe_psql(
+	'postgres', qq(
+ALTER TABLE alice.unpartitioned FORCE ROW LEVEL SECURITY;
+));
+publish_update("alice.unpartitioned", 19 => 25);
+expect_failure(
+	"alice.unpartitioned",
+	3,
+	19,
+	23,
+	qr/ERROR: ( [A-Z0-9]+:)? user "regress_alice" cannot replicate into relation with row-level security enabled: "unpartitioned\w*"/msi,
+	"replication of update into table with forced rls fails"
+);
+$node_subscriber->safe_psql(
+	'postgres', qq(
+ALTER TABLE alice.unpartitioned NO FORCE ROW LEVEL SECURITY;
+));
+expect_replication("alice.unpartitioned", 3, 21, 25,
+	"non-superuser admin can replicate update if rls is not forced");
+
+# Remove some of alice's privileges on her own table. Then replication should fail.
+$node_subscriber->safe_psql(
+	'postgres', qq(
+REVOKE SELECT, INSERT ON alice.unpartitioned FROM regress_alice;
+));
+publish_insert("alice.unpartitioned", 27);
+expect_failure(
+	"alice.unpartitioned",
+	3,
+	21,
+	25,
+	qr/ERROR: ( [A-Z0-9]+:)? permission denied for table unpartitioned/msi,
+	"replication of insert fails if table owner lacks insert permission"
+);
+
+# alice needs INSERT but not SELECT to replicate an INSERT.
+$node_subscriber->safe_psql(
+	'postgres', qq(
+GRANT INSERT ON alice.unpartitioned TO regress_alice;
+));
+expect_replication("alice.unpartitioned", 4, 21, 27,
+	"restoring insert permission permits replication to continue");
+
+# Now let's try an UPDATE and a DELETE.
+$node_subscriber->safe_psql(
+	'postgres', qq(
+REVOKE UPDATE, DELETE ON alice.unpartitioned FROM regress_alice;
+));
+publish_update("alice.unpartitioned", 21 => 29);
+publish_delete("alice.unpartitioned", 23);
+expect_failure(
+	"alice.unpartitioned",
+	4,
+	21,
+	27,
+	qr/ERROR: ( [A-Z0-9]+:)? permission denied for table unpartitioned/msi,
+	"replication of update/delete fails if table owner lacks corresponding permission"
+);
+
+# Restoring UPDATE and DELETE is insufficient.
+$node_subscriber->safe_psql(
+	'postgres', qq(
+GRANT UPDATE, DELETE ON alice.unpartitioned TO regress_alice;
+));
+expect_failure(
+	"alice.unpartitioned",
+	4,
+	21,
+	27,
+	qr/ERROR: ( [A-Z0-9]+:)? permission denied for table unpartitioned/msi,
+	"replication of update/delete fails if table owner lacks SELECT permission"
+);
+
+# alice needs INSERT but not SELECT to replicate an INSERT.
+$node_subscriber->safe_psql(
+	'postgres', qq(
+GRANT SELECT ON alice.unpartitioned TO regress_alice;
+));
+expect_replication("alice.unpartitioned", 3, 25, 29,
+	"restoring SELECT permission permits replication to continue");
+
 # Alter the subscription owner to "regress_alice".  She has neither superuser
 # nor bypassrls, but as the table owner should be able to replicate.
 #
@@ -309,10 +454,10 @@ ALTER ROLE regress_alice NOSUPERUSER;
 ALTER SUBSCRIPTION admin_sub ENABLE;
 ));
 
-publish_insert("alice.unpartitioned", 23);
-publish_update("alice.unpartitioned", 15 => 25);
-publish_delete("alice.unpartitioned", 17);
-expect_replication("alice.unpartitioned", 2, 23, 25,
+publish_insert("alice.unpartitioned", 31);
+publish_update("alice.unpartitioned", 25 => 33);
+publish_delete("alice.unpartitioned", 27);
+expect_replication("alice.unpartitioned", 3, 29, 33,
 	"nosuperuser nobypassrls table owner can replicate delete into unpartitioned despite rls"
 );
 

base-commit: 4c8d654084700f801f48827bb3531a6779b8b90e
-- 
2.34.1

