On 16 January 2012 21:30, Josh Berkus <j...@agliodbs.com> wrote:

> Useful, yes.  Harder than it looks, probably.  I tried to mock up a
> version of this years ago for a project where I needed it, and ran into
> all kinds of race conditions.

Can you remember any details about those race conditions?

> Anyway, if it could be made to work, this is extremely useful for any
> application which needs queueing behavior (which is a large plurality, if
> not a majority, of applications).

Ok, based on this feedback I decided to push further and try
implementing this.  See POC/WIP patch attached.  It seems to work
for simple examples but I haven't yet tried to break it or see how it
interacts with more complicated queries or high concurrency levels.
It probably contains at least a few rookie mistakes!  Any feedback
gratefully received.

The approach is described in my original email.  Short version:
heap_lock_tuple now takes an enum called wait_policy instead of a
boolean called nowait, with the following values:

  LockWaitBlock: wait for lock (like nowait = false before),

  LockWaitError: error if not immediately lockable (like nowait = true
                 before)

  LockWaitSkip:  give up and return HeapTupleWouldBlock if not
                 immediately lockable (this is a new policy)

The rest of the patch is about getting the appropriate value down to
that function call, following the example of the existing nowait
support, and skipping rows if you said SKIP LOCKED DATA and you got
HeapTupleWouldBlock.
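
To make the three policies concrete, here is a small standalone sketch
in C.  It is not code from the patch: MockRow, MockLockResult,
mock_lock_row and lock_available_rows are hypothetical names invented
for illustration, and "waiting" under LockWaitBlock is modelled as the
other holder releasing the row immediately.  Only the LockWaitPolicy
names come from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration; not the PostgreSQL definitions. */
typedef enum { LockWaitBlock, LockWaitError, LockWaitSkip } LockWaitPolicy;
typedef enum { MockLocked, MockWouldBlock, MockError } MockLockResult;

typedef struct
{
	int		id;
	bool	held_by_other;	/* another transaction holds the row lock */
	bool	held_by_me;		/* we hold the row lock */
} MockRow;

/* Try to lock one row, reacting to contention according to wait_policy. */
static MockLockResult
mock_lock_row(MockRow *row, LockWaitPolicy wait_policy)
{
	if (row->held_by_other)
	{
		if (wait_policy == LockWaitError)
			return MockError;		/* NOWAIT: caller reports an error */
		if (wait_policy == LockWaitSkip)
			return MockWouldBlock;	/* SKIP LOCKED DATA: caller skips the row */

		/*
		 * LockWaitBlock: a real backend would sleep until the holder
		 * finishes.  Assumption: modelled here as an immediate release.
		 */
		row->held_by_other = false;
	}
	row->held_by_me = true;
	return MockLocked;
}

/* Lock every row that is free right now, skipping the rest. */
static size_t
lock_available_rows(MockRow *rows, size_t nrows, int *locked_ids)
{
	size_t		nlocked = 0;

	for (size_t i = 0; i < nrows; i++)
	{
		if (mock_lock_row(&rows[i], LockWaitSkip) == MockWouldBlock)
			continue;			/* plays the role of "goto lnext" */
		locked_ids[nlocked++] = rows[i].id;
	}
	return nlocked;
}
```

With rows 1 and 3 held by someone else, lock_available_rows would hand
back rows 2 and 4 and leave the other two untouched.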

Compared to one very popular commercial database's implementation, I
think this is a little bit friendlier for the user who wants to
distribute work.  Let's say you want to lock one row without lock
contention, which this patch allows with FETCH FIRST 1 ROW ONLY FOR
UPDATE SKIP LOCKED DATA in an SQL query.  In that other system, the
number of rows fetched is limited in the WHERE clause, so the N rows
are counted *before* checking whether the lock can be obtained, and
users sometimes have to resort to stored procedures so they can
control the FETCH from a cursor imperatively.  In another popular
commercial database from Redmond, you can ask for the top (first) N
rows while using the equivalent of SKIP LOCKED DATA, and as far as I
can tell it has the same effect as this patch; another large blue
system behaves the same way.
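
The difference between the two orderings can be shown with a toy model
in C.  This is only a sketch of the behaviour as I described it above,
not code from any of these systems; limit_then_skip and
skip_then_limit are hypothetical names, and a row is reduced to a
single "locked by someone else" flag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Row limit applied before the lock check: only the first `limit` rows
 * are examined, and any of them that are locked are simply lost.
 */
static size_t
limit_then_skip(const bool *locked, size_t nrows, size_t limit)
{
	size_t		got = 0;

	for (size_t i = 0; i < nrows && i < limit; i++)
		if (!locked[i])
			got++;
	return got;
}

/*
 * Row limit applied after the lock check: keep scanning until `limit`
 * lockable rows have been found or the rows run out.
 */
static size_t
skip_then_limit(const bool *locked, size_t nrows, size_t limit)
{
	size_t		got = 0;

	for (size_t i = 0; i < nrows && got < limit; i++)
		if (!locked[i])
			got++;
	return got;
}
```

If the first of three rows is locked and you ask for one row, the
first ordering returns nothing while the second returns the next free
row, which is what a queue consumer wants.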

As discussed in another branch of this thread, you can probably get
the same effect with transactional advisory locks.  But I personally
like row skipping better as an explicit feature because:

(1) I think there might be an order-of-evaluation problem with a WHERE
clause containing both lock testing and row filtering expressions (i.e.
the order is undefined, right?), which you might need subselects to
work around (i.e. to be sure to avoid taking locks on rows that the
filter then rejects; I'm not sure about this).

(2) The advisory technique requires you to introduce an integer
identifier if you didn't already have one and then lock that despite
having already said which rows you want to try to lock in standard row
filtering expressions.

(3) The advisory technique requires all users of the table to
participate in the advisory lock protocol even if they don't want to
use the option to skip locked rows.

(4) It complements NOWAIT (which could also have been done with
transactional advisory locks, with a function like try_lock_or_fail).

(5) I like the idea of directly porting applications from other
databases with this feature (that is what led me here).

I can also imagine some other arguments against SKIP LOCKED DATA: "the
type of applications that would use this technique generate too much
dead tuple churn for PostgreSQL anyway" (for example, compared to the
update-tuples-in-place systems like DB2 z/OS edition where SKIP LOCKED
DATA is used to distribute work efficiently), and "databases shouldn't
be used as queues anyway, for that we have $X", and "skipping rows
provides an inconsistent view of the data" (ie a result set that never
strictly existed).

Here are some examples of previous requests or discussion of this
feature:

http://archives.postgresql.org/pgsql-general/2008-07/msg00442.php
http://archives.postgresql.org/pgsql-bugs/2003-12/msg00154.php
http://archives.postgresql.org/pgsql-general/2002-07/msg00744.php
http://blog.hydrobiont.com/2011/06/select-for-update-skip-locked-in.html

Thanks for reading!

Thomas
diff --git a/doc/src/sgml/ref/select.sgml b/doc/src/sgml/ref/select.sgml
index 01c0104..58adcae 100644
--- a/doc/src/sgml/ref/select.sgml
+++ b/doc/src/sgml/ref/select.sgml
@@ -45,7 +45,7 @@ SELECT [ ALL | DISTINCT [ ON ( <replaceable class="parameter">expression</replac
     [ LIMIT { <replaceable class="parameter">count</replaceable> | ALL } ]
     [ OFFSET <replaceable class="parameter">start</replaceable> [ ROW | ROWS ] ]
     [ FETCH { FIRST | NEXT } [ <replaceable class="parameter">count</replaceable> ] { ROW | ROWS } ONLY ]
-    [ FOR { UPDATE | SHARE } [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ NOWAIT ] [...] ]
+    [ FOR { UPDATE | SHARE } [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ { NOWAIT | SKIP LOCKED DATA } ] [...] ]
 
 <phrase>where <replaceable class="parameter">from_item</replaceable> can be one of:</phrase>
 
@@ -1146,14 +1146,14 @@ FETCH { FIRST | NEXT } [ <replaceable class="parameter">count</replaceable> ] {
    <para>
     The <literal>FOR UPDATE</literal> clause has this form:
 <synopsis>
-FOR UPDATE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ NOWAIT ]
+FOR UPDATE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ { NOWAIT | SKIP LOCKED DATA } ]
 </synopsis>
    </para>
 
    <para>
     The closely related <literal>FOR SHARE</literal> clause has this form:
 <synopsis>
-FOR SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ NOWAIT ]
+FOR SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ { NOWAIT | SKIP LOCKED DATA } ]
 </synopsis>
    </para>
 
@@ -1187,9 +1187,9 @@ FOR SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ]
 
    <para>
     To prevent the operation from waiting for other transactions to commit,
-    use the <literal>NOWAIT</> option.  With <literal>NOWAIT</>, the statement
+    use either the <literal>NOWAIT</> or <literal>SKIP LOCKED DATA</literal> option.  With <literal>NOWAIT</>, the statement
     reports an error, rather than waiting, if a selected row
-    cannot be locked immediately.  Note that <literal>NOWAIT</> applies only
+    cannot be locked immediately.  With <literal>SKIP LOCKED DATA</literal>, any selected rows that cannot be immediately locked are skipped.  Skipping locked rows provides an inconsistent view of the data, so this is not suitable for general purpose work, but can be used to avoid lock contention with multiple consumers accessing a queue-like table.  Note that <literal>NOWAIT</> and <literal>SKIP LOCKED DATA</literal> apply only
     to the row-level lock(s) &mdash; the required <literal>ROW SHARE</literal>
     table-level lock is still taken in the ordinary way (see
     <xref linkend="mvcc">).  You can use
@@ -1222,7 +1222,7 @@ FOR SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ]
     implicitly affected) by both <literal>FOR UPDATE</literal> and
     <literal>FOR SHARE</literal> clauses, then it is processed as
     <literal>FOR UPDATE</literal>.  Similarly, a table is processed
-    as <literal>NOWAIT</> if that is specified in any of the clauses
+    as <literal>NOWAIT</> or <literal>SKIP LOCKED DATA</literal> if that is specified in any of the clauses
     affecting it.
    </para>
 
@@ -1685,7 +1685,7 @@ SELECT distributors.* WHERE distributors.name = 'Westward';
     standard allows it only as an option of <command>DECLARE CURSOR</>.
     <productname>PostgreSQL</productname> allows it in any <command>SELECT</>
     query as well as in sub-<command>SELECT</>s, but this is an extension.
-    The <literal>FOR SHARE</> variant, and the <literal>NOWAIT</> option,
+    The <literal>FOR SHARE</> variant, and the <literal>NOWAIT</> and <literal>SKIP LOCKED DATA</literal> options,
     do not appear in the standard.
    </para>
   </refsect2>
diff --git a/doc/src/sgml/sql.sgml b/doc/src/sgml/sql.sgml
index c19e823..57fd085 100644
--- a/doc/src/sgml/sql.sgml
+++ b/doc/src/sgml/sql.sgml
@@ -863,7 +863,7 @@ SELECT [ ALL | DISTINCT [ ON ( <replaceable class="PARAMETER">expression</replac
     [ ORDER BY <replaceable class="parameter">expression</replaceable> [ ASC | DESC | USING <replaceable class="parameter">operator</replaceable> ] [ NULLS { FIRST | LAST } ] [, ...] ]
     [ LIMIT { <replaceable class="PARAMETER">count</replaceable> | ALL } ]
     [ OFFSET <replaceable class="PARAMETER">start</replaceable> ]
-    [ FOR { UPDATE | SHARE } [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ NOWAIT ] [...] ]
+    [ FOR { UPDATE | SHARE } [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ { NOWAIT | SKIP LOCKED DATA } ] [...] ]
 </synopsis>
     </para>
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 99a431a..17f997c 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -3437,7 +3437,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
 HTSU_Result
 heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
 				ItemPointer ctid, TransactionId *update_xmax,
-				CommandId cid, LockTupleMode mode, bool nowait)
+				CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy)
 {
 	HTSU_Result result;
 	ItemPointer tid = &(tuple->t_self);
@@ -3512,7 +3512,11 @@ l3:
 		 */
 		if (!have_tuple_lock)
 		{
-			if (nowait)
+			if (wait_policy == LockWaitBlock)
+			{
+				LockTuple(relation, tid, tuple_lock_type);
+			}
+			else if (wait_policy == LockWaitError)
 			{
 				if (!ConditionalLockTuple(relation, tid, tuple_lock_type))
 					ereport(ERROR,
@@ -3520,8 +3524,13 @@ l3:
 					errmsg("could not obtain lock on row in relation \"%s\"",
 						   RelationGetRelationName(relation))));
 			}
-			else
-				LockTuple(relation, tid, tuple_lock_type);
+			else /* wait_policy == LockWaitSkip */
+			{
+				if (!ConditionalLockTuple(relation, tid, tuple_lock_type))
+				{
+					return HeapTupleWouldBlock;
+				}
+			}
 			have_tuple_lock = true;
 		}
 
@@ -3543,7 +3552,11 @@ l3:
 		else if (infomask & HEAP_XMAX_IS_MULTI)
 		{
 			/* wait for multixact to end */
-			if (nowait)
+			if (wait_policy == LockWaitBlock)
+			{
+				MultiXactIdWait((MultiXactId) xwait);
+			}
+			else if (wait_policy == LockWaitError)
 			{
 				if (!ConditionalMultiXactIdWait((MultiXactId) xwait))
 					ereport(ERROR,
@@ -3551,8 +3564,12 @@ l3:
 					errmsg("could not obtain lock on row in relation \"%s\"",
 						   RelationGetRelationName(relation))));
 			}
-			else
-				MultiXactIdWait((MultiXactId) xwait);
+			else /* wait_policy == LockWaitSkip */
+			{
+				if (!ConditionalMultiXactIdWait((MultiXactId) xwait))
+					/* TODO -- cleanup? */
+					return HeapTupleWouldBlock;
+			}
 
 			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
 
@@ -3578,7 +3595,11 @@ l3:
 		else
 		{
 			/* wait for regular transaction to end */
-			if (nowait)
+			if (wait_policy == LockWaitBlock)
+			{
+				XactLockTableWait(xwait);
+			}
+			else if (wait_policy == LockWaitError)
 			{
 				if (!ConditionalXactLockTableWait(xwait))
 					ereport(ERROR,
@@ -3586,8 +3607,12 @@ l3:
 					errmsg("could not obtain lock on row in relation \"%s\"",
 						   RelationGetRelationName(relation))));
 			}
-			else
-				XactLockTableWait(xwait);
+			else /* wait_policy == LockWaitSkip */
+			{
+				/* TODO -- clean up! */
+				if (!ConditionalXactLockTableWait(xwait))
+					return HeapTupleWouldBlock;
+			}
 
 			LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
 
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 2838b66..6f8b44d 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -2594,7 +2594,7 @@ ltrmark:;
 		test = heap_lock_tuple(relation, &tuple, &buffer,
 							   &update_ctid, &update_xmax,
 							   estate->es_output_cid,
-							   LockTupleExclusive, false);
+							   LockTupleExclusive, LockWaitBlock);
 		switch (test)
 		{
 			case HeapTupleSelfUpdated:
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 36dcc8e..06168d1 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -859,6 +859,7 @@ InitPlan(QueryDesc *queryDesc, int eflags)
 		erm->rowmarkId = rc->rowmarkId;
 		erm->markType = rc->markType;
 		erm->noWait = rc->noWait;
+		erm->skipLocked = rc->skipLocked;
 		ItemPointerSetInvalid(&(erm->curCtid));
 		estate->es_rowMarks = lappend(estate->es_rowMarks, erm);
 	}
@@ -1914,7 +1915,7 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
 			test = heap_lock_tuple(relation, &tuple, &buffer,
 								   &update_ctid, &update_xmax,
 								   estate->es_output_cid,
-								   lockmode, false);
+								   lockmode, LockWaitBlock);
 			/* We now have two pins on the buffer, get rid of one */
 			ReleaseBuffer(buffer);
 
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index d2a33cb..f48301f 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -73,6 +73,7 @@ lnext:
 		ItemPointerData update_ctid;
 		TransactionId update_xmax;
 		LockTupleMode lockmode;
+		LockWaitPolicy wait_policy;
 		HTSU_Result test;
 		HeapTuple	copyTuple;
 
@@ -116,13 +117,24 @@ lnext:
 		else
 			lockmode = LockTupleShared;
 
+		if (erm->noWait)
+			wait_policy = LockWaitError;
+		else if (erm->skipLocked)
+			wait_policy = LockWaitSkip;
+		else
+			wait_policy = LockWaitBlock;
+
 		test = heap_lock_tuple(erm->relation, &tuple, &buffer,
 							   &update_ctid, &update_xmax,
 							   estate->es_output_cid,
-							   lockmode, erm->noWait);
+							   lockmode, wait_policy);
 		ReleaseBuffer(buffer);
 		switch (test)
 		{
+			case HeapTupleWouldBlock:
+				/* couldn't lock tuple in SKIP LOCKED DATA mode */
+				goto lnext;
+
 			case HeapTupleSelfUpdated:
 				/* treat it as deleted; do not process */
 				goto lnext;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 2295195..c356ed2 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2222,6 +2222,7 @@ _equalLockingClause(const LockingClause *a, const LockingClause *b)
 	COMPARE_NODE_FIELD(lockedRels);
 	COMPARE_SCALAR_FIELD(forUpdate);
 	COMPARE_SCALAR_FIELD(noWait);
+	COMPARE_SCALAR_FIELD(skipLocked);
 
 	return true;
 }
@@ -2294,6 +2295,7 @@ _equalRowMarkClause(const RowMarkClause *a, const RowMarkClause *b)
 	COMPARE_SCALAR_FIELD(rti);
 	COMPARE_SCALAR_FIELD(forUpdate);
 	COMPARE_SCALAR_FIELD(noWait);
+	COMPARE_SCALAR_FIELD(skipLocked);
 	COMPARE_SCALAR_FIELD(pushedDown);
 
 	return true;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 829f6d4..ff558d9 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -845,6 +845,7 @@ _outPlanRowMark(StringInfo str, const PlanRowMark *node)
 	WRITE_UINT_FIELD(rowmarkId);
 	WRITE_ENUM_FIELD(markType, RowMarkType);
 	WRITE_BOOL_FIELD(noWait);
+	WRITE_BOOL_FIELD(skipLocked);
 	WRITE_BOOL_FIELD(isParent);
 }
 
@@ -2070,6 +2071,7 @@ _outLockingClause(StringInfo str, const LockingClause *node)
 	WRITE_NODE_FIELD(lockedRels);
 	WRITE_BOOL_FIELD(forUpdate);
 	WRITE_BOOL_FIELD(noWait);
+	WRITE_BOOL_FIELD(skipLocked);
 }
 
 static void
@@ -2248,6 +2250,7 @@ _outRowMarkClause(StringInfo str, const RowMarkClause *node)
 	WRITE_UINT_FIELD(rti);
 	WRITE_BOOL_FIELD(forUpdate);
 	WRITE_BOOL_FIELD(noWait);
+	WRITE_BOOL_FIELD(skipLocked);
 	WRITE_BOOL_FIELD(pushedDown);
 }
 
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index b9258ad..ed96e14 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -303,6 +303,7 @@ _readRowMarkClause(void)
 	READ_UINT_FIELD(rti);
 	READ_BOOL_FIELD(forUpdate);
 	READ_BOOL_FIELD(noWait);
+	READ_BOOL_FIELD(skipLocked);
 	READ_BOOL_FIELD(pushedDown);
 
 	READ_DONE();
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 2e8ea5a..32d77fe 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -1987,6 +1987,7 @@ preprocess_rowmarks(PlannerInfo *root)
 		else
 			newrc->markType = ROW_MARK_SHARE;
 		newrc->noWait = rc->noWait;
+		newrc->skipLocked = rc->skipLocked;
 		newrc->isParent = false;
 
 		prowmarks = lappend(prowmarks, newrc);
@@ -2015,6 +2016,7 @@ preprocess_rowmarks(PlannerInfo *root)
 		else
 			newrc->markType = ROW_MARK_COPY;
 		newrc->noWait = false;	/* doesn't matter */
+		newrc->skipLocked = false;
 		newrc->isParent = false;
 
 		prowmarks = lappend(prowmarks, newrc);
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index be6e93e..4d7ba46 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -2245,6 +2245,7 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc,
 	allrels->lockedRels = NIL;	/* indicates all rels */
 	allrels->forUpdate = lc->forUpdate;
 	allrels->noWait = lc->noWait;
+	allrels->skipLocked = lc->skipLocked;
 
 	if (lockedRels == NIL)
 	{
@@ -2262,12 +2263,12 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc,
 					if (rte->relkind == RELKIND_FOREIGN_TABLE)
 						break;
 					applyLockingClause(qry, i,
-									   lc->forUpdate, lc->noWait, pushedDown);
+									   lc->forUpdate, lc->noWait, lc->skipLocked, pushedDown);
 					rte->requiredPerms |= ACL_SELECT_FOR_UPDATE;
 					break;
 				case RTE_SUBQUERY:
 					applyLockingClause(qry, i,
-									   lc->forUpdate, lc->noWait, pushedDown);
+									   lc->forUpdate, lc->noWait, lc->skipLocked, pushedDown);
 
 					/*
 					 * FOR UPDATE/SHARE of subquery is propagated to all of
@@ -2317,13 +2318,13 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc,
 											 rte->eref->aliasname),
 									  parser_errposition(pstate, thisrel->location)));
 							applyLockingClause(qry, i,
-											   lc->forUpdate, lc->noWait,
+											   lc->forUpdate, lc->noWait, lc->skipLocked,
 											   pushedDown);
 							rte->requiredPerms |= ACL_SELECT_FOR_UPDATE;
 							break;
 						case RTE_SUBQUERY:
 							applyLockingClause(qry, i,
-											   lc->forUpdate, lc->noWait,
+											   lc->forUpdate, lc->noWait, lc->skipLocked,
 											   pushedDown);
 							/* see comment above */
 							transformLockingClause(pstate, rte->subquery,
@@ -2376,7 +2377,7 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc,
  */
 void
 applyLockingClause(Query *qry, Index rtindex,
-				   bool forUpdate, bool noWait, bool pushedDown)
+				   bool forUpdate, bool noWait, bool skipLocked, bool pushedDown)
 {
 	RowMarkClause *rc;
 
@@ -2411,6 +2412,7 @@ applyLockingClause(Query *qry, Index rtindex,
 	rc->rti = rtindex;
 	rc->forUpdate = forUpdate;
 	rc->noWait = noWait;
+	rc->skipLocked = skipLocked;
 	rc->pushedDown = pushedDown;
 	qry->rowMarks = lappend(qry->rowMarks, rc);
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 62fde67..a6276e5 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -101,6 +101,9 @@ typedef struct PrivTarget
 #define CAS_INITIALLY_DEFERRED		0x08
 #define CAS_NOT_VALID				0x10
 
+#define WAIT_MODE_DEFAULT 0
+#define WAIT_MODE_NOWAIT 1
+#define WAIT_MODE_SKIP 2
 
 #define parser_yyerror(msg)  scanner_yyerror(msg, yyscanner)
 #define parser_errposition(pos)  scanner_errposition(pos, yyscanner)
@@ -251,6 +254,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
 %type <boolean>	opt_force opt_or_replace
 				opt_grant_grant_option opt_grant_admin_option
 				opt_nowait opt_if_exists opt_with_data
+%type <ival>	opt_nowait_or_skip
 
 %type <list>	OptRoleList AlterOptRoleList
 %type <defelt>	CreateOptRoleElem AlterOptRoleElem
@@ -528,7 +532,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
 
 	LABEL LANGUAGE LARGE_P LAST_P LC_COLLATE_P LC_CTYPE_P LEADING
 	LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL LOCALTIME LOCALTIMESTAMP
-	LOCATION LOCK_P
+	LOCATION LOCK_P LOCKED
 
 	MAPPING MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE
 
@@ -552,7 +556,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType,
 
 	SAVEPOINT SCHEMA SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES
 	SERIALIZABLE SERVER SESSION SESSION_USER SET SETOF SHARE
-	SHOW SIMILAR SIMPLE SMALLINT SNAPSHOT SOME STABLE STANDALONE_P START
+	SHOW SIMILAR SIMPLE SKIP SMALLINT SNAPSHOT SOME STABLE STANDALONE_P START
 	STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING
 	SYMMETRIC SYSID SYSTEM_P
 
@@ -8435,6 +8439,12 @@ opt_nowait:	NOWAIT							{ $$ = TRUE; }
 			| /*EMPTY*/						{ $$ = FALSE; }
 		;
 
+opt_nowait_or_skip:	
+			NOWAIT							{ $$ = WAIT_MODE_NOWAIT; }
+			| SKIP LOCKED DATA_P   			{ $$ = WAIT_MODE_SKIP; }
+			| /*EMPTY*/						{ $$ = WAIT_MODE_DEFAULT; }
+		;
+
 
 /*****************************************************************************
  *
@@ -9029,20 +9039,22 @@ for_locking_items:
 		;
 
 for_locking_item:
-			FOR UPDATE locked_rels_list opt_nowait
+			FOR UPDATE locked_rels_list opt_nowait_or_skip
 				{
 					LockingClause *n = makeNode(LockingClause);
 					n->lockedRels = $3;
 					n->forUpdate = TRUE;
-					n->noWait = $4;
+					n->noWait = ($4 == WAIT_MODE_NOWAIT);
+					n->skipLocked = ($4 == WAIT_MODE_SKIP);
 					$$ = (Node *) n;
 				}
-			| FOR SHARE locked_rels_list opt_nowait
+			| FOR SHARE locked_rels_list opt_nowait_or_skip
 				{
 					LockingClause *n = makeNode(LockingClause);
 					n->lockedRels = $3;
 					n->forUpdate = FALSE;
-					n->noWait = $4;
+					n->noWait = ($4 == WAIT_MODE_NOWAIT);
+					n->skipLocked = ($4 == WAIT_MODE_SKIP);
 					$$ = (Node *) n;
 				}
 		;
diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c
index 04f9622..e4a5085 100644
--- a/src/backend/rewrite/rewriteHandler.c
+++ b/src/backend/rewrite/rewriteHandler.c
@@ -55,7 +55,7 @@ static void rewriteValuesRTE(RangeTblEntry *rte, Relation target_relation,
 static void rewriteTargetListUD(Query *parsetree, RangeTblEntry *target_rte,
 					Relation target_relation);
 static void markQueryForLocking(Query *qry, Node *jtnode,
-					bool forUpdate, bool noWait, bool pushedDown);
+					bool forUpdate, bool noWait, bool skipLocked, bool pushedDown);
 static List *matchLocks(CmdType event, RuleLock *rulelocks,
 		   int varno, Query *parsetree);
 static Query *fireRIRrules(Query *parsetree, List *activeRIRs,
@@ -1412,7 +1412,7 @@ ApplyRetrieveRule(Query *parsetree,
 	 */
 	if (rc != NULL)
 		markQueryForLocking(rule_action, (Node *) rule_action->jointree,
-							rc->forUpdate, rc->noWait, true);
+							rc->forUpdate, rc->noWait, rc->skipLocked, true);
 
 	return parsetree;
 }
@@ -1430,7 +1430,7 @@ ApplyRetrieveRule(Query *parsetree,
  */
 static void
 markQueryForLocking(Query *qry, Node *jtnode,
-					bool forUpdate, bool noWait, bool pushedDown)
+					bool forUpdate, bool noWait, bool skipLocked, bool pushedDown)
 {
 	if (jtnode == NULL)
 		return;
@@ -1444,16 +1444,16 @@ markQueryForLocking(Query *qry, Node *jtnode,
 			/* ignore foreign tables */
 			if (rte->relkind != RELKIND_FOREIGN_TABLE)
 			{
-				applyLockingClause(qry, rti, forUpdate, noWait, pushedDown);
+				applyLockingClause(qry, rti, forUpdate, noWait, skipLocked, pushedDown);
 				rte->requiredPerms |= ACL_SELECT_FOR_UPDATE;
 			}
 		}
 		else if (rte->rtekind == RTE_SUBQUERY)
 		{
-			applyLockingClause(qry, rti, forUpdate, noWait, pushedDown);
+			applyLockingClause(qry, rti, forUpdate, noWait, skipLocked, pushedDown);
 			/* FOR UPDATE/SHARE of subquery is propagated to subquery's rels */
 			markQueryForLocking(rte->subquery, (Node *) rte->subquery->jointree,
-								forUpdate, noWait, true);
+								forUpdate, noWait, skipLocked, true);
 		}
 		/* other RTE types are unaffected by FOR UPDATE */
 	}
@@ -1463,14 +1463,14 @@ markQueryForLocking(Query *qry, Node *jtnode,
 		ListCell   *l;
 
 		foreach(l, f->fromlist)
-			markQueryForLocking(qry, lfirst(l), forUpdate, noWait, pushedDown);
+			markQueryForLocking(qry, lfirst(l), forUpdate, noWait, skipLocked, pushedDown);
 	}
 	else if (IsA(jtnode, JoinExpr))
 	{
 		JoinExpr   *j = (JoinExpr *) jtnode;
 
-		markQueryForLocking(qry, j->larg, forUpdate, noWait, pushedDown);
-		markQueryForLocking(qry, j->rarg, forUpdate, noWait, pushedDown);
+		markQueryForLocking(qry, j->larg, forUpdate, noWait, skipLocked, pushedDown);
+		markQueryForLocking(qry, j->rarg, forUpdate, noWait, skipLocked, pushedDown);
 	}
 	else
 		elog(ERROR, "unrecognized node type: %d",
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index fa38803..5f513d5 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -35,6 +35,12 @@ typedef enum
 	LockTupleExclusive
 } LockTupleMode;
 
+typedef enum
+{
+	LockWaitBlock,
+	LockWaitError,
+	LockWaitSkip
+} LockWaitPolicy;
 
 /* ----------------
  *		function prototypes for heap access method
@@ -109,7 +115,7 @@ extern HTSU_Result heap_update(Relation relation, ItemPointer otid,
 extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
 				Buffer *buffer, ItemPointer ctid,
 				TransactionId *update_xmax, CommandId cid,
-				LockTupleMode mode, bool nowait);
+				LockTupleMode mode, LockWaitPolicy wait_policy);
 extern void heap_inplace_update(Relation relation, HeapTuple tuple);
 extern bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid,
 				  Buffer buf);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 5207102..9768d5c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -423,6 +423,7 @@ typedef struct ExecRowMark
 	Index		rowmarkId;		/* unique identifier for resjunk columns */
 	RowMarkType markType;		/* see enum in nodes/plannodes.h */
 	bool		noWait;			/* NOWAIT option */
+	bool		skipLocked;		/* SKIP LOCKED DATA option */
 	ItemPointerData curCtid;	/* ctid of currently locked tuple, if any */
 } ExecRowMark;
 
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 1d33ceb..cd1b037 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -583,6 +583,7 @@ typedef struct LockingClause
 	List	   *lockedRels;		/* FOR UPDATE or FOR SHARE relations */
 	bool		forUpdate;		/* true = FOR UPDATE, false = FOR SHARE */
 	bool		noWait;			/* NOWAIT option */
+	bool		skipLocked;		/* SKIP LOCKED DATA option */
 } LockingClause;
 
 /*
@@ -878,6 +879,7 @@ typedef struct RowMarkClause
 	Index		rti;			/* range table index of target relation */
 	bool		forUpdate;		/* true = FOR UPDATE, false = FOR SHARE */
 	bool		noWait;			/* NOWAIT option */
+	bool		skipLocked;		/* SKIP LOCKED DATA option */
 	bool		pushedDown;		/* pushed down from higher query level? */
 } RowMarkClause;
 
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 7d90b91..7cc78a2 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -804,6 +804,7 @@ typedef struct PlanRowMark
 	Index		rowmarkId;		/* unique identifier for resjunk columns */
 	RowMarkType markType;		/* see enum above */
 	bool		noWait;			/* NOWAIT option */
+	bool		skipLocked;		/* SKIP LOCKED DATA option */
 	bool		isParent;		/* true if this is a "dummy" parent entry */
 } PlanRowMark;
 
diff --git a/src/include/parser/analyze.h b/src/include/parser/analyze.h
index b8987db..65160f1 100644
--- a/src/include/parser/analyze.h
+++ b/src/include/parser/analyze.h
@@ -31,6 +31,6 @@ extern bool analyze_requires_snapshot(Node *parseTree);
 
 extern void CheckSelectLocking(Query *qry);
 extern void applyLockingClause(Query *qry, Index rtindex,
-				   bool forUpdate, bool noWait, bool pushedDown);
+				   bool forUpdate, bool noWait, bool skipLocked, bool pushedDown);
 
 #endif   /* ANALYZE_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 3d7de06..d37a615 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -227,6 +227,7 @@ PG_KEYWORD("localtime", LOCALTIME, RESERVED_KEYWORD)
 PG_KEYWORD("localtimestamp", LOCALTIMESTAMP, RESERVED_KEYWORD)
 PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
+PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)
 PG_KEYWORD("maxvalue", MAXVALUE, UNRESERVED_KEYWORD)
@@ -336,6 +337,7 @@ PG_KEYWORD("share", SHARE, UNRESERVED_KEYWORD)
 PG_KEYWORD("show", SHOW, UNRESERVED_KEYWORD)
 PG_KEYWORD("similar", SIMILAR, TYPE_FUNC_NAME_KEYWORD)
 PG_KEYWORD("simple", SIMPLE, UNRESERVED_KEYWORD)
+PG_KEYWORD("skip", SKIP, UNRESERVED_KEYWORD)
 PG_KEYWORD("smallint", SMALLINT, COL_NAME_KEYWORD)
 PG_KEYWORD("snapshot", SNAPSHOT, UNRESERVED_KEYWORD)
 PG_KEYWORD("some", SOME, RESERVED_KEYWORD)
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 900272e..1073f72 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -74,7 +74,8 @@ typedef enum
 	HeapTupleInvisible,
 	HeapTupleSelfUpdated,
 	HeapTupleUpdated,
-	HeapTupleBeingUpdated
+	HeapTupleBeingUpdated,
+	HeapTupleWouldBlock	/* can be returned by heap_lock_tuple */
 } HTSU_Result;
 
 #endif   /* SNAPSHOT_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers