On Tue, 6 Feb 2018 11:12:37 -0500
Robert Haas <robertmh...@gmail.com> wrote:

> On Tue, Feb 6, 2018 at 1:28 AM, Tatsuo Ishii <is...@sraoss.co.jp> wrote:
> >> But what does that have to do with locking?
> >
> > Well, if the view is not updatable, I think there will be less point
> > to allow to lock the base tables in the view because locking is
> > typically used in a case when updates are required.
> >
> > Of course we could add special triggers to allow to update views that
> > are not automatically updatable but that kind of views are tend to
> > complex and IMO there's less need the automatic view locking feature.
> 
> Hmm.  Well, I see now why you've designed the feature in the way that
> you have, but I guess it still seems somewhat arbitrary to me.  If you
> ignore the deadlock consideration, then there's no reason not to
> define the feature as locking every table mentioned anywhere in the
> query, including subqueries, and it can work for all views whether
> updatable or not.  If the deadlock consideration is controlling, then
> I guess we can't do better than what you have, but I'm not sure how
> future-proof it is.  If in the future somebody makes views updateable
> that involve a join, say from the primary key of one table to a unique
> key of another so that no duplicate rows can be introduced, then
> they'll either have to write code to make this feature identify and
> lock the "main" table, which I'm not sure would be strong enough in
> all cases, or lock them all, which reintroduces the deadlock problem.
> 
> Personally, I would be inclined to view the deadlock problem as not
> very important.  I just don't see how that is going to come up very

I agree that deadlocks won't occur very often, so this is not so
important.

I have updated the lockable-view patch to v8.

This new version doesn't consider the deadlock problem, and all tables
or views appearing in the view definition query are locked recursively.
Also, this allows all kinds of views to be locked, even views that are
not auto-updatable.


> often.  What I do think will be an issue is that if you start locking
> lots of tables, you might prevent the system from getting much work
> done, whether or not you also cause any deadlocks.  But I don't see
> what we can do about that, really.  If users want full control over
> which tables get locked, then they have to name them explicitly.  Or
> alternatively, maybe they should avoid the need for full-table locks
> by using SSI, gaining the benefits of (1) optimistic rather than
> pessimistic concurrency control, (2) finer-grained locking, and (3)
> not needing to issue explicit LOCK commands.



> -- 
> Robert Haas
> EnterpriseDB: http://www.enterprisedb.com
> The Enterprise PostgreSQL Company


-- 
Yugo Nagata <nag...@sraoss.co.jp>
diff --git a/doc/src/sgml/ref/lock.sgml b/doc/src/sgml/ref/lock.sgml
index b2c7203..96d477c 100644
--- a/doc/src/sgml/ref/lock.sgml
+++ b/doc/src/sgml/ref/lock.sgml
@@ -46,6 +46,11 @@ LOCK [ TABLE ] [ ONLY ] <replaceable class="parameter">name</replaceable> [ * ]
   </para>
 
   <para>
+   When a view is locked, all relations appearing in the view definition
+   query are also locked recursively with the same lock mode.
+  </para>
+
+  <para>
    When acquiring locks automatically for commands that reference
    tables, <productname>PostgreSQL</productname> always uses the least
    restrictive lock mode possible. <command>LOCK TABLE</command>
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index 6479dcb..daf3d81 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -23,11 +23,15 @@
 #include "utils/acl.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
+#include "rewrite/rewriteHandler.h"
+#include "access/heapam.h"
+#include "nodes/nodeFuncs.h"
 
-static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait);
-static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode);
+static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid);
+static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode, Oid userid);
 static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid,
 							 Oid oldrelid, void *arg);
+static void LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait);
 
 /*
  * LOCK TABLE
@@ -62,8 +66,10 @@ LockTableCommand(LockStmt *lockstmt)
 										  RangeVarCallbackForLockTable,
 										  (void *) &lockstmt->mode);
 
-		if (recurse)
-			LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait);
+		if (get_rel_relkind(reloid) == RELKIND_VIEW)
+			LockViewRecurse(reloid, reloid, lockstmt->mode, lockstmt->nowait);
+		else if (recurse)
+			LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait, GetUserId());
 	}
 }
 
@@ -86,15 +92,17 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
 		return;					/* woops, concurrently dropped; no permissions
 								 * check */
 
-	/* Currently, we only allow plain tables to be locked */
-	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+
+	/* Currently, we only allow plain tables or views to be locked */
+	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
+		relkind != RELKIND_VIEW)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-				 errmsg("\"%s\" is not a table",
+				 errmsg("\"%s\" is not a table or view",
 						rv->relname)));
 
 	/* Check permissions. */
-	aclresult = LockTableAclCheck(relid, lockmode);
+	aclresult = LockTableAclCheck(relid, lockmode, GetUserId());
 	if (aclresult != ACLCHECK_OK)
 		aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
 }
@@ -107,7 +115,7 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
  * multiply-inheriting children more than once, but that's no problem.
  */
 static void
-LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
+LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid)
 {
 	List	   *children;
 	ListCell   *lc;
@@ -120,7 +128,7 @@ LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
 		AclResult	aclresult;
 
 		/* Check permissions before acquiring the lock. */
-		aclresult = LockTableAclCheck(childreloid, lockmode);
+		aclresult = LockTableAclCheck(childreloid, lockmode, userid);
 		if (aclresult != ACLCHECK_OK)
 		{
 			char	   *relname = get_rel_name(childreloid);
@@ -157,15 +165,120 @@ LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
 			continue;
 		}
 
-		LockTableRecurse(childreloid, lockmode, nowait);
+		LockTableRecurse(childreloid, lockmode, nowait, userid);
 	}
 }
 
 /*
+ * Apply LOCK TABLE recursively over a view
+ *
+ * All tables and views appearing in the view definition query are locked
+ * recursively with the same lock mode. 
+ */
+
+typedef struct
+{
+	Oid root_reloid;	/* view the recursion started from; meeting it again is reported as infinite recursion */
+	LOCKMODE lockmode;	/* lock mode applied to every relation reached */
+	bool nowait;		/* if true, raise an error when a lock cannot be obtained */
+	Oid viewowner;		/* owner of the view being expanded; permissions are checked as this role */
+} LockViewRecurse_context;
+
+static bool
+LockViewRecurse_walker(Node *node, LockViewRecurse_context *context)
+{
+	/* Tree walker: lock each relation in every Query's range table. */
+	if (node == NULL)
+		return false;
+
+	if (IsA(node, Query))
+	{
+		Query		*query = (Query *) node;
+		ListCell	*rtable;
+
+		foreach(rtable, query->rtable)
+		{
+			RangeTblEntry	*rte = lfirst(rtable);
+			AclResult		 aclresult;
+			Oid relid = rte->relid;
+			char relkind = rte->relkind;
+			char *relname;
+
+			/*
+			 * Skip entries that are not plain relations (subqueries are walked
+			 * by query_tree_walker below) and the OLD and NEW placeholders.
+			 */
+			if (rte->rtekind != RTE_RELATION ||
+				!strcmp(rte->eref->aliasname, "old") || !strcmp(rte->eref->aliasname, "new"))
+				continue;
+
+			/* Currently, we only allow plain tables or views to be locked */
+			relname = get_rel_name(relid);
+			if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
+				relkind != RELKIND_VIEW)
+				ereport(ERROR,
+						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						 errmsg("\"%s\" is not a table or view", relname)));
+
+			/* Check infinite recursion in the view definition */
+			if (relid == context->root_reloid)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+						 errmsg("infinite recursion detected in rules for relation \"%s\"",
+								get_rel_name(context->root_reloid))));
+
+			/* Check permissions with the view owner's privilege. */
+			aclresult = LockTableAclCheck(relid, context->lockmode, context->viewowner);
+			if (aclresult != ACLCHECK_OK)
+				aclcheck_error(aclresult, get_relkind_objtype(relkind), relname);
+
+			/* We have enough rights to lock the relation; do so. */
+			if (!context->nowait)
+				LockRelationOid(relid, context->lockmode);
+			else if (!ConditionalLockRelationOid(relid, context->lockmode))
+				ereport(ERROR,
+						(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+						 errmsg("could not obtain lock on relation \"%s\"", relname)));
+
+			/* Recurse into a nested view, or into inheritance children. */
+			if (relkind == RELKIND_VIEW)
+				LockViewRecurse(relid, context->root_reloid, context->lockmode, context->nowait);
+			else if (rte->inh)
+				LockTableRecurse(relid, context->lockmode, context->nowait, context->viewowner);
+		}
+
+		return query_tree_walker(query, LockViewRecurse_walker, context,
+								 QTW_IGNORE_JOINALIASES);
+	}
+
+	return expression_tree_walker(node, LockViewRecurse_walker, context);
+}
+
+static void
+LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait)
+{
+	LockViewRecurse_context context;
+	Relation		 view;
+	Query			*viewquery;
+
+	/* Lock every relation referenced by view "reloid", checked as its owner. */
+	view = heap_open(reloid, NoLock);
+	viewquery = get_view_query(view);
+	context.root_reloid = root_reloid;
+	context.lockmode = lockmode;
+	context.nowait = nowait;
+	context.viewowner = view->rd_rel->relowner;
+
+	LockViewRecurse_walker((Node *) viewquery, &context);
+	/* Close only after the last use of view->rd_rel and of the view's query. */
+	heap_close(view, NoLock);
+}
+
+/*
  * Check whether the current user is permitted to lock this relation.
  */
 static AclResult
-LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
+LockTableAclCheck(Oid reloid, LOCKMODE lockmode, Oid userid)
 {
 	AclResult	aclresult;
 	AclMode		aclmask;
@@ -178,7 +291,7 @@ LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
 	else
 		aclmask = ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE;
 
-	aclresult = pg_class_aclcheck(reloid, GetUserId(), aclmask);
+	aclresult = pg_class_aclcheck(reloid, userid, aclmask);
 
 	return aclresult;
 }
diff --git a/src/test/regress/expected/lock.out b/src/test/regress/expected/lock.out
index 74a434d..328e230 100644
--- a/src/test/regress/expected/lock.out
+++ b/src/test/regress/expected/lock.out
@@ -5,7 +5,12 @@
 CREATE SCHEMA lock_schema1;
 SET search_path = lock_schema1;
 CREATE TABLE lock_tbl1 (a BIGINT);
-CREATE VIEW lock_view1 AS SELECT 1;
+CREATE TABLE lock_tbl1a (a BIGINT);
+CREATE VIEW lock_view1 AS SELECT * FROM lock_tbl1;
+CREATE VIEW lock_view2(a,b) AS SELECT * FROM lock_tbl1, lock_tbl1a;
+CREATE VIEW lock_view3 AS SELECT * from lock_view2;
+CREATE VIEW lock_view4 AS SELECT (select a from lock_tbl1a limit 1) from lock_tbl1;
+CREATE VIEW lock_view5 AS SELECT * from lock_tbl1 where a in (select * from lock_tbl1a);
 CREATE ROLE regress_rol_lock1;
 ALTER ROLE regress_rol_lock1 SET search_path = lock_schema1;
 GRANT USAGE ON SCHEMA lock_schema1 TO regress_rol_lock1;
@@ -30,8 +35,82 @@ LOCK TABLE lock_tbl1 IN SHARE MODE NOWAIT;
 LOCK TABLE lock_tbl1 IN SHARE ROW EXCLUSIVE MODE NOWAIT;
 LOCK TABLE lock_tbl1 IN EXCLUSIVE MODE NOWAIT;
 LOCK TABLE lock_tbl1 IN ACCESS EXCLUSIVE MODE NOWAIT;
-LOCK TABLE lock_view1 IN EXCLUSIVE MODE;   -- Will fail; can't lock a non-table
-ERROR:  "lock_view1" is not a table
+ROLLBACK;
+-- Verify that we can lock views
+BEGIN TRANSACTION;
+LOCK TABLE lock_view1 IN EXCLUSIVE MODE;
+-- lock_view1 and lock_tbl1 are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%'
+ order by relname;
+  relname   
+------------
+ lock_tbl1
+ lock_view1
+ pg_locks
+(3 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view2 IN EXCLUSIVE MODE;
+-- lock_view1, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%'
+ order by relname;
+  relname   
+------------
+ lock_tbl1
+ lock_tbl1a
+ lock_view2
+ pg_locks
+(4 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view3 IN EXCLUSIVE MODE;
+-- lock_view3, lock_view2, lock_tbl1, and lock_tbl1a are locked recursively.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%'
+ order by relname;
+  relname   
+------------
+ lock_tbl1
+ lock_tbl1a
+ lock_view2
+ lock_view3
+ pg_locks
+(5 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view4 IN EXCLUSIVE MODE;
+-- lock_view4, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%'
+ order by relname;
+  relname   
+------------
+ lock_tbl1
+ lock_tbl1a
+ lock_view4
+ pg_locks
+(4 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view5 IN EXCLUSIVE MODE;
+-- lock_view5, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%'
+ order by relname;
+  relname   
+------------
+ lock_tbl1
+ lock_tbl1a
+ lock_view5
+ pg_locks
+(4 rows)
+
 ROLLBACK;
 -- Verify that we can lock a table with inheritance children.
 CREATE TABLE lock_tbl2 (b BIGINT) INHERITS (lock_tbl1);
@@ -54,11 +133,18 @@ RESET ROLE;
 --
 -- Clean up
 --
+DROP VIEW lock_view5;
+DROP VIEW lock_view4;
+DROP VIEW lock_view3;
+DROP VIEW lock_view2;
 DROP VIEW lock_view1;
 DROP TABLE lock_tbl3;
 DROP TABLE lock_tbl2;
 DROP TABLE lock_tbl1;
+DROP TABLE no_lock_tbl;
+ERROR:  table "no_lock_tbl" does not exist
 DROP SCHEMA lock_schema1 CASCADE;
+NOTICE:  drop cascades to table lock_tbl1a
 DROP ROLE regress_rol_lock1;
 -- atomic ops tests
 RESET search_path;
diff --git a/src/test/regress/sql/lock.sql b/src/test/regress/sql/lock.sql
index 567e8bc..c9d6495 100644
--- a/src/test/regress/sql/lock.sql
+++ b/src/test/regress/sql/lock.sql
@@ -6,7 +6,12 @@
 CREATE SCHEMA lock_schema1;
 SET search_path = lock_schema1;
 CREATE TABLE lock_tbl1 (a BIGINT);
-CREATE VIEW lock_view1 AS SELECT 1;
+CREATE TABLE lock_tbl1a (a BIGINT);
+CREATE VIEW lock_view1 AS SELECT * FROM lock_tbl1;
+CREATE VIEW lock_view2(a,b) AS SELECT * FROM lock_tbl1, lock_tbl1a;
+CREATE VIEW lock_view3 AS SELECT * from lock_view2;
+CREATE VIEW lock_view4 AS SELECT (select a from lock_tbl1a limit 1) from lock_tbl1;
+CREATE VIEW lock_view5 AS SELECT * from lock_tbl1 where a in (select * from lock_tbl1a);
 CREATE ROLE regress_rol_lock1;
 ALTER ROLE regress_rol_lock1 SET search_path = lock_schema1;
 GRANT USAGE ON SCHEMA lock_schema1 TO regress_rol_lock1;
@@ -33,7 +38,43 @@ LOCK TABLE lock_tbl1 IN SHARE MODE NOWAIT;
 LOCK TABLE lock_tbl1 IN SHARE ROW EXCLUSIVE MODE NOWAIT;
 LOCK TABLE lock_tbl1 IN EXCLUSIVE MODE NOWAIT;
 LOCK TABLE lock_tbl1 IN ACCESS EXCLUSIVE MODE NOWAIT;
-LOCK TABLE lock_view1 IN EXCLUSIVE MODE;   -- Will fail; can't lock a non-table
+ROLLBACK;
+
+-- Verify that we can lock views
+BEGIN TRANSACTION;
+LOCK TABLE lock_view1 IN EXCLUSIVE MODE;
+-- lock_view1 and lock_tbl1 are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view2 IN EXCLUSIVE MODE;
+-- lock_view1, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view3 IN EXCLUSIVE MODE;
+-- lock_view3, lock_view2, lock_tbl1, and lock_tbl1a are locked recursively.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view4 IN EXCLUSIVE MODE;
+-- lock_view4, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view5 IN EXCLUSIVE MODE;
+-- lock_view5, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and relname like '%lock_%'
+ order by relname;
 ROLLBACK;
 
 -- Verify that we can lock a table with inheritance children.
@@ -58,10 +99,15 @@ RESET ROLE;
 --
 -- Clean up
 --
+DROP VIEW lock_view5;
+DROP VIEW lock_view4;
+DROP VIEW lock_view3;
+DROP VIEW lock_view2;
 DROP VIEW lock_view1;
 DROP TABLE lock_tbl3;
 DROP TABLE lock_tbl2;
 DROP TABLE lock_tbl1;
+DROP TABLE no_lock_tbl;
 DROP SCHEMA lock_schema1 CASCADE;
 DROP ROLE regress_rol_lock1;
 

Reply via email to