Here is a new version of the patch.  Besides some cleanup, I added an index
on reltoastrelid for the toast-to-main-relation lookup.  Before I bother
adjusting the tests and documentation, I'm curious to hear thoughts on
whether this seems like a viable approach.

On Sat, Dec 17, 2022 at 04:39:29AM -0800, Ted Yu wrote:
> +cluster_is_permitted_for_relation(Oid relid, Oid userid)
> +{
> +       return pg_class_aclcheck(relid, userid, ACL_MAINTAIN) ==
> ACLCHECK_OK ||
> +                  has_parent_privs(relid, userid, ACL_MAINTAIN);
> 
> Since the func only contains one statement, it seems this can be defined as
> a macro instead.

In the new version, there is a bit more to this function, so I didn't
convert it to a macro.

> +       List       *ancestors = get_partition_ancestors(relid);
> +       Oid                     root = InvalidOid;
> 
> nit: it would be better if the variable `root` can be aligned with variable
> `ancestors`.

Hm.  It looked alright on my machine.  In any case, I'll be sure to run
pgindent at some point.  This patch is still in early stages.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From f0f65618d1772151b3c4aff6f8da77bd1f212278 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandboss...@gmail.com>
Date: Sun, 18 Dec 2022 14:46:21 -0800
Subject: [PATCH v2 1/1] fix maintain privs

---
 src/backend/catalog/partition.c               | 22 ++++++++++
 src/backend/catalog/toasting.c                | 32 +++++++++++++++
 src/backend/commands/cluster.c                | 35 +++++++++++-----
 src/backend/commands/indexcmds.c              | 10 ++---
 src/backend/commands/lockcmds.c               |  5 +++
 src/backend/commands/tablecmds.c              | 41 ++++++++++++++++++-
 src/backend/commands/vacuum.c                 |  7 ++--
 src/include/catalog/partition.h               |  1 +
 src/include/catalog/pg_class.h                |  1 +
 src/include/catalog/toasting.h                |  1 +
 src/include/commands/tablecmds.h              |  1 +
 .../expected/cluster-conflict-partition.out   |  6 ++-
 src/test/regress/expected/cluster.out         |  4 +-
 src/test/regress/expected/vacuum.out          | 18 --------
 14 files changed, 143 insertions(+), 41 deletions(-)

diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index 79ccddce55..9a4caa769a 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -119,6 +119,28 @@ get_partition_parent_worker(Relation inhRel, Oid relid, bool *detach_pending)
 	return result;
 }
 
+/*
+ * get_partition_root
+ *		Obtain the partitioned table of a given relation
+ *
+ * Note: Because this function assumes that the realtion whose OID is passed as
+ * an argument and each ancestor will have precisely one parent, it should only
+ * be called when it is known that the relation is a partition.
+ */
+Oid
+get_partition_root(Oid relid)
+{
+	List	   *ancestors = get_partition_ancestors(relid);
+	Oid			root = InvalidOid;
+
+	if (list_length(ancestors) > 0)
+		root = llast_oid(ancestors);
+
+	list_free(ancestors);
+
+	return root;
+}
+
 /*
  * get_partition_ancestors
  *		Obtain ancestors of given relation
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index 9bc10729b0..f9d264e1e6 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -14,6 +14,7 @@
  */
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/heapam.h"
 #include "access/toast_compression.h"
 #include "access/xact.h"
@@ -32,6 +33,7 @@
 #include "nodes/makefuncs.h"
 #include "storage/lock.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
@@ -413,3 +415,33 @@ needs_toast_table(Relation rel)
 	/* Otherwise, let the AM decide. */
 	return table_relation_needs_toast_table(rel);
 }
+
+/*
+ * Get the main relation for the given TOAST table.
+ */
+Oid
+get_toast_parent(Oid relid)
+{
+	Relation	rel;
+	SysScanDesc scan;
+	ScanKeyData key[1];
+	Oid			result = InvalidOid;
+	HeapTuple	tuple;
+
+	rel = table_open(RelationRelationId, AccessShareLock);
+
+	ScanKeyInit(&key[0],
+				Anum_pg_class_reltoastrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(relid));
+
+	scan = systable_beginscan(rel, ClassToastRelidIndexId, true, NULL, 1, key);
+	tuple = systable_getnext(scan);
+	if (HeapTupleIsValid(tuple))
+		result = ((Form_pg_class) GETSTRUCT(tuple))->oid;
+	systable_endscan(scan);
+
+	table_close(rel, AccessShareLock);
+
+	return result;
+}
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 8966b75bd1..78b2cd62df 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -80,6 +80,7 @@ static void copy_table_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
 static List *get_tables_to_cluster(MemoryContext cluster_context);
 static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context,
 											   Oid indexOid);
+static bool cluster_is_permitted_for_relation(Oid relid, Oid userid);
 
 
 /*---------------------------------------------------------------------------
@@ -366,8 +367,7 @@ cluster_rel(Oid tableOid, Oid indexOid, ClusterParams *params)
 	if (recheck)
 	{
 		/* Check that the user still has privileges for the relation */
-		if (!object_ownercheck(RelationRelationId, tableOid, save_userid) &&
-			pg_class_aclcheck(tableOid, save_userid, ACL_MAINTAIN) != ACLCHECK_OK)
+		if (!cluster_is_permitted_for_relation(tableOid, save_userid))
 		{
 			relation_close(OldHeap, AccessExclusiveLock);
 			goto out;
@@ -1646,8 +1646,7 @@ get_tables_to_cluster(MemoryContext cluster_context)
 
 		index = (Form_pg_index) GETSTRUCT(indexTuple);
 
-		if (!object_ownercheck(RelationRelationId, index->indrelid, GetUserId()) &&
-			pg_class_aclcheck(index->indrelid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
+		if (!cluster_is_permitted_for_relation(index->indrelid, GetUserId()))
 			continue;
 
 		/* Use a permanent memory context for the result list */
@@ -1695,12 +1694,11 @@ get_tables_to_cluster_partitioned(MemoryContext cluster_context, Oid indexOid)
 		if (get_rel_relkind(indexrelid) != RELKIND_INDEX)
 			continue;
 
-		/* Silently skip partitions which the user has no access to. */
-		if (!object_ownercheck(RelationRelationId, relid, GetUserId()) &&
-			pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
-			(!object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) ||
-			 IsSharedRelation(relid)))
-			continue;
+		/*
+		 * We already checked that the user has privileges to CLUSTER the
+		 * partitioned table when we locked it earlier, so there's no need to
+		 * check the privileges again here.
+		 */
 
 		/* Use a permanent memory context for the result list */
 		old_context = MemoryContextSwitchTo(cluster_context);
@@ -1715,3 +1713,20 @@ get_tables_to_cluster_partitioned(MemoryContext cluster_context, Oid indexOid)
 
 	return rtcs;
 }
+
+/*
+ * Return whether userid has privileges to CLUSTER relid.  If not, this
+ * function emits a WARNING.
+ */
+static bool
+cluster_is_permitted_for_relation(Oid relid, Oid userid)
+{
+	if (pg_class_aclcheck(relid, userid, ACL_MAINTAIN) == ACLCHECK_OK ||
+		has_parent_privs(relid, userid, ACL_MAINTAIN))
+		return true;
+
+	ereport(WARNING,
+			(errmsg("permission denied to cluster \"%s\", skipping it",
+					get_rel_name(relid))));
+	return false;
+}
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 7dc1aca8fe..3739e05401 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -2796,9 +2796,9 @@ RangeVarCallbackForReindexIndex(const RangeVar *relation,
 
 	/* Check permissions */
 	table_oid = IndexGetRelation(relId, true);
-	if (!object_ownercheck(RelationRelationId, relId, GetUserId()) &&
-		OidIsValid(table_oid) &&
-		pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
+	if (OidIsValid(table_oid) &&
+		pg_class_aclcheck(table_oid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
+		!has_parent_privs(table_oid, GetUserId(), ACL_MAINTAIN))
 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_INDEX,
 					   relation->relname);
 
@@ -3016,8 +3016,8 @@ ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
 		 * permission checks at the beginning of this routine.
 		 */
 		if (classtuple->relisshared &&
-			!object_ownercheck(RelationRelationId, relid, GetUserId()) &&
-			pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
+			pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
+			!has_parent_privs(relid, GetUserId(), ACL_MAINTAIN))
 			continue;
 
 		/*
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index e294efc67c..28c75bc093 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -19,6 +19,7 @@
 #include "catalog/namespace.h"
 #include "catalog/pg_inherits.h"
 #include "commands/lockcmds.h"
+#include "commands/tablecmds.h"
 #include "miscadmin.h"
 #include "nodes/nodeFuncs.h"
 #include "parser/parse_clause.h"
@@ -305,5 +306,9 @@ LockTableAclCheck(Oid reloid, LOCKMODE lockmode, Oid userid)
 
 	aclresult = pg_class_aclcheck(reloid, userid, aclmask);
 
+	if (aclresult != ACLCHECK_OK &&
+		has_parent_privs(reloid, userid, ACL_MAINTAIN))
+		aclresult = ACLCHECK_OK;
+
 	return aclresult;
 }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 56dc995713..60e0edb455 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -16918,12 +16918,49 @@ RangeVarCallbackMaintainsTable(const RangeVar *relation,
 				 errmsg("\"%s\" is not a table or materialized view", relation->relname)));
 
 	/* Check permissions */
-	if (!object_ownercheck(RelationRelationId, relId, GetUserId()) &&
-		pg_class_aclcheck(relId, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK)
+	if (pg_class_aclcheck(relId, GetUserId(), ACL_MAINTAIN) != ACLCHECK_OK &&
+		!has_parent_privs(relId, GetUserId(), ACL_MAINTAIN))
 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_TABLE,
 					   relation->relname);
 }
 
+/*
+ * If relid is a partition, return whether userid has any of the privileges
+ * specified in acl on its partitioned table.
+ *
+ * If relid is a TOAST table, return whether userid has any of the privileges
+ * specified in acl on its main relation.
+ */
+bool
+has_parent_privs(Oid relid, Oid userid, AclMode acl)
+{
+	/*
+	 * If this is a partition, check the permissions on its partitioned table.
+	 */
+	if (get_rel_relispartition(relid))
+	{
+		Oid			root = get_partition_root(relid);
+
+		if (OidIsValid(root) &&
+			pg_class_aclcheck(root, userid, acl) == ACLCHECK_OK)
+			return true;
+	}
+
+	/*
+	 * If this is a TOAST table, check the permissions on the main relation.
+	 */
+	if (get_rel_relkind(relid) == RELKIND_TOASTVALUE)
+	{
+		Oid			parent = get_toast_parent(relid);
+
+		if (OidIsValid(parent) &&
+			pg_class_aclcheck(parent, userid, acl) == ACLCHECK_OK)
+			return true;
+	}
+
+	return false;
+}
+
 /*
  * Callback to RangeVarGetRelidExtended() for TRUNCATE processing.
  */
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 293b84bbca..9360564945 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -41,6 +41,7 @@
 #include "catalog/pg_namespace.h"
 #include "commands/cluster.h"
 #include "commands/defrem.h"
+#include "commands/tablecmds.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
@@ -568,9 +569,9 @@ vacuum_is_permitted_for_relation(Oid relid, Form_pg_class reltuple,
 	 *   - the role owns the current database and the relation is not shared
 	 *   - the role has been granted the MAINTAIN privilege on the relation
 	 */
-	if (object_ownercheck(RelationRelationId, relid, GetUserId()) ||
-		(object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) && !reltuple->relisshared) ||
-		pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) == ACLCHECK_OK)
+	if ((object_ownercheck(DatabaseRelationId, MyDatabaseId, GetUserId()) && !reltuple->relisshared) ||
+		pg_class_aclcheck(relid, GetUserId(), ACL_MAINTAIN) == ACLCHECK_OK ||
+		has_parent_privs(relid, GetUserId(), ACL_MAINTAIN))
 		return true;
 
 	relname = NameStr(reltuple->relname);
diff --git a/src/include/catalog/partition.h b/src/include/catalog/partition.h
index b29df6690c..41144abf3f 100644
--- a/src/include/catalog/partition.h
+++ b/src/include/catalog/partition.h
@@ -20,6 +20,7 @@
 #define HASH_PARTITION_SEED UINT64CONST(0x7A5B22367996DCFD)
 
 extern Oid	get_partition_parent(Oid relid, bool even_if_detached);
+extern Oid	get_partition_root(Oid relid);
 extern List *get_partition_ancestors(Oid relid);
 extern Oid	index_get_partition(Relation partition, Oid indexId);
 extern List *map_partition_varattnos(List *expr, int fromrel_varno,
diff --git a/src/include/catalog/pg_class.h b/src/include/catalog/pg_class.h
index e1f4eefa22..f8f2efc2a7 100644
--- a/src/include/catalog/pg_class.h
+++ b/src/include/catalog/pg_class.h
@@ -155,6 +155,7 @@ typedef FormData_pg_class *Form_pg_class;
 DECLARE_UNIQUE_INDEX_PKEY(pg_class_oid_index, 2662, ClassOidIndexId, on pg_class using btree(oid oid_ops));
 DECLARE_UNIQUE_INDEX(pg_class_relname_nsp_index, 2663, ClassNameNspIndexId, on pg_class using btree(relname name_ops, relnamespace oid_ops));
 DECLARE_INDEX(pg_class_tblspc_relfilenode_index, 3455, ClassTblspcRelfilenodeIndexId, on pg_class using btree(reltablespace oid_ops, relfilenode oid_ops));
+DECLARE_INDEX(pg_class_reltoastrelid_index, 2173, ClassToastRelidIndexId, on pg_class using btree(reltoastrelid oid_ops));
 
 #ifdef EXPOSE_TO_CLIENT_CODE
 
diff --git a/src/include/catalog/toasting.h b/src/include/catalog/toasting.h
index 51e3ba7b69..2afdec5c4a 100644
--- a/src/include/catalog/toasting.h
+++ b/src/include/catalog/toasting.h
@@ -26,5 +26,6 @@ extern void AlterTableCreateToastTable(Oid relOid, Datum reloptions,
 									   LOCKMODE lockmode);
 extern void BootstrapToastTable(char *relName,
 								Oid toastOid, Oid toastIndexOid);
+extern Oid get_toast_parent(Oid relid);
 
 #endif							/* TOASTING_H */
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 07eac9a26c..959ae53aa8 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -98,6 +98,7 @@ extern void AtEOSubXact_on_commit_actions(bool isCommit,
 extern void RangeVarCallbackMaintainsTable(const RangeVar *relation,
 										   Oid relId, Oid oldRelId,
 										   void *arg);
+extern bool has_parent_privs(Oid relid, Oid userid, AclMode acl);
 
 extern void RangeVarCallbackOwnsRelation(const RangeVar *relation,
 										 Oid relId, Oid oldRelId, void *arg);
diff --git a/src/test/isolation/expected/cluster-conflict-partition.out b/src/test/isolation/expected/cluster-conflict-partition.out
index 7acb675c97..8d21276996 100644
--- a/src/test/isolation/expected/cluster-conflict-partition.out
+++ b/src/test/isolation/expected/cluster-conflict-partition.out
@@ -22,14 +22,16 @@ starting permutation: s1_begin s1_lock_child s2_auth s2_cluster s1_commit s2_res
 step s1_begin: BEGIN;
 step s1_lock_child: LOCK cluster_part_tab1 IN SHARE UPDATE EXCLUSIVE MODE;
 step s2_auth: SET ROLE regress_cluster_part;
-step s2_cluster: CLUSTER cluster_part_tab USING cluster_part_ind;
+step s2_cluster: CLUSTER cluster_part_tab USING cluster_part_ind; <waiting ...>
 step s1_commit: COMMIT;
+step s2_cluster: <... completed>
 step s2_reset: RESET ROLE;
 
 starting permutation: s1_begin s2_auth s1_lock_child s2_cluster s1_commit s2_reset
 step s1_begin: BEGIN;
 step s2_auth: SET ROLE regress_cluster_part;
 step s1_lock_child: LOCK cluster_part_tab1 IN SHARE UPDATE EXCLUSIVE MODE;
-step s2_cluster: CLUSTER cluster_part_tab USING cluster_part_ind;
+step s2_cluster: CLUSTER cluster_part_tab USING cluster_part_ind; <waiting ...>
 step s1_commit: COMMIT;
+step s2_cluster: <... completed>
 step s2_reset: RESET ROLE;
diff --git a/src/test/regress/expected/cluster.out b/src/test/regress/expected/cluster.out
index 542c2e098c..181b55d180 100644
--- a/src/test/regress/expected/cluster.out
+++ b/src/test/regress/expected/cluster.out
@@ -353,6 +353,8 @@ INSERT INTO clstr_3 VALUES (1);
 -- has not been clustered
 SET SESSION AUTHORIZATION regress_clstr_user;
 CLUSTER;
+WARNING:  permission denied to cluster "pg_toast_826", skipping it
+WARNING:  permission denied to cluster "clstr_2", skipping it
 SELECT * FROM clstr_1 UNION ALL
   SELECT * FROM clstr_2 UNION ALL
   SELECT * FROM clstr_3;
@@ -513,7 +515,7 @@ SELECT a.relname, a.relfilenode=b.relfilenode FROM pg_class a
 -----------+----------
  ptnowner  | t
  ptnowner1 | f
- ptnowner2 | t
+ ptnowner2 | f
 (3 rows)
 
 DROP TABLE ptnowner;
diff --git a/src/test/regress/expected/vacuum.out b/src/test/regress/expected/vacuum.out
index 0035d158b7..41ff4ba9cf 100644
--- a/src/test/regress/expected/vacuum.out
+++ b/src/test/regress/expected/vacuum.out
@@ -347,20 +347,14 @@ ALTER TABLE vacowned_parted OWNER TO regress_vacuum;
 ALTER TABLE vacowned_part1 OWNER TO regress_vacuum;
 SET ROLE regress_vacuum;
 VACUUM vacowned_parted;
-WARNING:  permission denied to vacuum "vacowned_part2", skipping it
 VACUUM vacowned_part1;
 VACUUM vacowned_part2;
-WARNING:  permission denied to vacuum "vacowned_part2", skipping it
 ANALYZE vacowned_parted;
-WARNING:  permission denied to analyze "vacowned_part2", skipping it
 ANALYZE vacowned_part1;
 ANALYZE vacowned_part2;
-WARNING:  permission denied to analyze "vacowned_part2", skipping it
 VACUUM (ANALYZE) vacowned_parted;
-WARNING:  permission denied to vacuum "vacowned_part2", skipping it
 VACUUM (ANALYZE) vacowned_part1;
 VACUUM (ANALYZE) vacowned_part2;
-WARNING:  permission denied to vacuum "vacowned_part2", skipping it
 RESET ROLE;
 -- Only one partition owned by other user.
 ALTER TABLE vacowned_parted OWNER TO CURRENT_USER;
@@ -389,26 +383,14 @@ ALTER TABLE vacowned_parted OWNER TO regress_vacuum;
 ALTER TABLE vacowned_part1 OWNER TO CURRENT_USER;
 SET ROLE regress_vacuum;
 VACUUM vacowned_parted;
-WARNING:  permission denied to vacuum "vacowned_part1", skipping it
-WARNING:  permission denied to vacuum "vacowned_part2", skipping it
 VACUUM vacowned_part1;
-WARNING:  permission denied to vacuum "vacowned_part1", skipping it
 VACUUM vacowned_part2;
-WARNING:  permission denied to vacuum "vacowned_part2", skipping it
 ANALYZE vacowned_parted;
-WARNING:  permission denied to analyze "vacowned_part1", skipping it
-WARNING:  permission denied to analyze "vacowned_part2", skipping it
 ANALYZE vacowned_part1;
-WARNING:  permission denied to analyze "vacowned_part1", skipping it
 ANALYZE vacowned_part2;
-WARNING:  permission denied to analyze "vacowned_part2", skipping it
 VACUUM (ANALYZE) vacowned_parted;
-WARNING:  permission denied to vacuum "vacowned_part1", skipping it
-WARNING:  permission denied to vacuum "vacowned_part2", skipping it
 VACUUM (ANALYZE) vacowned_part1;
-WARNING:  permission denied to vacuum "vacowned_part1", skipping it
 VACUUM (ANALYZE) vacowned_part2;
-WARNING:  permission denied to vacuum "vacowned_part2", skipping it
 RESET ROLE;
 DROP TABLE vacowned;
 DROP TABLE vacowned_parted;
-- 
2.25.1

Reply via email to