On Tue, 27 Mar 2018 23:28:04 +0900
Yugo Nagata <nag...@sraoss.co.jp> wrote:

I found that the previous patch was broken: it can't handle
views that have subqueries, as below:

 CREATE VIEW lock_view6 AS SELECT * from (select * from lock_tbl1) sub;

I fixed this and attached the updated version including additional tests.

Regards,

> On Tue, 6 Feb 2018 11:12:37 -0500
> Robert Haas <robertmh...@gmail.com> wrote:
> 
> > On Tue, Feb 6, 2018 at 1:28 AM, Tatsuo Ishii <is...@sraoss.co.jp> wrote:
> > >> But what does that have to do with locking?
> > >
> > > Well, if the view is not updatable, I think there will be less point
> > > to allow to lock the base tables in the view because locking is
> > > typically used in a case when updates are required.
> > >
> > > Of course we could add special triggers to allow updating views that
> > > are not automatically updatable, but such views tend to be
> > > complex and IMO there's less need for the automatic view locking feature.
> > 
> > Hmm.  Well, I see now why you've designed the feature in the way that
> > you have, but I guess it still seems somewhat arbitrary to me.  If you
> > ignore the deadlock consideration, then there's no reason not to
> > define the feature as locking every table mentioned anywhere in the
> > query, including subqueries, and it can work for all views whether
> > updatable or not.  If the deadlock consideration is controlling, then
> > I guess we can't do better than what you have, but I'm not sure how
> > future-proof it is.  If in the future somebody makes views updateable
> > that involve a join, say from the primary key of one table to a unique
> > key of another so that no duplicate rows can be introduced, then
> > they'll either have to write code to make this feature identify and
> > lock the "main" table, which I'm not sure would be strong enough in
> > all cases, or lock them all, which reintroduces the deadlock problem.
> > 
> > Personally, I would be inclined to view the deadlock problem as not
> > very important.  I just don't see how that is going to come up very
> 
> I agree that the deadlock won't occur very often and this is not
> so important.
> 
> I have updated the lockable-view patch to v8.
> 
> This new version doesn't consider the deadlock problem, and all tables
> or views appearing in the view definition query are locked recursively.
> Also, this allows all kinds of views to be locked, even those that are
> not auto-updatable.
> 
> 
> > often.  What I do think will be an issue is that if you start locking
> > lots of tables, you might prevent the system from getting much work
> > done, whether or not you also cause any deadlocks.  But I don't see
> > what we can do about that, really.  If users want full control over
> > which tables get locked, then they have to name them explicitly.  Or
> > alternatively, maybe they should avoid the need for full-table locks
> > by using SSI, gaining the benefits of (1) optimistic rather than
> > pessimistic concurrency control, (2) finer-grained locking, and (3)
> > not needing to issue explicit LOCK commands.
> 
> 
> 
> > -- 
> > Robert Haas
> > EnterpriseDB: http://www.enterprisedb.com
> > The Enterprise PostgreSQL Company
> 
> 
> -- 
> Yugo Nagata <nag...@sraoss.co.jp>


-- 
Yugo Nagata <nag...@sraoss.co.jp>
diff --git a/doc/src/sgml/ref/lock.sgml b/doc/src/sgml/ref/lock.sgml
index b2c7203..96d477c 100644
--- a/doc/src/sgml/ref/lock.sgml
+++ b/doc/src/sgml/ref/lock.sgml
@@ -46,6 +46,11 @@ LOCK [ TABLE ] [ ONLY ] <replaceable class="parameter">name</replaceable> [ * ]
   </para>
 
   <para>
+   When a view is locked, all relations appearing in its definition
+   query are also locked recursively with the same lock mode.
+  </para>
+
+  <para>
    When acquiring locks automatically for commands that reference
    tables, <productname>PostgreSQL</productname> always uses the least
    restrictive lock mode possible. <command>LOCK TABLE</command>
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index 6479dcb..e15d87d 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -23,11 +23,15 @@
 #include "utils/acl.h"
 #include "utils/lsyscache.h"
 #include "utils/syscache.h"
+#include "rewrite/rewriteHandler.h"
+#include "access/heapam.h"
+#include "nodes/nodeFuncs.h"
 
-static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait);
-static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode);
+static void LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid);
+static AclResult LockTableAclCheck(Oid relid, LOCKMODE lockmode, Oid userid);
 static void RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid,
 							 Oid oldrelid, void *arg);
+static void LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait);
 
 /*
  * LOCK TABLE
@@ -62,8 +66,10 @@ LockTableCommand(LockStmt *lockstmt)
 										  RangeVarCallbackForLockTable,
 										  (void *) &lockstmt->mode);
 
-		if (recurse)
-			LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait);
+		if (get_rel_relkind(reloid) == RELKIND_VIEW)
+			LockViewRecurse(reloid, reloid, lockstmt->mode, lockstmt->nowait);
+		else if (recurse)
+			LockTableRecurse(reloid, lockstmt->mode, lockstmt->nowait, GetUserId());
 	}
 }
 
@@ -86,15 +92,17 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
 		return;					/* woops, concurrently dropped; no permissions
 								 * check */
 
-	/* Currently, we only allow plain tables to be locked */
-	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+
+	/* Currently, we only allow plain tables or views to be locked */
+	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
+		relkind != RELKIND_VIEW)
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-				 errmsg("\"%s\" is not a table",
+				 errmsg("\"%s\" is not a table or view",
 						rv->relname)));
 
 	/* Check permissions. */
-	aclresult = LockTableAclCheck(relid, lockmode);
+	aclresult = LockTableAclCheck(relid, lockmode, GetUserId());
 	if (aclresult != ACLCHECK_OK)
 		aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
 }
@@ -107,7 +115,7 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid,
  * multiply-inheriting children more than once, but that's no problem.
  */
 static void
-LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
+LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait, Oid userid)
 {
 	List	   *children;
 	ListCell   *lc;
@@ -120,7 +128,7 @@ LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
 		AclResult	aclresult;
 
 		/* Check permissions before acquiring the lock. */
-		aclresult = LockTableAclCheck(childreloid, lockmode);
+		aclresult = LockTableAclCheck(childreloid, lockmode, userid);
 		if (aclresult != ACLCHECK_OK)
 		{
 			char	   *relname = get_rel_name(childreloid);
@@ -157,15 +165,120 @@ LockTableRecurse(Oid reloid, LOCKMODE lockmode, bool nowait)
 			continue;
 		}
 
-		LockTableRecurse(childreloid, lockmode, nowait);
+		LockTableRecurse(childreloid, lockmode, nowait, userid);
 	}
 }
 
 /*
+ * Apply LOCK TABLE recursively over a view
+ *
+ * All tables and views appearing in the view definition query are locked
+ * recursively with the same lock mode.
+ */
+
+typedef struct
+{
+	Oid root_reloid;	/* view named in the LOCK command; used to detect cycles */
+	LOCKMODE lockmode;	/* lock mode applied to every relation reached */
+	bool nowait;		/* if true, raise an error rather than wait for a lock */
+	Oid viewowner;		/* owner of the view being walked; used for ACL checks */
+	Oid viewoid;		/* OID of the view being walked; identifies its OLD/NEW entries */
+} LockViewRecurse_context;
+
+/* Walker for LockViewRecurse(): lock the relations in each Query's range table. */
+static bool
+LockViewRecurse_walker(Node *node, LockViewRecurse_context *context)
+{
+	if (node == NULL)
+		return false;
+
+	if (IsA(node, Query))
+	{
+		Query		*query = (Query *) node;
+		ListCell	*rtable;
+
+		foreach(rtable, query->rtable)
+		{
+			RangeTblEntry	*rte = lfirst(rtable);
+			AclResult		 aclresult;
+			Oid relid = rte->relid;
+			char relkind = rte->relkind;
+			char *relname = get_rel_name(relid);
+
+			/* Skip the OLD and NEW placeholder entries, which point back at this view. */
+			if (relid == context->viewoid &&
+				(!strcmp(rte->eref->aliasname, "old") || !strcmp(rte->eref->aliasname, "new")))
+				continue;
+
+			/* Only plain tables, partitioned tables and views are locked; skip the rest. */
+			if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE &&
+				relkind != RELKIND_VIEW)
+				continue;
+
+			/* NOTE(review): only a cycle leading back to the root view is detected here. */
+			if (relid == context->root_reloid)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+						errmsg("infinite recursion detected in rules for relation \"%s\"",
+								get_rel_name(context->root_reloid))));
+
+			/* Permissions are checked as the view's owner, not as the current user. */
+			aclresult = LockTableAclCheck(relid, context->lockmode, context->viewowner);
+			if (aclresult != ACLCHECK_OK)
+				aclcheck_error(aclresult, get_relkind_objtype(relkind), relname);
+
+			/* Permitted: take the lock; with nowait, raise an error if it is unavailable. */
+			if (!context->nowait)
+				LockRelationOid(relid, context->lockmode);
+			else if (!ConditionalLockRelationOid(relid, context->lockmode))
+				ereport(ERROR,
+						(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+						errmsg("could not obtain lock on relation \"%s\"",
+								relname)));
+
+			if (relkind == RELKIND_VIEW)
+				LockViewRecurse(relid, context->root_reloid, context->lockmode, context->nowait);
+			else if (rte->inh)
+				LockTableRecurse(relid, context->lockmode, context->nowait, context->viewowner);
+		}
+
+		return query_tree_walker(query,
+								 LockViewRecurse_walker,
+								 context,
+								 QTW_IGNORE_JOINALIASES);
+	}
+
+	return expression_tree_walker(node,
+								  LockViewRecurse_walker,
+								  context);
+}
+
+/* Lock all relations used by view "reloid"; root_reloid is the view named in LOCK. */
+static void
+LockViewRecurse(Oid reloid, Oid root_reloid, LOCKMODE lockmode, bool nowait)
+{
+	LockViewRecurse_context context;
+	Relation		 view;
+	Query			*viewquery;
+
+	view = heap_open(reloid, NoLock);
+	viewquery = get_view_query(view);
+	context.root_reloid = root_reloid;
+	context.lockmode = lockmode;
+	context.nowait = nowait;
+	context.viewowner = view->rd_rel->relowner;
+	context.viewoid = reloid;
+
+	/* Keep the relation open while rd_rel and the view's query tree are in use. */
+	LockViewRecurse_walker((Node *) viewquery, &context);
+	heap_close(view, NoLock);
+}
+
+/*
  * Check whether the current user is permitted to lock this relation.
  */
 static AclResult
-LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
+LockTableAclCheck(Oid reloid, LOCKMODE lockmode, Oid userid)
 {
 	AclResult	aclresult;
 	AclMode		aclmask;
@@ -178,7 +291,7 @@ LockTableAclCheck(Oid reloid, LOCKMODE lockmode)
 	else
 		aclmask = ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE;
 
-	aclresult = pg_class_aclcheck(reloid, GetUserId(), aclmask);
+	aclresult = pg_class_aclcheck(reloid, userid, aclmask);
 
 	return aclresult;
 }
diff --git a/src/test/regress/expected/lock.out b/src/test/regress/expected/lock.out
index 74a434d..7954c45 100644
--- a/src/test/regress/expected/lock.out
+++ b/src/test/regress/expected/lock.out
@@ -5,7 +5,13 @@
 CREATE SCHEMA lock_schema1;
 SET search_path = lock_schema1;
 CREATE TABLE lock_tbl1 (a BIGINT);
-CREATE VIEW lock_view1 AS SELECT 1;
+CREATE TABLE lock_tbl1a (a BIGINT);
+CREATE VIEW lock_view1 AS SELECT * FROM lock_tbl1;
+CREATE VIEW lock_view2(a,b) AS SELECT * FROM lock_tbl1, lock_tbl1a;
+CREATE VIEW lock_view3 AS SELECT * from lock_view2;
+CREATE VIEW lock_view4 AS SELECT (select a from lock_tbl1a limit 1) from lock_tbl1;
+CREATE VIEW lock_view5 AS SELECT * from lock_tbl1 where a in (select * from lock_tbl1a);
+CREATE VIEW lock_view6 AS SELECT * from (select * from lock_tbl1) sub;
 CREATE ROLE regress_rol_lock1;
 ALTER ROLE regress_rol_lock1 SET search_path = lock_schema1;
 GRANT USAGE ON SCHEMA lock_schema1 TO regress_rol_lock1;
@@ -30,8 +36,90 @@ LOCK TABLE lock_tbl1 IN SHARE MODE NOWAIT;
 LOCK TABLE lock_tbl1 IN SHARE ROW EXCLUSIVE MODE NOWAIT;
 LOCK TABLE lock_tbl1 IN EXCLUSIVE MODE NOWAIT;
 LOCK TABLE lock_tbl1 IN ACCESS EXCLUSIVE MODE NOWAIT;
-LOCK TABLE lock_view1 IN EXCLUSIVE MODE;   -- Will fail; can't lock a non-table
-ERROR:  "lock_view1" is not a table
+ROLLBACK;
+-- Verify that we can lock views.
+BEGIN TRANSACTION;
+LOCK TABLE lock_view1 IN EXCLUSIVE MODE;
+-- lock_view1 and lock_tbl1 are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and mode = 'ExclusiveLock'
+ order by relname;
+  relname   
+------------
+ lock_tbl1
+ lock_view1
+(2 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view2 IN EXCLUSIVE MODE;
+-- lock_view1, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and mode = 'ExclusiveLock'
+ order by relname;
+  relname   
+------------
+ lock_tbl1
+ lock_tbl1a
+ lock_view2
+(3 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view3 IN EXCLUSIVE MODE;
+-- lock_view3, lock_view2, lock_tbl1, and lock_tbl1a are locked recursively.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and mode = 'ExclusiveLock'
+ order by relname;
+  relname   
+------------
+ lock_tbl1
+ lock_tbl1a
+ lock_view2
+ lock_view3
+(4 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view4 IN EXCLUSIVE MODE;
+-- lock_view4, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and mode = 'ExclusiveLock'
+ order by relname;
+  relname   
+------------
+ lock_tbl1
+ lock_tbl1a
+ lock_view4
+(3 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view5 IN EXCLUSIVE MODE;
+-- lock_view5, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and mode = 'ExclusiveLock'
+ order by relname;
+  relname   
+------------
+ lock_tbl1
+ lock_tbl1a
+ lock_view5
+(3 rows)
+
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view6 IN EXCLUSIVE MODE;
+-- lock_view6 an lock_tbl1 are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and mode = 'ExclusiveLock'
+ order by relname;
+  relname   
+------------
+ lock_tbl1
+ lock_view6
+(2 rows)
+
 ROLLBACK;
 -- Verify that we can lock a table with inheritance children.
 CREATE TABLE lock_tbl2 (b BIGINT) INHERITS (lock_tbl1);
@@ -54,10 +142,16 @@ RESET ROLE;
 --
 -- Clean up
 --
+DROP VIEW lock_view6;
+DROP VIEW lock_view5;
+DROP VIEW lock_view4;
+DROP VIEW lock_view3;
+DROP VIEW lock_view2;
 DROP VIEW lock_view1;
 DROP TABLE lock_tbl3;
 DROP TABLE lock_tbl2;
 DROP TABLE lock_tbl1;
+DROP TABLE lock_tbl1a;
 DROP SCHEMA lock_schema1 CASCADE;
 DROP ROLE regress_rol_lock1;
 -- atomic ops tests
diff --git a/src/test/regress/sql/lock.sql b/src/test/regress/sql/lock.sql
index 567e8bc..91946d9 100644
--- a/src/test/regress/sql/lock.sql
+++ b/src/test/regress/sql/lock.sql
@@ -6,7 +6,13 @@
 CREATE SCHEMA lock_schema1;
 SET search_path = lock_schema1;
 CREATE TABLE lock_tbl1 (a BIGINT);
-CREATE VIEW lock_view1 AS SELECT 1;
+CREATE TABLE lock_tbl1a (a BIGINT);
+CREATE VIEW lock_view1 AS SELECT * FROM lock_tbl1;
+CREATE VIEW lock_view2(a,b) AS SELECT * FROM lock_tbl1, lock_tbl1a;
+CREATE VIEW lock_view3 AS SELECT * from lock_view2;
+CREATE VIEW lock_view4 AS SELECT (select a from lock_tbl1a limit 1) from lock_tbl1;
+CREATE VIEW lock_view5 AS SELECT * from lock_tbl1 where a in (select * from lock_tbl1a);
+CREATE VIEW lock_view6 AS SELECT * from (select * from lock_tbl1) sub;
 CREATE ROLE regress_rol_lock1;
 ALTER ROLE regress_rol_lock1 SET search_path = lock_schema1;
 GRANT USAGE ON SCHEMA lock_schema1 TO regress_rol_lock1;
@@ -33,7 +39,50 @@ LOCK TABLE lock_tbl1 IN SHARE MODE NOWAIT;
 LOCK TABLE lock_tbl1 IN SHARE ROW EXCLUSIVE MODE NOWAIT;
 LOCK TABLE lock_tbl1 IN EXCLUSIVE MODE NOWAIT;
 LOCK TABLE lock_tbl1 IN ACCESS EXCLUSIVE MODE NOWAIT;
-LOCK TABLE lock_view1 IN EXCLUSIVE MODE;   -- Will fail; can't lock a non-table
+ROLLBACK;
+
+-- Verify that we can lock views.
+BEGIN TRANSACTION;
+LOCK TABLE lock_view1 IN EXCLUSIVE MODE;
+-- lock_view1 and lock_tbl1 are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and mode = 'ExclusiveLock'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view2 IN EXCLUSIVE MODE;
+-- lock_view1, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and mode = 'ExclusiveLock'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view3 IN EXCLUSIVE MODE;
+-- lock_view3, lock_view2, lock_tbl1, and lock_tbl1a are locked recursively.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and mode = 'ExclusiveLock'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view4 IN EXCLUSIVE MODE;
+-- lock_view4, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and mode = 'ExclusiveLock'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view5 IN EXCLUSIVE MODE;
+-- lock_view5, lock_tbl1, and lock_tbl1a are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and mode = 'ExclusiveLock'
+ order by relname;
+ROLLBACK;
+BEGIN TRANSACTION;
+LOCK TABLE lock_view6 IN EXCLUSIVE MODE;
+-- lock_view6 an lock_tbl1 are locked.
+select relname from pg_locks l, pg_class c
+ where l.relation = c.oid and mode = 'ExclusiveLock'
+ order by relname;
 ROLLBACK;
 
 -- Verify that we can lock a table with inheritance children.
@@ -58,10 +107,16 @@ RESET ROLE;
 --
 -- Clean up
 --
+DROP VIEW lock_view6;
+DROP VIEW lock_view5;
+DROP VIEW lock_view4;
+DROP VIEW lock_view3;
+DROP VIEW lock_view2;
 DROP VIEW lock_view1;
 DROP TABLE lock_tbl3;
 DROP TABLE lock_tbl2;
 DROP TABLE lock_tbl1;
+DROP TABLE lock_tbl1a;
 DROP SCHEMA lock_schema1 CASCADE;
 DROP ROLE regress_rol_lock1;
 

Reply via email to