Hi,

So, I have spent the last couple of days trying to figure out a nice
solution for VACUUM, TRUNCATE and REINDEX, and attached is a set of
three patches to address the issues of this thread:
1) 0001 does the work for TRUNCATE, by using RangeVarGetRelidExtended
with a custom callback based on the existing truncate_check_rel().  I
had to split the session-level checks into a separate routine as no
Relation is available in the callback, and the result ends up being
neat without changing any user-facing behavior.
2) 0002 does the work for VACUUM.  It happens that vacuum_rel() already
does all the skip-related checks we need to know about to decide if a
relation needs to be vacuum'ed or not, so I refactored things as
follows:
2-1) For a manual VACUUM listing relations, issue an ERROR if a listed
relation cannot be vacuum'ed. Previously vacuum_rel() would just log a
WARNING and call it a day *after* locking the relation.  But as we need
to rely on RangeVarGetRelidExtended(), whose callback runs before the
lock is taken, an ERROR is necessary.  The ERROR happens only if VACUUM
FULL is used.
2-2) When a relation list is not specified in a manual VACUUM command,
then the decision to skip the relation is made in get_all_vacuum_rels()
when building the relation list with the pg_class lookup.  This logs a
DEBUG message when the relation is skipped, which is more information
than what we have now.  The checks need to happen again in vacuum_rel(),
where a WARNING is logged, as the VACUUM work could be spread across
multiple transactions.
3) REINDEX is already smart enough to check for ownership of relations
if one is manually listed and reports an ERROR.  However it can cause
the instance to be stuck when a database-wide REINDEX is run by a user
who is only the owner of this database.  In this case it seems to me
that we need to make ReindexMultipleTables smarter in terms of ACL
checks, as per 0003.
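
To make the idea behind 0001 and 0002 a bit more concrete, here is a
toy sketch of the "check before lock" pattern.  It is not PostgreSQL
code: ToyRel, toy_check_cb, toy_lookup_and_lock and toy_owner_check
are hypothetical names made up for illustration, and the real
RangeVarGetRelidExtended() reports failures with an ERROR from the
callback and has to cope with concurrent catalog changes, which this
sketch ignores.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a pg_class row; not Form_pg_class. */
typedef struct ToyRel
{
	const char *name;
	const char *owner;
	bool		locked;
} ToyRel;

/* Hypothetical callback type: true means the caller may take the lock. */
typedef bool (*toy_check_cb) (const ToyRel *rel, void *arg);

/*
 * Look up a relation by name and lock it.  The callback runs BEFORE the
 * lock is taken, so a caller failing the check never queues for the lock.
 * Returns NULL if the relation is not found or the check rejects it.
 */
static ToyRel *
toy_lookup_and_lock(ToyRel *catalog, size_t nrels, const char *name,
					toy_check_cb cb, void *arg)
{
	assert(cb != NULL);

	for (size_t i = 0; i < nrels; i++)
	{
		if (strcmp(catalog[i].name, name) != 0)
			continue;
		if (!cb(&catalog[i], arg))
			return NULL;		/* rejected: no lock was requested */
		catalog[i].locked = true;	/* lock only after the check passed */
		return &catalog[i];
	}
	return NULL;				/* not found */
}

/* Ownership check used as the callback; arg is the current user name. */
static bool
toy_owner_check(const ToyRel *rel, void *arg)
{
	return strcmp(rel->owner, (const char *) arg) == 0;
}
```

With a two-entry catalog where "pg_authid" is owned by "postgres" and
"t1" by "alice", a lookup of "pg_authid" as "alice" returns NULL and
leaves the entry unlocked, while a lookup of "t1" as "alice" returns
the entry with its lock flag set.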

I quite like the shape of the patches proposed here, and the refactoring
is, I think, pretty clear.  Each patch can be treated independently as
well.  Comments are welcome.  (Those patches are not indented yet, which
does not matter much at this stage anyway.)

Thanks,
--
Michael
From 4dc84bb5fb0d33a5f6fedd31828adda9db8fd2d4 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 26 Jul 2018 11:51:45 +0900
Subject: [PATCH 1/3] Refactor TRUNCATE execution to avoid early lock lookups

A caller of TRUNCATE can attempt to obtain a lock early, which can cause
other sessions to block on the request done, potentially causing DoS
attacks as even a non-privileged user can attempt a truncation of a
critical catalog table to block even all incoming connection attempts.

This commit fixes the case of TRUNCATE so that RangeVarGetRelidExtended
is used with a callback doing the necessary checks before the lock is
taken, avoiding locking issues at early stages.

Author: Michael Paquier
Discussion: https://postgr.es/m/152512087100.19803.12733865831237526...@wrigleys.postgresql.org
---
 src/backend/commands/tablecmds.c | 83 +++++++++++++++++++++++++-------
 1 file changed, 65 insertions(+), 18 deletions(-)

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index eb2d33dd86..b06c9dbf63 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -300,7 +300,10 @@ struct DropRelationCallbackState
 #define child_dependency_type(child_is_partition)	\
 	((child_is_partition) ? DEPENDENCY_AUTO : DEPENDENCY_NORMAL)
 
-static void truncate_check_rel(Relation rel);
+static void truncate_check_rel(Oid relid, Form_pg_class reltuple);
+static void truncate_check_session(Relation rel);
+static void RangeVarCallbackForTruncate(const RangeVar *relation,
+							Oid relId, Oid oldRelId, void *arg);
 static List *MergeAttributes(List *schema, List *supers, char relpersistence,
 				bool is_partition, List **supOids, List **supconstr,
 				int *supOidCount);
@@ -1336,15 +1339,25 @@ ExecuteTruncate(TruncateStmt *stmt)
 		bool		recurse = rv->inh;
 		Oid			myrelid;
 
-		rel = heap_openrv(rv, AccessExclusiveLock);
-		myrelid = RelationGetRelid(rel);
+		myrelid = RangeVarGetRelidExtended(rv, AccessExclusiveLock,
+										   false, RangeVarCallbackForTruncate,
+										   NULL);
+
+		/* open the relation, we already hold a lock on it */
+		rel = heap_open(myrelid, NoLock);
+
 		/* don't throw error for "TRUNCATE foo, foo" */
 		if (list_member_oid(relids, myrelid))
 		{
 			heap_close(rel, AccessExclusiveLock);
 			continue;
 		}
-		truncate_check_rel(rel);
+
+		/*
+		 * RangeVarGetRelidExtended has done some basic checks with its
+		 * callback, so only the session-level checks remain.
+		 */
+		truncate_check_session(rel);
 		rels = lappend(rels, rel);
 		relids = lappend_oid(relids, myrelid);
 		/* Log this relation only if needed for logical decoding */
@@ -1367,7 +1380,9 @@ ExecuteTruncate(TruncateStmt *stmt)
 
 				/* find_all_inheritors already got lock */
 				rel = heap_open(childrelid, NoLock);
-				truncate_check_rel(rel);
+				truncate_check_rel(RelationGetRelid(rel), rel->rd_rel);
+				truncate_check_session(rel);
+
 				rels = lappend(rels, rel);
 				relids = lappend_oid(relids, childrelid);
 				/* Log this relation only if needed for logical decoding */
@@ -1450,7 +1465,8 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
 				ereport(NOTICE,
 						(errmsg("truncate cascades to table \"%s\"",
 								RelationGetRelationName(rel))));
-				truncate_check_rel(rel);
+				truncate_check_rel(relid, rel->rd_rel);
+				truncate_check_session(rel);
 				rels = lappend(rels, rel);
 				relids = lappend_oid(relids, relid);
 				/* Log this relation only if needed for logical decoding */
@@ -1700,38 +1716,47 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
 }
 
 /*
- * Check that a given rel is safe to truncate.  Subroutine for ExecuteTruncate
+ * Check that a given relation is safe to truncate.  Subroutine for
+ * ExecuteTruncate and RangeVarCallbackForTruncate.
  */
 static void
-truncate_check_rel(Relation rel)
+truncate_check_rel(Oid relid, Form_pg_class reltuple)
 {
 	AclResult	aclresult;
+	char	   *relname = NameStr(reltuple->relname);
 
 	/*
 	 * Only allow truncate on regular tables and partitioned tables (although,
 	 * the latter are only being included here for the following checks; no
 	 * physical truncation will occur in their case.)
 	 */
-	if (rel->rd_rel->relkind != RELKIND_RELATION &&
-		rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+	if (reltuple->relkind != RELKIND_RELATION &&
+
+		reltuple->relkind != RELKIND_PARTITIONED_TABLE)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-				 errmsg("\"%s\" is not a table",
-						RelationGetRelationName(rel))));
+				 errmsg("\"%s\" is not a table", relname)));
 
 	/* Permissions checks */
-	aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
-								  ACL_TRUNCATE);
+	aclresult = pg_class_aclcheck(relid, GetUserId(), ACL_TRUNCATE);
 	if (aclresult != ACLCHECK_OK)
-		aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
-					   RelationGetRelationName(rel));
+		aclcheck_error(aclresult, get_relkind_objtype(reltuple->relkind),
+					   relname);
 
-	if (!allowSystemTableMods && IsSystemRelation(rel))
+	if (!allowSystemTableMods && IsSystemClass(relid, reltuple))
 		ereport(ERROR,
 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 				 errmsg("permission denied: \"%s\" is a system catalog",
-						RelationGetRelationName(rel))));
+						relname)));
+}
 
+/*
+ * Set of sanity checks at session-level to check if a given relation
+ * is safe to truncate.
+ */
+static void
+truncate_check_session(Relation rel)
+{
 	/*
 	 * Don't allow truncate on temp tables of other backends ... their local
 	 * buffer manager is not going to cope.
@@ -13419,6 +13444,28 @@ RangeVarCallbackOwnsTable(const RangeVar *relation,
 		aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relId)), relation->relname);
 }
 
+/*
+ * Callback to RangeVarGetRelidExtended() for TRUNCATE processing.
+ */
+static void
+RangeVarCallbackForTruncate(const RangeVar *relation,
+							Oid relId, Oid oldRelId, void *arg)
+{
+	HeapTuple	tuple;
+
+	/* Nothing to do if the relation was not found. */
+	if (!OidIsValid(relId))
+		return;
+
+	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId));
+	if (!HeapTupleIsValid(tuple))	/* should not happen */
+		elog(ERROR, "cache lookup failed for relation %u", relId);
+
+	truncate_check_rel(relId, (Form_pg_class) GETSTRUCT(tuple));
+
+	ReleaseSysCache(tuple);
+}
+
 /*
  * Callback to RangeVarGetRelidExtended(), similar to
  * RangeVarCallbackOwnsTable() but without checks on the type of the relation.
-- 
2.18.0

From 0812127447a77029f669c5baa0eaca48f7397315 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 26 Jul 2018 13:38:05 +0900
Subject: [PATCH 2/3] Refactor VACUUM execution to avoid early lock lookups

A caller of VACUUM can attempt to obtain a lock early, which can cause
other sessions to block on the request done, potentially causing DoS
attacks as even a non-privileged user can attempt a VACUUM of a
critical catalog table to block even all incoming connection attempts.

Contrary to TRUNCATE, a client could attempt a system-wide VACUUM after
building the list of relations to VACUUM, which can cause vacuum_rel()
to try to lock the relation but the operation would just block.  When
the client specifies a list of relations and the relation needs to be
skipped, fail hard so that there is no conflict with any relation a user
has no rights to work on.

vacuum_rel() already had the sanity checks needed, except that those
were applied too late.  This commit refactors the code so that relation
skips are checked beforehand, making it safe against lock lookup attacks.

Author: Michael Paquier
Discussion: https://postgr.es/m/152512087100.19803.12733865831237526...@wrigleys.postgresql.org
---
 src/backend/commands/vacuum.c | 168 +++++++++++++++++++++++-----------
 1 file changed, 116 insertions(+), 52 deletions(-)

diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 5736f12b8f..502564db68 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -30,11 +30,13 @@
 #include "access/multixact.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "commands/cluster.h"
+#include "commands/tablecmds.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
@@ -47,6 +49,7 @@
 #include "utils/acl.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
@@ -68,7 +71,10 @@ static BufferAccessStrategy vac_strategy;
 
 
 /* non-export function prototypes */
-static List *expand_vacuum_rel(VacuumRelation *vrel);
+static void RangeVarCallbackForVacuum(const RangeVar *relation,
+				  Oid relId, Oid oldRelId, void *arg);
+static bool vacuum_skip_rel(Oid relid, Form_pg_class reltuple, int elevel);
+static List *expand_vacuum_rel(VacuumRelation *vrel, int options);
 static List *get_all_vacuum_rels(void);
 static void vac_truncate_clog(TransactionId frozenXID,
 				  MultiXactId minMulti,
@@ -257,7 +263,7 @@ vacuum(int options, List *relations, VacuumParams *params,
 			List	   *sublist;
 			MemoryContext old_context;
 
-			sublist = expand_vacuum_rel(vrel);
+			sublist = expand_vacuum_rel(vrel, options);
 			old_context = MemoryContextSwitchTo(vac_context);
 			newrels = list_concat(newrels, sublist);
 			MemoryContextSwitchTo(old_context);
@@ -408,6 +414,101 @@ vacuum(int options, List *relations, VacuumParams *params,
 	vac_context = NULL;
 }
 
+/*
+ * Callback to RangeVarGetRelidExtended() for VACUUM processing when
+ * the list of relations to work on is provided by caller of a manual
+ * VACUUM.
+ */
+static void
+RangeVarCallbackForVacuum(const RangeVar *relation,
+						  Oid relId, Oid oldRelId, void *arg)
+{
+	HeapTuple	tuple;
+	int		   *options = (int *) arg;
+	Form_pg_class reltuple;
+
+	Assert(!IsAutoVacuumWorkerProcess());
+
+	/* Nothing to do if the relation was not found. */
+	if (!OidIsValid(relId))
+		return;
+
+	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relId));
+	if (!HeapTupleIsValid(tuple))	/* should not happen */
+		elog(ERROR, "cache lookup failed for relation %u", relId);
+
+	reltuple = (Form_pg_class) GETSTRUCT(tuple);
+
+	/* Check permissions */
+
+	/*
+	 * VACUUM FULL takes an exclusive lock on the relation vacuumed, hence
+	 * restrict its access similarly to what vacuum_rel() would do.  Issuing
+	 * an ERROR makes sure that there is no lock escalation when a
+	 * non-privileged user lists this relation.
+	 */
+	if (((*options) & VACOPT_FULL) != 0)
+	{
+		(void) vacuum_skip_rel(relId, reltuple, ERROR);
+	}
+
+	ReleaseSysCache(tuple);
+}
+
+/*
+ * Check if a given relation can be safely vacuumed or not.  If the
+ * relation needs to be skipped, issue a log message and return true to
+ * let the caller know what to do with this relation.
+ */
+static bool
+vacuum_skip_rel(Oid relid, Form_pg_class reltuple, int elevel)
+{
+	/*
+	 * Check permissions.
+	 *
+	 * We allow the user to vacuum a table if he is superuser, the table
+	 * owner, or the database owner (but in the latter case, only if it's not
+	 * a shared relation).  pg_class_ownercheck includes the superuser case.
+	 *
+	 * Note we choose to treat permissions failure as a WARNING and keep
+	 * trying to vacuum the rest of the DB --- is this appropriate?
+	 */
+	if (!(pg_class_ownercheck(relid, GetUserId()) ||
+		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !reltuple->relisshared)))
+	{
+		if (reltuple->relisshared)
+			ereport(elevel,
+					(errmsg("skipping \"%s\" --- only superuser can vacuum it",
+							NameStr(reltuple->relname))));
+		else if (reltuple->relnamespace == PG_CATALOG_NAMESPACE)
+			ereport(elevel,
+					(errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
+							NameStr(reltuple->relname))));
+		else
+			ereport(elevel,
+					(errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
+							NameStr(reltuple->relname))));
+		return true;
+	}
+
+	/*
+	 * Check that it's of a vacuumable relkind.
+	 */
+	if (reltuple->relkind != RELKIND_RELATION &&
+		reltuple->relkind != RELKIND_MATVIEW &&
+		reltuple->relkind != RELKIND_TOASTVALUE &&
+		reltuple->relkind != RELKIND_PARTITIONED_TABLE)
+	{
+		ereport(elevel,
+				(errmsg("skipping \"%s\" --- cannot vacuum non-tables or special system tables",
+						NameStr(reltuple->relname))));
+		return true;
+	}
+
+	return false;
+}
+
+
 /*
  * Given a VacuumRelation, fill in the table OID if it wasn't specified,
  * and optionally add VacuumRelations for partitions of the table.
@@ -423,7 +524,7 @@ vacuum(int options, List *relations, VacuumParams *params,
  * are made in vac_context.
  */
 static List *
-expand_vacuum_rel(VacuumRelation *vrel)
+expand_vacuum_rel(VacuumRelation *vrel, int options)
 {
 	List	   *vacrels = NIL;
 	MemoryContext oldcontext;
@@ -454,7 +555,9 @@ expand_vacuum_rel(VacuumRelation *vrel)
 		 * below, as well as find_all_inheritors's expectation that the caller
 		 * holds some lock on the starting relation.
 		 */
-		relid = RangeVarGetRelid(vrel->relation, AccessShareLock, false);
+		relid = RangeVarGetRelidExtended(vrel->relation, AccessShareLock,
+										 false, RangeVarCallbackForVacuum,
+										 (void *) &options);
 
 		/*
 		 * Make a returnable VacuumRelation for this rel.
@@ -545,15 +648,10 @@ get_all_vacuum_rels(void)
 	{
 		Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
 		MemoryContext oldcontext;
+		Oid			relid = HeapTupleGetOid(tuple);
 
-		/*
-		 * We include partitioned tables here; depending on which operation is
-		 * to be performed, caller will decide whether to process or ignore
-		 * them.
-		 */
-		if (classForm->relkind != RELKIND_RELATION &&
-			classForm->relkind != RELKIND_MATVIEW &&
-			classForm->relkind != RELKIND_PARTITIONED_TABLE)
+		/* check permissions and relkind of relation */
+		if (vacuum_skip_rel(relid, classForm, DEBUG1))
 			continue;
 
 		/*
@@ -563,7 +661,7 @@ get_all_vacuum_rels(void)
 		 */
 		oldcontext = MemoryContextSwitchTo(vac_context);
 		vacrels = lappend(vacrels, makeVacuumRelation(NULL,
-													  HeapTupleGetOid(tuple),
+													  relid,
 													  NIL));
 		MemoryContextSwitchTo(oldcontext);
 	}
@@ -1436,47 +1534,13 @@ vacuum_rel(Oid relid, RangeVar *relation, int options, VacuumParams *params)
 	}
 
 	/*
-	 * Check permissions.
-	 *
-	 * We allow the user to vacuum a table if he is superuser, the table
-	 * owner, or the database owner (but in the latter case, only if it's not
-	 * a shared relation).  pg_class_ownercheck includes the superuser case.
-	 *
-	 * Note we choose to treat permissions failure as a WARNING and keep
-	 * trying to vacuum the rest of the DB --- is this appropriate?
+	 * Check if relation needs to be skipped.  This check also happens
+	 * when building the relation list to VACUUM for a manual operation,
+	 * and needs to be repeated here as the work could be spread across
+	 * multiple transactions.
 	 */
-	if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
-		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
+	if (vacuum_skip_rel(RelationGetRelid(onerel), onerel->rd_rel, WARNING))
 	{
-		if (onerel->rd_rel->relisshared)
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- only superuser can vacuum it",
-							RelationGetRelationName(onerel))));
-		else if (onerel->rd_rel->relnamespace == PG_CATALOG_NAMESPACE)
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- only superuser or database owner can vacuum it",
-							RelationGetRelationName(onerel))));
-		else
-			ereport(WARNING,
-					(errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
-							RelationGetRelationName(onerel))));
-		relation_close(onerel, lmode);
-		PopActiveSnapshot();
-		CommitTransactionCommand();
-		return false;
-	}
-
-	/*
-	 * Check that it's of a vacuumable relkind.
-	 */
-	if (onerel->rd_rel->relkind != RELKIND_RELATION &&
-		onerel->rd_rel->relkind != RELKIND_MATVIEW &&
-		onerel->rd_rel->relkind != RELKIND_TOASTVALUE &&
-		onerel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-	{
-		ereport(WARNING,
-				(errmsg("skipping \"%s\" --- cannot vacuum non-tables or special system tables",
-						RelationGetRelationName(onerel))));
 		relation_close(onerel, lmode);
 		PopActiveSnapshot();
 		CommitTransactionCommand();
-- 
2.18.0

From 07c396c208bd0f689810aca08995be17c826f3cb Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@paquier.xyz>
Date: Thu, 26 Jul 2018 14:12:38 +0900
Subject: [PATCH 3/3] Restrict access to system-wide REINDEX for non-privileged
 users

A database owner running a system-wide REINDEX has the possibility to
also do the operation on critical system catalogs, which can cause a
PostgreSQL instance to become unresponsive or even block authentication.
This commit makes sure that a user running a REINDEX SYSTEM or DATABASE
only works on a relation if one of the following holds:
- The user is a superuser
- The user is the table owner
- The user is the database owner, only if the relation worked on is not
shared.

Author: Michael Paquier
Discussion: https://postgr.es/m/152512087100.19803.12733865831237526...@wrigleys.postgresql.org
---
 src/backend/commands/indexcmds.c | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index b9dad9672e..1bfc34bc70 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -2415,6 +2415,16 @@ ReindexMultipleTables(const char *objectName, ReindexObjectType objectKind,
 			!IsSystemClass(relid, classtuple))
 			continue;
 
+		/*
+		 * We allow the user to reindex a table if he is superuser, the table
+		 * owner, or the database owner (but in the latter case, only if it's not
+		 * a shared relation).
+		 */
+		if (!(pg_class_ownercheck(relid, GetUserId()) ||
+			  (pg_database_ownercheck(MyDatabaseId, GetUserId()) &&
+			   !classtuple->relisshared)))
+			continue;
+
 		/* Save the list of relation OIDs in private context */
 		old = MemoryContextSwitchTo(private_context);
 
-- 
2.18.0
