On 16 January 2012 21:30, Josh Berkus <j...@agliodbs.com> wrote:
> Useful, yes. Harder than it looks, probably. I tried to mock up a
> version of this years ago for a project where I needed it, and ran into
> all kinds of race conditions.

Can you remember any details about those race conditions?

> Anyway, if it could be made to work, this is extremely useful for any
> application which needs queueing behavior (which is a large plurality, if
> not a majority, of applications).

OK, based on this feedback I decided to push further and try implementing this. See the POC/WIP patch attached. It seems to work for simple examples, but I haven't yet tried to break it or to see how it interacts with more complicated queries or high concurrency levels. It probably contains at least a few rookie mistakes! Any feedback gratefully received.

The approach is described in my original email. Short version: heap_lock_tuple now takes a LockWaitPolicy enum argument called wait_policy instead of a boolean called nowait. It has the following values:

LockWaitBlock: wait for the lock (like nowait = false before)
LockWaitError: error if not immediately lockable (like nowait = true before)
LockWaitSkip: give up and return HeapTupleWouldBlock if not immediately lockable (this is a new policy)

The rest of the patch is about getting the appropriate value down to that function call, following the example of the existing nowait support, and skipping rows if you said SKIP LOCKED DATA and got HeapTupleWouldBlock.

Compared to one very popular commercial database's implementation, I think this is a little friendlier for the user who wants to distribute work. Say you want to lock one row without lock contention, which this patch allows with FETCH FIRST 1 ROW ONLY FOR UPDATE SKIP LOCKED DATA in an SQL query. In that other system, the number of rows fetched is limited in the WHERE clause. The N rows are therefore counted *before* checking whether the lock can be obtained, so users sometimes have to resort to stored procedures to control the FETCH from a cursor imperatively.
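The three policies, and the difference between counting rows after skipping and counting them before locking, can be sketched as a single-threaded toy model in C. The enum values are copied from the patch. ToyRow, ToyLockResult, toy_lock_row and the two fetch functions are hypothetical names for illustration only. Blocking is represented by a return code rather than a real wait. The sketch assumes, as the email says, that with this patch the limit only counts rows that LockRows actually locked. The "limit before lock" variant models the other system only as described above, not as verified behavior.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Same three values as the patch's LockWaitPolicy enum. */
typedef enum
{
    LockWaitBlock,  /* wait for the lock (old nowait = false) */
    LockWaitError,  /* error if not immediately lockable (old nowait = true) */
    LockWaitSkip    /* new: give up and let the caller skip the row */
} LockWaitPolicy;

/* Hypothetical outcome codes for this toy model only. */
typedef enum
{
    ToyLocked,      /* we now hold the row lock */
    ToyWaited,      /* stands in for blocking until the holder finishes */
    ToyError,       /* stands in for ereport(ERROR, ...) */
    ToyWouldBlock   /* plays the role of HeapTupleWouldBlock */
} ToyLockResult;

typedef struct
{
    int  id;
    bool locked_by_other;   /* another transaction holds the row lock */
    bool locked_by_me;      /* set when this "transaction" locks the row */
} ToyRow;

/* Try to lock one row under the given policy (no real concurrency here). */
static ToyLockResult
toy_lock_row(ToyRow *row, LockWaitPolicy policy)
{
    if (!row->locked_by_other)
    {
        row->locked_by_me = true;
        return ToyLocked;
    }
    switch (policy)
    {
        case LockWaitBlock:
            return ToyWaited;
        case LockWaitError:
            return ToyError;
        case LockWaitSkip:
        default:
            return ToyWouldBlock;
    }
}

/* Limit applied after locking: skipped rows do not use up the row budget. */
static size_t
toy_fetch_first_n_skip_locked(ToyRow *rows, size_t nrows, size_t n, int *out_ids)
{
    size_t found = 0;

    assert(rows != NULL && out_ids != NULL);
    for (size_t i = 0; i < nrows && found < n; i++)
    {
        if (toy_lock_row(&rows[i], LockWaitSkip) == ToyWouldBlock)
            continue;           /* like "goto lnext" in nodeLockRows.c */
        out_ids[found++] = rows[i].id;
    }
    return found;
}

/* Limit applied before locking (the other system, as described above):
 * only the first n candidate rows are considered at all. */
static size_t
toy_limit_before_lock(ToyRow *rows, size_t nrows, size_t n, int *out_ids)
{
    size_t found = 0;

    assert(rows != NULL && out_ids != NULL);
    for (size_t i = 0; i < nrows && i < n; i++)
    {
        if (toy_lock_row(&rows[i], LockWaitSkip) == ToyWouldBlock)
            continue;
        out_ids[found++] = rows[i].id;
    }
    return found;
}
```

Take row 1 locked by another consumer and rows 2 and 3 free. With n = 1, toy_fetch_first_n_skip_locked returns row 2, while toy_limit_before_lock returns nothing. That is the situation that pushes users towards cursors and stored procedures.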

In another popular commercial database, from Redmond, you can ask for the top (first) N rows while using the equivalent of SKIP LOCKED DATA. As far as I can tell it has the same effect as this patch, and another large blue system behaves the same way.

As discussed in another branch of this thread, you can probably get the same effect with transactional advisory locks. But I personally prefer row skipping as an explicit feature because:

(1) I think there might be an order-of-evaluation problem with a WHERE clause containing both lock-testing and row-filtering expressions (i.e. the evaluation order is undefined, right?). You might need subselects to work around it (i.e. to be sure of avoiding false-positive locks; I'm not sure about this).
(2) The advisory technique requires you to introduce an integer identifier if you don't already have one. You then have to lock that, despite having already said which rows you want to try to lock in standard row-filtering expressions.
(3) The advisory technique requires all users of the table to participate in the advisory lock protocol, even if they don't want to use the option to skip locked rows.
(4) It complements NOWAIT (which could also have been done with transactional advisory locks, using a function like try_lock_or_fail).
(5) I like the idea of directly porting applications from other databases that have this feature (that is what led me here).

I can also imagine some arguments against SKIP LOCKED DATA:

- "the type of applications that would use this technique generate too much dead tuple churn for PostgreSQL anyway" (for example, compared to update-tuples-in-place systems like DB2 z/OS edition, where SKIP LOCKED DATA is used to distribute work efficiently);
- "databases shouldn't be used as queues anyway, for that we have $X";
- "skipping rows provides an inconsistent view of the data" (i.e. a result set that never strictly existed).
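Point (1) can be illustrated with another toy C model. It assumes the advisory approach looks roughly like `WHERE status = 'pending' AND pg_try_advisory_xact_lock(id)`. It shows what happens if the lock test is evaluated before the row filter: locks are taken on rows that the filter then rejects. ToyJob, ToyAdvisoryLocks, toy_try_advisory_lock and toy_select_jobs are hypothetical. Whether PostgreSQL evaluates the two conditions in either order is exactly the open question raised above, so this only shows the consequence, not the planner's actual behavior.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_KEYS 64

/* Hypothetical stand-in for transaction-level advisory locks keyed by an
 * integer; in SQL the test would be pg_try_advisory_xact_lock(id). */
typedef struct
{
    bool held_by_other[TOY_MAX_KEYS];
    bool held_by_me[TOY_MAX_KEYS];
} ToyAdvisoryLocks;

/* Returns true and records the lock if nobody else holds the key. */
static bool
toy_try_advisory_lock(ToyAdvisoryLocks *locks, int key)
{
    assert(key >= 0 && key < TOY_MAX_KEYS);
    if (locks->held_by_other[key])
        return false;
    locks->held_by_me[key] = true;
    return true;
}

typedef struct
{
    int  id;        /* the integer key point (2) says you may need to add */
    bool pending;   /* the ordinary row filter, e.g. status = 'pending' */
} ToyJob;

/* Evaluates "pending AND try_lock(id)" per job in the chosen order.
 * Returns the number of jobs selected; *locks_taken counts advisory locks
 * acquired, including locks on jobs that the filter then rejects. */
static size_t
toy_select_jobs(const ToyJob *jobs, size_t njobs, ToyAdvisoryLocks *locks,
                bool lock_test_first, size_t *locks_taken)
{
    size_t selected = 0;

    *locks_taken = 0;
    for (size_t i = 0; i < njobs; i++)
    {
        bool ok;

        if (lock_test_first)
        {
            bool got = toy_try_advisory_lock(locks, jobs[i].id);

            if (got)
                (*locks_taken)++;
            ok = got && jobs[i].pending;    /* filter checked too late */
        }
        else
        {
            ok = jobs[i].pending;
            if (ok)                         /* lock only rows that match */
            {
                ok = toy_try_advisory_lock(locks, jobs[i].id);
                if (ok)
                    (*locks_taken)++;
            }
        }
        if (ok)
            selected++;
    }
    return selected;
}
```

Both orders select the same two pending jobs. The lock-first order also leaves advisory locks on the two non-pending jobs until the transaction ends, which is the kind of false-positive lock that point (1) worries about.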

Here are some examples of previous requests or discussion of this feature:

http://archives.postgresql.org/pgsql-general/2008-07/msg00442.php
http://archives.postgresql.org/pgsql-bugs/2003-12/msg00154.php
http://archives.postgresql.org/pgsql-general/2002-07/msg00744.php
http://blog.hydrobiont.com/2011/06/select-for-update-skip-locked-in.html

Thanks for reading!

Thomas
diff --git a/doc/src/sgml/ref/select.sgml b/doc/src/sgml/ref/select.sgml index 01c0104..58adcae 100644 --- a/doc/src/sgml/ref/select.sgml +++ b/doc/src/sgml/ref/select.sgml @@ -45,7 +45,7 @@ SELECT [ ALL | DISTINCT [ ON ( <replaceable class="parameter">expression</replac [ LIMIT { <replaceable class="parameter">count</replaceable> | ALL } ] [ OFFSET <replaceable class="parameter">start</replaceable> [ ROW | ROWS ] ] [ FETCH { FIRST | NEXT } [ <replaceable class="parameter">count</replaceable> ] { ROW | ROWS } ONLY ] - [ FOR { UPDATE | SHARE } [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ NOWAIT ] [...] ] + [ FOR { UPDATE | SHARE } [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ { NOWAIT | SKIP LOCKED DATA } ] [...] ] <phrase>where <replaceable class="parameter">from_item</replaceable> can be one of:</phrase> @@ -1146,14 +1146,14 @@ FETCH { FIRST | NEXT } [ <replaceable class="parameter">count</replaceable> ] { <para> The <literal>FOR UPDATE</literal> clause has this form: <synopsis> -FOR UPDATE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ NOWAIT ] +FOR UPDATE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ { NOWAIT | SKIP LOCKED DATA } ] </synopsis> </para> <para> The closely related <literal>FOR SHARE</literal> clause has this form: <synopsis> -FOR SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ NOWAIT ] +FOR SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ { NOWAIT | SKIP LOCKED DATA } ] </synopsis> </para> @@ -1187,9 +1187,9 @@ FOR SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] <para> To prevent the operation from waiting for other transactions to commit, - use the <literal>NOWAIT</> option. With <literal>NOWAIT</>, the statement + use either the <literal>NOWAIT</> or <literal>SKIP LOCKED DATA</literal> option. 
With <literal>NOWAIT</>, the statement reports an error, rather than waiting, if a selected row - cannot be locked immediately. Note that <literal>NOWAIT</> applies only + cannot be locked immediately. With <literal>SKIP LOCKED DATA</literal>, any selected rows that cannot be immediately locked are skipped. Skipping locked rows provides an inconsistent view of the data, so this is not suitable for general purpose work, but can be used to avoid lock contention with multiple consumers accessing a queue-like table. Note that <literal>NOWAIT</> and <literal>SKIP LOCKED DATA</literal> apply only to the row-level lock(s) — the required <literal>ROW SHARE</literal> table-level lock is still taken in the ordinary way (see <xref linkend="mvcc">). You can use @@ -1222,7 +1222,7 @@ FOR SHARE [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] implicitly affected) by both <literal>FOR UPDATE</literal> and <literal>FOR SHARE</literal> clauses, then it is processed as <literal>FOR UPDATE</literal>. Similarly, a table is processed - as <literal>NOWAIT</> if that is specified in any of the clauses + as <literal>NOWAIT</> or <literal>SKIP LOCKED DATA</literal> if that is specified in any of the clauses affecting it. </para> @@ -1685,7 +1685,7 @@ SELECT distributors.* WHERE distributors.name = 'Westward'; standard allows it only as an option of <command>DECLARE CURSOR</>. <productname>PostgreSQL</productname> allows it in any <command>SELECT</> query as well as in sub-<command>SELECT</>s, but this is an extension. - The <literal>FOR SHARE</> variant, and the <literal>NOWAIT</> option, + The <literal>FOR SHARE</> variant, and the <literal>NOWAIT</> and <literal>SKIP LOCKED DATA</literal> options, do not appear in the standard. 
</para> </refsect2> diff --git a/doc/src/sgml/sql.sgml b/doc/src/sgml/sql.sgml index c19e823..57fd085 100644 --- a/doc/src/sgml/sql.sgml +++ b/doc/src/sgml/sql.sgml @@ -863,7 +863,7 @@ SELECT [ ALL | DISTINCT [ ON ( <replaceable class="PARAMETER">expression</replac [ ORDER BY <replaceable class="parameter">expression</replaceable> [ ASC | DESC | USING <replaceable class="parameter">operator</replaceable> ] [ NULLS { FIRST | LAST } ] [, ...] ] [ LIMIT { <replaceable class="PARAMETER">count</replaceable> | ALL } ] [ OFFSET <replaceable class="PARAMETER">start</replaceable> ] - [ FOR { UPDATE | SHARE } [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ NOWAIT ] [...] ] + [ FOR { UPDATE | SHARE } [ OF <replaceable class="parameter">table_name</replaceable> [, ...] ] [ { NOWAIT | SKIP LOCKED DATA } ] [...] ] </synopsis> </para> diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 99a431a..17f997c 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -3437,7 +3437,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup) HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer, ItemPointer ctid, TransactionId *update_xmax, - CommandId cid, LockTupleMode mode, bool nowait) + CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy) { HTSU_Result result; ItemPointer tid = &(tuple->t_self); @@ -3512,7 +3512,11 @@ l3: */ if (!have_tuple_lock) { - if (nowait) + if (wait_policy == LockWaitBlock) + { + LockTuple(relation, tid, tuple_lock_type); + } + else if (wait_policy == LockWaitError) { if (!ConditionalLockTuple(relation, tid, tuple_lock_type)) ereport(ERROR, @@ -3520,8 +3524,13 @@ l3: errmsg("could not obtain lock on row in relation \"%s\"", RelationGetRelationName(relation)))); } - else - LockTuple(relation, tid, tuple_lock_type); + else /* wait_policy == LockWaitSkip */ + { + if (!ConditionalLockTuple(relation, tid, tuple_lock_type)) + { 
+ return HeapTupleWouldBlock; + } + } have_tuple_lock = true; } @@ -3543,7 +3552,11 @@ l3: else if (infomask & HEAP_XMAX_IS_MULTI) { /* wait for multixact to end */ - if (nowait) + if (wait_policy == LockWaitBlock) + { + MultiXactIdWait((MultiXactId) xwait); + } + else if (wait_policy == LockWaitError) { if (!ConditionalMultiXactIdWait((MultiXactId) xwait)) ereport(ERROR, @@ -3551,8 +3564,12 @@ l3: errmsg("could not obtain lock on row in relation \"%s\"", RelationGetRelationName(relation)))); } - else - MultiXactIdWait((MultiXactId) xwait); + else /* wait_policy == LockWaitSkip */ + { + if (!ConditionalMultiXactIdWait((MultiXactId) xwait)) + /* TODO -- cleanup? */ + return HeapTupleWouldBlock; + } LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); @@ -3578,7 +3595,11 @@ l3: else { /* wait for regular transaction to end */ - if (nowait) + if (wait_policy == LockWaitBlock) + { + XactLockTableWait(xwait); + } + else if (wait_policy == LockWaitError) { if (!ConditionalXactLockTableWait(xwait)) ereport(ERROR, @@ -3586,8 +3607,12 @@ l3: errmsg("could not obtain lock on row in relation \"%s\"", RelationGetRelationName(relation)))); } - else - XactLockTableWait(xwait); + else /* wait_policy == LockWaitSkip */ + { + /* TODO -- clean up! 
*/ + if (!ConditionalXactLockTableWait(xwait)) + return HeapTupleWouldBlock; + } LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 2838b66..6f8b44d 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -2594,7 +2594,7 @@ ltrmark:; test = heap_lock_tuple(relation, &tuple, &buffer, &update_ctid, &update_xmax, estate->es_output_cid, - LockTupleExclusive, false); + LockTupleExclusive, LockWaitBlock); switch (test) { case HeapTupleSelfUpdated: diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 36dcc8e..06168d1 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -859,6 +859,7 @@ InitPlan(QueryDesc *queryDesc, int eflags) erm->rowmarkId = rc->rowmarkId; erm->markType = rc->markType; erm->noWait = rc->noWait; + erm->skipLocked = rc->skipLocked; ItemPointerSetInvalid(&(erm->curCtid)); estate->es_rowMarks = lappend(estate->es_rowMarks, erm); } @@ -1914,7 +1915,7 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode, test = heap_lock_tuple(relation, &tuple, &buffer, &update_ctid, &update_xmax, estate->es_output_cid, - lockmode, false); + lockmode, LockWaitBlock); /* We now have two pins on the buffer, get rid of one */ ReleaseBuffer(buffer); diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c index d2a33cb..f48301f 100644 --- a/src/backend/executor/nodeLockRows.c +++ b/src/backend/executor/nodeLockRows.c @@ -73,6 +73,7 @@ lnext: ItemPointerData update_ctid; TransactionId update_xmax; LockTupleMode lockmode; + LockWaitPolicy wait_policy; HTSU_Result test; HeapTuple copyTuple; @@ -116,13 +117,24 @@ lnext: else lockmode = LockTupleShared; + if (erm->noWait) + wait_policy = LockWaitError; + else if (erm->skipLocked) + wait_policy = LockWaitSkip; + else + wait_policy = LockWaitBlock; + test = heap_lock_tuple(erm->relation, &tuple, &buffer, &update_ctid, 
&update_xmax, estate->es_output_cid, - lockmode, erm->noWait); + lockmode, wait_policy); ReleaseBuffer(buffer); switch (test) { + case HeapTupleWouldBlock: + /* couldn't lock tuple in SKIP LOCKED DATA mode */ + goto lnext; + case HeapTupleSelfUpdated: /* treat it as deleted; do not process */ goto lnext; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 2295195..c356ed2 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2222,6 +2222,7 @@ _equalLockingClause(const LockingClause *a, const LockingClause *b) COMPARE_NODE_FIELD(lockedRels); COMPARE_SCALAR_FIELD(forUpdate); COMPARE_SCALAR_FIELD(noWait); + COMPARE_SCALAR_FIELD(skipLocked); return true; } @@ -2294,6 +2295,7 @@ _equalRowMarkClause(const RowMarkClause *a, const RowMarkClause *b) COMPARE_SCALAR_FIELD(rti); COMPARE_SCALAR_FIELD(forUpdate); COMPARE_SCALAR_FIELD(noWait); + COMPARE_SCALAR_FIELD(skipLocked); COMPARE_SCALAR_FIELD(pushedDown); return true; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 829f6d4..ff558d9 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -845,6 +845,7 @@ _outPlanRowMark(StringInfo str, const PlanRowMark *node) WRITE_UINT_FIELD(rowmarkId); WRITE_ENUM_FIELD(markType, RowMarkType); WRITE_BOOL_FIELD(noWait); + WRITE_BOOL_FIELD(skipLocked); WRITE_BOOL_FIELD(isParent); } @@ -2070,6 +2071,7 @@ _outLockingClause(StringInfo str, const LockingClause *node) WRITE_NODE_FIELD(lockedRels); WRITE_BOOL_FIELD(forUpdate); WRITE_BOOL_FIELD(noWait); + WRITE_BOOL_FIELD(skipLocked); } static void @@ -2248,6 +2250,7 @@ _outRowMarkClause(StringInfo str, const RowMarkClause *node) WRITE_UINT_FIELD(rti); WRITE_BOOL_FIELD(forUpdate); WRITE_BOOL_FIELD(noWait); + WRITE_BOOL_FIELD(skipLocked); WRITE_BOOL_FIELD(pushedDown); } diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index b9258ad..ed96e14 100644 --- a/src/backend/nodes/readfuncs.c +++ 
b/src/backend/nodes/readfuncs.c @@ -303,6 +303,7 @@ _readRowMarkClause(void) READ_UINT_FIELD(rti); READ_BOOL_FIELD(forUpdate); READ_BOOL_FIELD(noWait); + READ_BOOL_FIELD(skipLocked); READ_BOOL_FIELD(pushedDown); READ_DONE(); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 2e8ea5a..32d77fe 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -1987,6 +1987,7 @@ preprocess_rowmarks(PlannerInfo *root) else newrc->markType = ROW_MARK_SHARE; newrc->noWait = rc->noWait; + newrc->skipLocked = rc->skipLocked; newrc->isParent = false; prowmarks = lappend(prowmarks, newrc); @@ -2015,6 +2016,7 @@ preprocess_rowmarks(PlannerInfo *root) else newrc->markType = ROW_MARK_COPY; newrc->noWait = false; /* doesn't matter */ + newrc->skipLocked = false; newrc->isParent = false; prowmarks = lappend(prowmarks, newrc); diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index be6e93e..4d7ba46 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -2245,6 +2245,7 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc, allrels->lockedRels = NIL; /* indicates all rels */ allrels->forUpdate = lc->forUpdate; allrels->noWait = lc->noWait; + allrels->skipLocked = lc->skipLocked; if (lockedRels == NIL) { @@ -2262,12 +2263,12 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc, if (rte->relkind == RELKIND_FOREIGN_TABLE) break; applyLockingClause(qry, i, - lc->forUpdate, lc->noWait, pushedDown); + lc->forUpdate, lc->noWait, lc->skipLocked, pushedDown); rte->requiredPerms |= ACL_SELECT_FOR_UPDATE; break; case RTE_SUBQUERY: applyLockingClause(qry, i, - lc->forUpdate, lc->noWait, pushedDown); + lc->forUpdate, lc->noWait, lc->skipLocked, pushedDown); /* * FOR UPDATE/SHARE of subquery is propagated to all of @@ -2317,13 +2318,13 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc, rte->eref->aliasname), 
parser_errposition(pstate, thisrel->location))); applyLockingClause(qry, i, - lc->forUpdate, lc->noWait, + lc->forUpdate, lc->noWait, lc->skipLocked, pushedDown); rte->requiredPerms |= ACL_SELECT_FOR_UPDATE; break; case RTE_SUBQUERY: applyLockingClause(qry, i, - lc->forUpdate, lc->noWait, + lc->forUpdate, lc->noWait, lc->skipLocked, pushedDown); /* see comment above */ transformLockingClause(pstate, rte->subquery, @@ -2376,7 +2377,7 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc, */ void applyLockingClause(Query *qry, Index rtindex, - bool forUpdate, bool noWait, bool pushedDown) + bool forUpdate, bool noWait, bool skipLocked, bool pushedDown) { RowMarkClause *rc; @@ -2411,6 +2412,7 @@ applyLockingClause(Query *qry, Index rtindex, rc->rti = rtindex; rc->forUpdate = forUpdate; rc->noWait = noWait; + rc->skipLocked = skipLocked; rc->pushedDown = pushedDown; qry->rowMarks = lappend(qry->rowMarks, rc); } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 62fde67..a6276e5 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -101,6 +101,9 @@ typedef struct PrivTarget #define CAS_INITIALLY_DEFERRED 0x08 #define CAS_NOT_VALID 0x10 +#define WAIT_MODE_DEFAULT 0 +#define WAIT_MODE_NOWAIT 1 +#define WAIT_MODE_SKIP 2 #define parser_yyerror(msg) scanner_yyerror(msg, yyscanner) #define parser_errposition(pos) scanner_errposition(pos, yyscanner) @@ -251,6 +254,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType, %type <boolean> opt_force opt_or_replace opt_grant_grant_option opt_grant_admin_option opt_nowait opt_if_exists opt_with_data +%type <ival> opt_nowait_or_skip %type <list> OptRoleList AlterOptRoleList %type <defelt> CreateOptRoleElem AlterOptRoleElem @@ -528,7 +532,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType, LABEL LANGUAGE LARGE_P LAST_P LC_COLLATE_P LC_CTYPE_P LEADING LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL LOCALTIME 
LOCALTIMESTAMP - LOCATION LOCK_P + LOCATION LOCK_P LOCKED MAPPING MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE @@ -552,7 +556,7 @@ static void processCASbits(int cas_bits, int location, const char *constrType, SAVEPOINT SCHEMA SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES SERIALIZABLE SERVER SESSION SESSION_USER SET SETOF SHARE - SHOW SIMILAR SIMPLE SMALLINT SNAPSHOT SOME STABLE STANDALONE_P START + SHOW SIMILAR SIMPLE SKIP SMALLINT SNAPSHOT SOME STABLE STANDALONE_P START STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P STRIP_P SUBSTRING SYMMETRIC SYSID SYSTEM_P @@ -8435,6 +8439,12 @@ opt_nowait: NOWAIT { $$ = TRUE; } | /*EMPTY*/ { $$ = FALSE; } ; +opt_nowait_or_skip: + NOWAIT { $$ = WAIT_MODE_NOWAIT; } + | SKIP LOCKED DATA_P { $$ = WAIT_MODE_SKIP; } + | /*EMPTY*/ { $$ = WAIT_MODE_DEFAULT; } + ; + /***************************************************************************** * @@ -9029,20 +9039,22 @@ for_locking_items: ; for_locking_item: - FOR UPDATE locked_rels_list opt_nowait + FOR UPDATE locked_rels_list opt_nowait_or_skip { LockingClause *n = makeNode(LockingClause); n->lockedRels = $3; n->forUpdate = TRUE; - n->noWait = $4; + n->noWait = ($4 == WAIT_MODE_NOWAIT); + n->skipLocked = ($4 == WAIT_MODE_SKIP); $$ = (Node *) n; } - | FOR SHARE locked_rels_list opt_nowait + | FOR SHARE locked_rels_list opt_nowait_or_skip { LockingClause *n = makeNode(LockingClause); n->lockedRels = $3; n->forUpdate = FALSE; - n->noWait = $4; + n->noWait = ($4 == WAIT_MODE_NOWAIT); + n->skipLocked = ($4 == WAIT_MODE_SKIP); $$ = (Node *) n; } ; diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index 04f9622..e4a5085 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -55,7 +55,7 @@ static void rewriteValuesRTE(RangeTblEntry *rte, Relation target_relation, static void rewriteTargetListUD(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation); static void 
markQueryForLocking(Query *qry, Node *jtnode, - bool forUpdate, bool noWait, bool pushedDown); + bool forUpdate, bool noWait, bool skipLocked, bool pushedDown); static List *matchLocks(CmdType event, RuleLock *rulelocks, int varno, Query *parsetree); static Query *fireRIRrules(Query *parsetree, List *activeRIRs, @@ -1412,7 +1412,7 @@ ApplyRetrieveRule(Query *parsetree, */ if (rc != NULL) markQueryForLocking(rule_action, (Node *) rule_action->jointree, - rc->forUpdate, rc->noWait, true); + rc->forUpdate, rc->noWait, rc->skipLocked, true); return parsetree; } @@ -1430,7 +1430,7 @@ ApplyRetrieveRule(Query *parsetree, */ static void markQueryForLocking(Query *qry, Node *jtnode, - bool forUpdate, bool noWait, bool pushedDown) + bool forUpdate, bool noWait, bool skipLocked, bool pushedDown) { if (jtnode == NULL) return; @@ -1444,16 +1444,16 @@ markQueryForLocking(Query *qry, Node *jtnode, /* ignore foreign tables */ if (rte->relkind != RELKIND_FOREIGN_TABLE) { - applyLockingClause(qry, rti, forUpdate, noWait, pushedDown); + applyLockingClause(qry, rti, forUpdate, noWait, skipLocked, pushedDown); rte->requiredPerms |= ACL_SELECT_FOR_UPDATE; } } else if (rte->rtekind == RTE_SUBQUERY) { - applyLockingClause(qry, rti, forUpdate, noWait, pushedDown); + applyLockingClause(qry, rti, forUpdate, noWait, skipLocked, pushedDown); /* FOR UPDATE/SHARE of subquery is propagated to subquery's rels */ markQueryForLocking(rte->subquery, (Node *) rte->subquery->jointree, - forUpdate, noWait, true); + forUpdate, noWait, skipLocked, true); } /* other RTE types are unaffected by FOR UPDATE */ } @@ -1463,14 +1463,14 @@ markQueryForLocking(Query *qry, Node *jtnode, ListCell *l; foreach(l, f->fromlist) - markQueryForLocking(qry, lfirst(l), forUpdate, noWait, pushedDown); + markQueryForLocking(qry, lfirst(l), forUpdate, noWait, skipLocked, pushedDown); } else if (IsA(jtnode, JoinExpr)) { JoinExpr *j = (JoinExpr *) jtnode; - markQueryForLocking(qry, j->larg, forUpdate, noWait, pushedDown); - 
markQueryForLocking(qry, j->rarg, forUpdate, noWait, pushedDown); + markQueryForLocking(qry, j->larg, forUpdate, noWait, skipLocked, pushedDown); + markQueryForLocking(qry, j->rarg, forUpdate, noWait, skipLocked, pushedDown); } else elog(ERROR, "unrecognized node type: %d", diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index fa38803..5f513d5 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -35,6 +35,12 @@ typedef enum LockTupleExclusive } LockTupleMode; +typedef enum +{ + LockWaitBlock, + LockWaitError, + LockWaitSkip +} LockWaitPolicy; /* ---------------- * function prototypes for heap access method @@ -109,7 +115,7 @@ extern HTSU_Result heap_update(Relation relation, ItemPointer otid, extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer, ItemPointer ctid, TransactionId *update_xmax, CommandId cid, - LockTupleMode mode, bool nowait); + LockTupleMode mode, LockWaitPolicy wait_policy); extern void heap_inplace_update(Relation relation, HeapTuple tuple); extern bool heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid, Buffer buf); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 5207102..9768d5c 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -423,6 +423,7 @@ typedef struct ExecRowMark Index rowmarkId; /* unique identifier for resjunk columns */ RowMarkType markType; /* see enum in nodes/plannodes.h */ bool noWait; /* NOWAIT option */ + bool skipLocked; /* SKIP LOCKED DATA option */ ItemPointerData curCtid; /* ctid of currently locked tuple, if any */ } ExecRowMark; diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 1d33ceb..cd1b037 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -583,6 +583,7 @@ typedef struct LockingClause List *lockedRels; /* FOR UPDATE or FOR SHARE relations */ bool forUpdate; /* true = FOR UPDATE, false = FOR SHARE 
*/ bool noWait; /* NOWAIT option */ + bool skipLocked; /* SKIP LOCKED DATA option */ } LockingClause; /* @@ -878,6 +879,7 @@ typedef struct RowMarkClause Index rti; /* range table index of target relation */ bool forUpdate; /* true = FOR UPDATE, false = FOR SHARE */ bool noWait; /* NOWAIT option */ + bool skipLocked; /* SKIP LOCKED DATA option */ bool pushedDown; /* pushed down from higher query level? */ } RowMarkClause; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 7d90b91..7cc78a2 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -804,6 +804,7 @@ typedef struct PlanRowMark Index rowmarkId; /* unique identifier for resjunk columns */ RowMarkType markType; /* see enum above */ bool noWait; /* NOWAIT option */ + bool skipLocked; /* SKIP LOCKED DATA option */ bool isParent; /* true if this is a "dummy" parent entry */ } PlanRowMark; diff --git a/src/include/parser/analyze.h b/src/include/parser/analyze.h index b8987db..65160f1 100644 --- a/src/include/parser/analyze.h +++ b/src/include/parser/analyze.h @@ -31,6 +31,6 @@ extern bool analyze_requires_snapshot(Node *parseTree); extern void CheckSelectLocking(Query *qry); extern void applyLockingClause(Query *qry, Index rtindex, - bool forUpdate, bool noWait, bool pushedDown); + bool forUpdate, bool noWait, bool skipLocked, bool pushedDown); #endif /* ANALYZE_H */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 3d7de06..d37a615 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -227,6 +227,7 @@ PG_KEYWORD("localtime", LOCALTIME, RESERVED_KEYWORD) PG_KEYWORD("localtimestamp", LOCALTIMESTAMP, RESERVED_KEYWORD) PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD) PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD) +PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD) PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD) PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD) PG_KEYWORD("maxvalue", MAXVALUE, 
UNRESERVED_KEYWORD) @@ -336,6 +337,7 @@ PG_KEYWORD("share", SHARE, UNRESERVED_KEYWORD) PG_KEYWORD("show", SHOW, UNRESERVED_KEYWORD) PG_KEYWORD("similar", SIMILAR, TYPE_FUNC_NAME_KEYWORD) PG_KEYWORD("simple", SIMPLE, UNRESERVED_KEYWORD) +PG_KEYWORD("skip", SKIP, UNRESERVED_KEYWORD) PG_KEYWORD("smallint", SMALLINT, COL_NAME_KEYWORD) PG_KEYWORD("snapshot", SNAPSHOT, UNRESERVED_KEYWORD) PG_KEYWORD("some", SOME, RESERVED_KEYWORD) diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index 900272e..1073f72 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -74,7 +74,8 @@ typedef enum HeapTupleInvisible, HeapTupleSelfUpdated, HeapTupleUpdated, - HeapTupleBeingUpdated + HeapTupleBeingUpdated, + HeapTupleWouldBlock /* can be returned by heap_lock_tuple */ } HTSU_Result; #endif /* SNAPSHOT_H */
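Taken together, the parser, rewriter, planner and executor changes above carry one piece of information from the grammar down to heap_lock_tuple. A compact C sketch of that path follows. The WAIT_MODE_* values and LockWaitPolicy mirror the patch. ToyWaitFlags and the toy_* functions are simplified, hypothetical stand-ins for the noWait/skipLocked fields. The merge step is an assumption based on the documentation's "specified in any of the clauses" wording; the diff does not show how the two flags are combined when clauses overlap. Because nodeLockRows.c tests noWait first, NOWAIT would win if both flags ended up set.

```c
#include <assert.h>
#include <stdbool.h>

/* Values produced by the patch's opt_nowait_or_skip rule in gram.y. */
#define WAIT_MODE_DEFAULT 0
#define WAIT_MODE_NOWAIT  1
#define WAIT_MODE_SKIP    2

/* Same values as LockWaitPolicy in the patch's heapam.h. */
typedef enum
{
    LockWaitBlock,
    LockWaitError,
    LockWaitSkip
} LockWaitPolicy;

/* Hypothetical stand-in for the noWait/skipLocked pair that the patch adds
 * to LockingClause, RowMarkClause, PlanRowMark and ExecRowMark. */
typedef struct
{
    bool noWait;
    bool skipLocked;
} ToyWaitFlags;

/* Parser step: one wait mode becomes two booleans (as in for_locking_item). */
static ToyWaitFlags
toy_flags_from_wait_mode(int wait_mode)
{
    ToyWaitFlags f;

    assert(wait_mode >= WAIT_MODE_DEFAULT && wait_mode <= WAIT_MODE_SKIP);
    f.noWait = (wait_mode == WAIT_MODE_NOWAIT);
    f.skipLocked = (wait_mode == WAIT_MODE_SKIP);
    return f;
}

/* ASSUMPTION: two clauses affecting the same table are merged by OR-ing
 * each flag; the diff does not show this part of applyLockingClause. */
static ToyWaitFlags
toy_merge_flags(ToyWaitFlags a, ToyWaitFlags b)
{
    ToyWaitFlags f;

    f.noWait = a.noWait || b.noWait;
    f.skipLocked = a.skipLocked || b.skipLocked;
    return f;
}

/* Executor step, mirroring nodeLockRows.c: noWait is tested first. */
static LockWaitPolicy
toy_policy_from_flags(ToyWaitFlags f)
{
    if (f.noWait)
        return LockWaitError;
    else if (f.skipLocked)
        return LockWaitSkip;
    else
        return LockWaitBlock;
}
```

One alternative would be to carry a single LockWaitPolicy value through the node structs instead of two booleans. That would make the "both set" state unrepresentable. The two-boolean form follows the existing noWait field, as the email describes.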
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)