On Wed, Jun 08, 2011 at 09:17:04PM -0500, Kevin Grittner wrote:
> A patch is attached which just covers the predicate lock acquisition,
> where a snapshot is available without too much pain.  There are two
> functions which acquire predicate locks where a snapshot was not
> readily available: _bt_search() and _bt_get_endpoint().  Not only was
> it not clear how to get a snapshot in, it was not entirely clear from
> reading the code that we need to acquire predicate locks here.  Now,
> I suspect that we probably do, because I spent many long hours
> stepping through gdb to pick the spots where they are, but that was
> about a year ago and my memory of the details has faded.

For _bt_search(), the lock calls should move to _bt_first() where the
ScanDesc is available. This also keeps us from trying to take locks
during _bt_pagedel(), which is only called during vacuum and recovery.

The call in _bt_get_endpoint() seems unnecessary, because after it
returns, _bt_endpoint() takes the same lock. The only other callers of
_bt_get_endpoint() are _bt_pagedel() and _bt_insert_parent(), neither
of which should take predicate locks.

I've updated the patch, attached.

Dan

-- 
Dan R. K. Ports              MIT CSAIL                http://drkp.net/
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 6ac25af..bf75ace 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -274,7 +274,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
 			else
 				valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
 
-			CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, buffer);
+			CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, buffer, snapshot);
 
 			if (valid)
 				scan->rs_vistuples[ntup++] = lineoff;
@@ -469,7 +469,7 @@ heapgettup(HeapScanDesc scan,
 													 snapshot,
 													 scan->rs_cbuf);
 
-				CheckForSerializableConflictOut(valid, scan->rs_rd, tuple, scan->rs_cbuf);
+				CheckForSerializableConflictOut(valid, scan->rs_rd, tuple, scan->rs_cbuf, snapshot);
 
 				if (valid && key != NULL)
 					HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
@@ -478,7 +478,7 @@ heapgettup(HeapScanDesc scan,
 				if (valid)
 				{
 					if (!scan->rs_relpredicatelocked)
-						PredicateLockTuple(scan->rs_rd, tuple);
+						PredicateLockTuple(scan->rs_rd, tuple, snapshot);
 					LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
 					return;
 				}
@@ -747,7 +747,7 @@ heapgettup_pagemode(HeapScanDesc scan,
 				if (valid)
 				{
 					if (!scan->rs_relpredicatelocked)
-						PredicateLockTuple(scan->rs_rd, tuple);
+						PredicateLockTuple(scan->rs_rd, tuple, scan->rs_snapshot);
 					scan->rs_cindex = lineindex;
 					return;
 				}
@@ -755,7 +755,7 @@ heapgettup_pagemode(HeapScanDesc scan,
 			else
 			{
 				if (!scan->rs_relpredicatelocked)
-					PredicateLockTuple(scan->rs_rd, tuple);
+					PredicateLockTuple(scan->rs_rd, tuple, scan->rs_snapshot);
 				scan->rs_cindex = lineindex;
 				return;
 			}
@@ -1470,9 +1470,9 @@ heap_fetch(Relation relation,
 	valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
 
 	if (valid)
-		PredicateLockTuple(relation, tuple);
+		PredicateLockTuple(relation, tuple, snapshot);
 
-	CheckForSerializableConflictOut(valid, relation, tuple, buffer);
+	CheckForSerializableConflictOut(valid, relation, tuple, buffer, snapshot);
 
 	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
@@ -1588,11 +1588,11 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 
 		/* If it's visible per the snapshot, we must return it */
 		valid = HeapTupleSatisfiesVisibility(&heapTuple, snapshot, buffer);
-		CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer);
+		CheckForSerializableConflictOut(valid, relation, &heapTuple, buffer, snapshot);
 		if (valid)
 		{
 			ItemPointerSetOffsetNumber(tid, offnum);
-			PredicateLockTuple(relation, &heapTuple);
+			PredicateLockTuple(relation, &heapTuple, snapshot);
 			if (all_dead)
 				*all_dead = false;
 			return true;
@@ -1750,7 +1750,7 @@ heap_get_latest_tid(Relation relation,
 		 * result candidate.
 		 */
 		valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
-		CheckForSerializableConflictOut(valid, relation, &tp, buffer);
+		CheckForSerializableConflictOut(valid, relation, &tp, buffer, snapshot);
 		if (valid)
 			*tid = ctid;
 
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 27c37d6..294ab45 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -126,7 +126,7 @@ do { \
 } while(0)
 
 static IndexScanDesc index_beginscan_internal(Relation indexRelation,
-						 int nkeys, int norderbys);
+						 int nkeys, int norderbys, const Snapshot snapshot);
 
 
 /* ----------------------------------------------------------------
@@ -234,7 +234,7 @@ index_beginscan(Relation heapRelation,
 {
 	IndexScanDesc scan;
 
-	scan = index_beginscan_internal(indexRelation, nkeys, norderbys);
+	scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot);
 
 	/*
 	 * Save additional parameters into the scandesc.  Everything else was set
@@ -259,7 +259,7 @@ index_beginscan_bitmap(Relation indexRelation,
 {
 	IndexScanDesc scan;
 
-	scan = index_beginscan_internal(indexRelation, nkeys, 0);
+	scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot);
 
 	/*
 	 * Save additional parameters into the scandesc.  Everything else was set
@@ -275,7 +275,7 @@ index_beginscan_bitmap(Relation indexRelation,
  */
 static IndexScanDesc
 index_beginscan_internal(Relation indexRelation,
-						 int nkeys, int norderbys)
+						 int nkeys, int norderbys, const Snapshot snapshot)
 {
 	IndexScanDesc scan;
 	FmgrInfo   *procedure;
@@ -284,7 +284,7 @@ index_beginscan_internal(Relation indexRelation,
 	GET_REL_PROCEDURE(ambeginscan);
 
 	if (!(indexRelation->rd_am->ampredlocks))
-		PredicateLockRelation(indexRelation);
+		PredicateLockRelation(indexRelation, snapshot);
 
 	/*
 	 * We hold a reference count to the relcache entry throughout the scan.
@@ -602,7 +602,8 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
 												 scan->xs_cbuf);
 
 			CheckForSerializableConflictOut(valid, scan->heapRelation,
-											heapTuple, scan->xs_cbuf);
+											heapTuple, scan->xs_cbuf,
+											scan->xs_snapshot);
 
 			if (valid)
 			{
@@ -624,7 +625,7 @@ index_getnext(IndexScanDesc scan, ScanDirection direction)
 				else
 					scan->xs_next_hot = InvalidOffsetNumber;
 
-				PredicateLockTuple(scan->heapRelation, heapTuple);
+				PredicateLockTuple(scan->heapRelation, heapTuple, scan->xs_snapshot);
 
 				LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
 
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 2ce2bc2..8d2beec 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -64,10 +64,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
 
 	/* If index is empty and access = BT_READ, no root page is created. */
 	if (!BufferIsValid(*bufP))
-	{
-		PredicateLockRelation(rel);		/* Nothing finer to lock exists. */
 		return (BTStack) NULL;
-	}
 
 	/* Loop iterates once per level descended in the tree */
 	for (;;)
@@ -92,11 +89,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
 		page = BufferGetPage(*bufP);
 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 		if (P_ISLEAF(opaque))
-		{
-			if (access == BT_READ)
-				PredicateLockPage(rel, BufferGetBlockNumber(*bufP));
 			break;
-		}
 
 		/*
 		 * Find the appropriate item on the internal page, and get the child
@@ -855,9 +848,16 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 
 	if (!BufferIsValid(buf))
 	{
-		/* Only get here if index is completely empty */
+		/*
+		 * Only get here if index is completely empty.
+		 * Lock relation because nothing finer to lock exists.
+		 */
+		PredicateLockRelation(rel, NULL);
 		return false;
 	}
+	else
+		PredicateLockPage(rel, BufferGetBlockNumber(buf),
+						  scan->xs_snapshot);
 
 	/* initialize moreLeft/moreRight appropriately for scan direction */
 	if (ScanDirectionIsForward(dir))
@@ -1153,7 +1153,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 			if (!P_IGNORE(opaque))
 			{
-				PredicateLockPage(rel, blkno);
+				PredicateLockPage(rel, blkno, scan->xs_snapshot);
 				/* see if there are any matches on this page */
 				/* note that this will clear moreRight if we can stop */
 				if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque)))
@@ -1201,7 +1201,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 			if (!P_IGNORE(opaque))
 			{
-				PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf));
+				PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf), scan->xs_snapshot);
 				/* see if there are any matches on this page */
 				/* note that this will clear moreLeft if we can stop */
 				if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
@@ -1363,11 +1363,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost)
 		buf = _bt_gettrueroot(rel);
 
 	if (!BufferIsValid(buf))
-	{
-		/* empty index... */
-		PredicateLockRelation(rel);		/* Nothing finer to lock exists. */
 		return InvalidBuffer;
-	}
 
 	page = BufferGetPage(buf);
 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -1445,12 +1441,12 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
 	if (!BufferIsValid(buf))
 	{
 		/* empty index... */
-		PredicateLockRelation(rel);		/* Nothing finer to lock exists. */
+		PredicateLockRelation(rel, scan->xs_snapshot);		/* Nothing finer to lock exists. */
 		so->currPos.buf = InvalidBuffer;
 		return false;
 	}
 
-	PredicateLockPage(rel, BufferGetBlockNumber(buf));
+	PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot);
 	page = BufferGetPage(buf);
 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 	Assert(P_ISLEAF(opaque));
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 1e566b2..f356874 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -113,7 +113,8 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot)
 TupleTableSlot *
 ExecSeqScan(SeqScanState *node)
 {
-	PredicateLockRelation(node->ss_currentRelation);
+	PredicateLockRelation(node->ss_currentRelation,
+						  node->ss_currentScanDesc->rs_snapshot);
 	node->ss_currentScanDesc->rs_relpredicatelocked = true;
 	return ExecScan((ScanState *) node,
 					(ExecScanAccessMtd) SeqNext,
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 9102e82..ab5aa34 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -148,9 +148,11 @@
  * predicate lock maintenance
  *		RegisterSerializableTransaction(Snapshot snapshot)
  *		RegisterPredicateLockingXid(void)
- *		PredicateLockRelation(Relation relation)
- *		PredicateLockPage(Relation relation, BlockNumber blkno)
- *		PredicateLockTuple(Relation relation, HeapTuple tuple)
+ *		PredicateLockRelation(Relation relation, Snapshot snapshot)
+ *		PredicateLockPage(Relation relation, BlockNumber blkno,
+ *						Snapshot snapshot)
+ *		PredicateLockTuple(Relation relation, HeapTuple tuple,
+ *						Snapshot snapshot)
  *		PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
  *							   BlockNumber newblkno);
  *		PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
@@ -160,7 +162,8 @@
  *
  * conflict detection (may also trigger rollback)
  *		CheckForSerializableConflictOut(bool visible, Relation relation,
- *										HeapTupleData *tup, Buffer buffer)
+ *										HeapTupleData *tup, Buffer buffer,
+ *										Snapshot snapshot)
  *		CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup,
  *									   Buffer buffer)
  *		CheckTableForSerializableConflictIn(Relation relation)
@@ -271,9 +274,10 @@
  * the current transaction, this is the test to see if we should do a quick
  * return.
  */
-#define SkipSerialization(relation) \
+#define SkipSerialization(relation, snapshot) \
 	((!IsolationIsSerializable()) \
 	|| ((MySerializableXact == InvalidSerializableXact)) \
+	|| ((snapshot) != NULL && !IsMVCCSnapshot(snapshot)) \
 	|| ReleasePredicateLocksIfROSafe() \
 	|| SkipPredicateLocksForRelation(relation))
 
@@ -2201,11 +2205,11 @@ PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
  * Clear any finer-grained predicate locks this session has on the relation.
  */
 void
-PredicateLockRelation(const Relation relation)
+PredicateLockRelation(const Relation relation, const Snapshot snapshot)
 {
 	PREDICATELOCKTARGETTAG tag;
 
-	if (SkipSerialization(relation))
+	if (SkipSerialization(relation, snapshot))
 		return;
 
 	SET_PREDICATELOCKTARGETTAG_RELATION(tag,
@@ -2224,11 +2228,12 @@ PredicateLockRelation(const Relation relation)
  * Clear any finer-grained predicate locks this session has on the relation.
  */
 void
-PredicateLockPage(const Relation relation, const BlockNumber blkno)
+PredicateLockPage(const Relation relation, const BlockNumber blkno,
+				  const Snapshot snapshot)
 {
 	PREDICATELOCKTARGETTAG tag;
 
-	if (SkipSerialization(relation))
+	if (SkipSerialization(relation, snapshot))
 		return;
 
 	SET_PREDICATELOCKTARGETTAG_PAGE(tag,
@@ -2246,13 +2251,14 @@ PredicateLockPage(const Relation relation, const BlockNumber blkno)
  * Skip if this is a temporary table.
  */
 void
-PredicateLockTuple(const Relation relation, const HeapTuple tuple)
+PredicateLockTuple(const Relation relation, const HeapTuple tuple,
+				   const Snapshot snapshot)
 {
 	PREDICATELOCKTARGETTAG tag;
 	ItemPointer tid;
 	TransactionId targetxmin;
 
-	if (SkipSerialization(relation))
+	if (SkipSerialization(relation, snapshot))
 		return;
 
 	/*
@@ -3613,7 +3619,8 @@ XidIsConcurrent(TransactionId xid)
  */
 void
 CheckForSerializableConflictOut(const bool visible, const Relation relation,
-								const HeapTuple tuple, const Buffer buffer)
+								const HeapTuple tuple, const Buffer buffer,
+								const Snapshot snapshot)
 {
 	TransactionId xid;
 	SERIALIZABLEXIDTAG sxidtag;
@@ -3621,7 +3628,7 @@ CheckForSerializableConflictOut(const bool visible, const Relation relation,
 	SERIALIZABLEXACT *sxact;
 	HTSV_Result htsvResult;
 
-	if (SkipSerialization(relation))
+	if (SkipSerialization(relation, snapshot))
 		return;
 
 	if (SxactIsMarkedForDeath(MySerializableXact))
@@ -3998,7 +4005,7 @@ CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple,
 {
 	PREDICATELOCKTARGETTAG targettag;
 
-	if (SkipSerialization(relation))
+	if (SkipSerialization(relation, (Snapshot) NULL))
 		return;
 
 	if (SxactIsMarkedForDeath(MySerializableXact))
@@ -4090,7 +4097,7 @@ CheckTableForSerializableConflictIn(const Relation relation)
 	if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
 		return;
 
-	if (SkipSerialization(relation))
+	if (SkipSerialization(relation, (Snapshot) NULL))
 		return;
 
 	Assert(relation->rd_index == NULL); /* not an index relation */
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 760c76c..aae5326 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -44,16 +44,17 @@ extern bool PageIsPredicateLocked(const Relation relation, const BlockNumber blk
 /* predicate lock maintenance */
 extern Snapshot RegisterSerializableTransaction(Snapshot snapshot);
 extern void RegisterPredicateLockingXid(const TransactionId xid);
-extern void PredicateLockRelation(const Relation relation);
-extern void PredicateLockPage(const Relation relation, const BlockNumber blkno);
-extern void PredicateLockTuple(const Relation relation, const HeapTuple tuple);
+extern void PredicateLockRelation(const Relation relation, const Snapshot snapshot);
+extern void PredicateLockPage(const Relation relation, const BlockNumber blkno, const Snapshot snapshot);
+extern void PredicateLockTuple(const Relation relation, const HeapTuple tuple, const Snapshot snapshot);
 extern void PredicateLockPageSplit(const Relation relation, const BlockNumber oldblkno, const BlockNumber newblkno);
 extern void PredicateLockPageCombine(const Relation relation, const BlockNumber oldblkno, const BlockNumber newblkno);
 extern void TransferPredicateLocksToHeapRelation(const Relation relation);
 extern void ReleasePredicateLocks(const bool isCommit);
 
 /* conflict detection (may also trigger rollback) */
-extern void CheckForSerializableConflictOut(const bool valid, const Relation relation, const HeapTuple tuple, const Buffer buffer);
+extern void CheckForSerializableConflictOut(const bool valid, const Relation relation, const HeapTuple tuple,
+											const Buffer buffer, const Snapshot snapshot);
 extern void CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple, const Buffer buffer);
 extern void CheckTableForSerializableConflictIn(const Relation relation);
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to