On Fri, 2010-01-29 at 14:52 +0000, Greg Stark wrote:

> Can you explain what it does in
> more detail so we can understand why it's necessary for a sensible set
> of features?

I've slimmed down the patch to make it clearer what it does, having
committed some refactoring.

Problem: Currently, when we perform conflict resolution we do not use
the relid from the WAL record; we target all users regardless of which
relations they have accessed or intend to access. So changes to table X
can cause cancellation of someone accessing table Y because **they might
later in the transaction access table X**. That is too heavy-handed and
is most often overkill. This is the same problem you and I have
discussed many times over the last 14 months, though the problem itself
has been discussed on hackers many times over the last 20 months, with
many potential solutions offered by me.

An example of current behaviour, using tables A, B and C:
T0: An AccessExclusiveLock is applied to B
T1: Q1 takes snapshot, takes lock on A and begins query 
T2: Q2 takes snapshot, queues for lock on B behind AccessExclusiveLock
T3: Cleanup on table C is handled that will conflict with both snapshots
T4: Q3 takes snapshot, takes lock on C and begins query (if possible)
T5: Cleanup on table C is handled that will conflict with Q3

Current: At T3, current conflict resolution will wait for
max_standby_delay and then cancel Q1 and Q2. Q3 can begin processing
immediately because the snapshot it takes will always be the same as or
later than the xmin that generated the cleanup at T3. At T5, Q3 will be
quickly cancelled because all the standby delay was used up at T3 and
there is none left to spend on delaying for Q3.

Proposed Resolution (as presented to hackers in 12/2009):
http://archives.postgresql.org/pgsql-hackers/2009-12/msg01175.php

Let's look at the effect first, then return to the detail.

In this proposal, the above sequence of actions will look like this:
Conflict resolution will wait at T3 until we hit max_standby_delay, at
which point we learn that Q1 and Q2 do not conflict and we let them
continue on their way. At T5, Q3 will be cancelled without much delay,
because we have now used up most of max_standby_delay.

So in both approaches, Q3, which accessed table C, will be cancelled
fairly quickly. The key difference is that in the new proposal, Q1 and
Q2 will not be cancelled: they will continue to completion.

How it works: When we process a snapshot conflict we check which queries
have snapshots that conflict. We then wait for max_standby_delay and
then check for lock conflicts. (We do it this way because of a timing
issue described at the link above, pointed out by Greg.) When we check
for lock conflicts we also set a latestRemovedXid on "that relation", so
that we capture all current lockers *and* allow all future lockers to
check the latestRemovedXid against their snapshot. In either case, if a
lock conflict occurs then we will cancel the query.
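
To make the two checks concrete, here is a minimal standalone sketch.
It is a toy model, not the code in the patch: ToyVxid,
should_cancel_now and must_cancel_on_lock are hypothetical names, and
the xid comparison is a simplified circular compare that ignores
special xids. Current lockers are cancelled only if they appear in the
lock-conflict list; future lockers compare the stored latestRemovedXid
against their snapshot xmin when they request the lock.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;
#define INVALID_XID ((Xid) 0)

/* Hypothetical stand-in for a virtual transaction id. */
typedef struct
{
    int      backend_id;
    uint32_t local_xid;
} ToyVxid;

/* Simplified circular comparison: true if a is at or after b (mod 2^32). */
static bool
xid_follows_or_equals(Xid a, Xid b)
{
    return (int32_t) (a - b) >= 0;
}

/*
 * Current lockers: a backend whose snapshot conflicts is cancelled only if
 * it also appears in the list of lockers on the relation being cleaned.
 */
static bool
should_cancel_now(ToyVxid waiter, const ToyVxid *lockers, size_t nlockers)
{
    assert(lockers != NULL || nlockers == 0);

    for (size_t i = 0; i < nlockers; i++)
    {
        if (lockers[i].backend_id == waiter.backend_id &&
            lockers[i].local_xid == waiter.local_xid)
            return true;
    }
    return false;
}

/*
 * Future lockers: on a lock request, compare the stored limit against the
 * requester's snapshot xmin. An unset limit or unset xmin never conflicts.
 */
static bool
must_cancel_on_lock(Xid latest_removed_xid, Xid snapshot_xmin)
{
    return latest_removed_xid != INVALID_XID &&
           snapshot_xmin != INVALID_XID &&
           xid_follows_or_equals(latest_removed_xid, snapshot_xmin);
}
```

In the timeline above, Q1 and Q2 hold or await locks on A and B only, so
neither is in the locker list for C and both are spared; either would
still be cancelled if it later requested a lock on C with a snapshot
that the stored limit overtakes.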

I mention "that relation" because *where* we record the xid limit for
each relation is an important aspect of the design. In the current patch
we take a simple approach; others are possible. If there is already a
lock in the shared lock table, then we add the latestRemovedXid to that.
If not, we keep track of the latestRemovedXid for the whole lock
partition. So in most cases we aren't tracking each relation separately,
except when a table is being frequently accessed, or accessed for a
long period.
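
The storage rule can be sketched as follows. This is a toy model under
stated assumptions, not the patch itself: ToyLock, ToyLockTable and the
functions below are hypothetical, the lock table is a small array
rather than a shared hash table, and relations map to partitions by a
plain modulo. It shows the limit going onto an existing lock entry when
there is one, onto the partition otherwise, and a newly created lock
entry starting from its partition's value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;
#define INVALID_XID ((Xid) 0)
#define NUM_PARTITIONS 16   /* stand-in for NUM_LOCK_PARTITIONS */
#define MAX_LOCKS 8         /* toy lock table capacity */

typedef struct
{
    bool     in_use;
    uint32_t relid;
    Xid      latest_removed_xid;
} ToyLock;

typedef struct
{
    Xid     partition_cache[NUM_PARTITIONS];
    ToyLock locks[MAX_LOCKS];
} ToyLockTable;

/* Toy partitioning: the real code hashes the lock tag instead. */
static int
partition_of(uint32_t relid)
{
    return (int) (relid % NUM_PARTITIONS);
}

static ToyLock *
find_lock(ToyLockTable *t, uint32_t relid)
{
    for (size_t i = 0; i < MAX_LOCKS; i++)
    {
        if (t->locks[i].in_use && t->locks[i].relid == relid)
            return &t->locks[i];
    }
    return NULL;
}

/* Recovery side: store the limit on the lock entry, else on the partition. */
static void
record_latest_removed_xid(ToyLockTable *t, uint32_t relid, Xid xid)
{
    ToyLock *lock = find_lock(t, relid);

    assert(xid != INVALID_XID);
    if (lock != NULL)
        lock->latest_removed_xid = xid;
    else
        t->partition_cache[partition_of(relid)] = xid;
}

/* Query side: a new lock entry inherits its partition's cached limit. */
static ToyLock *
acquire_lock(ToyLockTable *t, uint32_t relid)
{
    ToyLock *lock = find_lock(t, relid);

    if (lock != NULL)
        return lock;
    for (size_t i = 0; i < MAX_LOCKS; i++)
    {
        if (!t->locks[i].in_use)
        {
            t->locks[i].in_use = true;
            t->locks[i].relid = relid;
            t->locks[i].latest_removed_xid =
                t->partition_cache[partition_of(relid)];
            return &t->locks[i];
        }
    }
    return NULL;            /* toy table is full */
}
```

The cost of the coarse fallback is visible here: two relations that
share a partition also share the cached limit, so a query on an
unrelated relation in the same partition can still be cancelled.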

There is also an optimization added here. When we defer cancellation of
queries, the same query keeps re-appearing in the conflict list for
later WAL records. To avoid that, there is a mechanism that prevents
constant re-listing of a conflict.
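
A minimal sketch of that filter, again as a toy model: ToyProc,
list_for_conflict and take_snapshot are hypothetical names, and only
the re-listing test is modelled, not the snapshot conflict test that
precedes it. Each process remembers the highest limit it has already
been listed for; it is listed again only for a later limit, and the
memory is cleared whenever it takes a new snapshot.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Xid;
#define INVALID_XID ((Xid) 0)

/* Hypothetical per-backend state. */
typedef struct
{
    Xid xmin;        /* xmin of the current snapshot */
    Xid limit_xmin;  /* highest limit already processed for this backend */
} ToyProc;

/* Simplified circular comparison: true if a is strictly after b. */
static bool
xid_follows(Xid a, Xid b)
{
    return (int32_t) (a - b) > 0;
}

/*
 * Returns true if the backend should be added to the conflict list for
 * this limit, and remembers the limit so that later WAL records carrying
 * the same or an older limit do not list it again.
 */
static bool
list_for_conflict(ToyProc *proc, Xid limit)
{
    assert(limit != INVALID_XID);

    if (proc->limit_xmin == INVALID_XID ||
        xid_follows(limit, proc->limit_xmin))
    {
        proc->limit_xmin = limit;
        return true;
    }
    return false;
}

/* A new snapshot may conflict afresh, so forget the remembered limit. */
static void
take_snapshot(ToyProc *proc, Xid xmin)
{
    proc->xmin = xmin;
    proc->limit_xmin = INVALID_XID;
}
```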

The attached patch is for review and discussion only at this stage.

I'm working on other areas now while discussion takes place, or not.

-- 
 Simon Riggs           www.2ndQuadrant.com
*** a/src/backend/commands/indexcmds.c
--- b/src/backend/commands/indexcmds.c
***************
*** 540,546 **** DefineIndex(RangeVar *heapRelation,
  	 * check for that.	Also, prepared xacts are not reported, which is fine
  	 * since they certainly aren't going to do anything more.
  	 */
! 	old_lockholders = GetLockConflicts(&heaplocktag, ShareLock);
  
  	while (VirtualTransactionIdIsValid(*old_lockholders))
  	{
--- 540,546 ----
  	 * check for that.	Also, prepared xacts are not reported, which is fine
  	 * since they certainly aren't going to do anything more.
  	 */
! 	old_lockholders = GetLockConflicts(&heaplocktag, ShareLock, InvalidTransactionId);
  
  	while (VirtualTransactionIdIsValid(*old_lockholders))
  	{
***************
*** 627,633 **** DefineIndex(RangeVar *heapRelation,
  	 * We once again wait until no transaction can have the table open with
  	 * the index marked as read-only for updates.
  	 */
! 	old_lockholders = GetLockConflicts(&heaplocktag, ShareLock);
  
  	while (VirtualTransactionIdIsValid(*old_lockholders))
  	{
--- 627,633 ----
  	 * We once again wait until no transaction can have the table open with
  	 * the index marked as read-only for updates.
  	 */
! 	old_lockholders = GetLockConflicts(&heaplocktag, ShareLock, InvalidTransactionId);
  
  	while (VirtualTransactionIdIsValid(*old_lockholders))
  	{
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 66,71 **** typedef struct ProcArrayStruct
--- 66,72 ----
  
  	int			numKnownAssignedXids;	/* current number of known assigned xids */
  	int			maxKnownAssignedXids;	/* allocated size of known assigned xids */
+ 
  	/*
  	 * Highest subxid that overflowed KnownAssignedXids array. Similar to
  	 * overflowing cached subxids in PGPROC entries.
***************
*** 73,78 **** typedef struct ProcArrayStruct
--- 74,86 ----
  	TransactionId	lastOverflowedXid;
  
  	/*
+ 	 * During Hot Standby we need to store a visibility limit to defer recovery
+ 	 * conflict processing. The cached value is accessed during lock processing,
+ 	 * but stored on the procarray header. See lock.c for further details.
+ 	 */
+ 	TransactionId	latestRemovedXidCache[NUM_LOCK_PARTITIONS];
+ 
+ 	/*
  	 * We declare procs[] as 1 entry because C wants a fixed-size array, but
  	 * actually it is maxProcs entries long.
  	 */
***************
*** 220,225 **** CreateSharedProcArray(void)
--- 228,236 ----
  											  HASH_ELEM | HASH_FUNCTION);
  		if (!KnownAssignedXidsHash)
  			elog(FATAL, "could not initialize known assigned xids hash table");
+ 
+ 		/* Initialize the latestRemovedXidCache */
+ 		MemSet(procArray->latestRemovedXidCache, 0, NUM_LOCK_PARTITIONS * sizeof(TransactionId));
  	}
  }
  
***************
*** 1216,1221 **** GetSnapshotData(Snapshot snapshot)
--- 1227,1235 ----
  		RecentGlobalXmin = FirstNormalTransactionId;
  	RecentXmin = xmin;
  
+ 	/* Reset recovery conflict processing limit after deriving new snapshot */
+ 	MyProc->limitXmin = InvalidTransactionId;
+ 
  	snapshot->xmin = xmin;
  	snapshot->xmax = xmax;
  	snapshot->xcnt = count;
***************
*** 1739,1747 **** GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
  			{
  				VirtualTransactionId vxid;
  
! 				GET_VXID_FROM_PGPROC(vxid, *proc);
! 				if (VirtualTransactionIdIsValid(vxid))
! 					vxids[count++] = vxid;
  			}
  		}
  	}
--- 1753,1774 ----
  			{
  				VirtualTransactionId vxid;
  
! 				/*
! 				 * Avoid repeated re-listing of a process for conflict processing
! 				 * by excluding it once resolved up to a particular latestRemovedXid.
! 				 * We reset proc->limitXmin each time we take a new snapshot because
! 				 * that may need to have conflict processing performed on it, as
! 				 * discussed in header comments for this function.
! 				 */
! 				if (!TransactionIdIsValid(proc->limitXmin) ||
! 					TransactionIdFollows(limitXmin, proc->limitXmin))
! 				{
! 					proc->limitXmin = limitXmin;
! 
! 					GET_VXID_FROM_PGPROC(vxid, *proc);
! 					if (VirtualTransactionIdIsValid(vxid))
! 						vxids[count++] = vxid;
! 				}
  			}
  		}
  	}
***************
*** 2559,2561 **** KnownAssignedXidsDisplay(int trace_level)
--- 2586,2610 ----
  
  	pfree(buf.data);
  }
+ 
+ /*
+  * LatestRemovedXidCache provides a way of deferring recovery conflicts up to
+  * the point where a query requests a lock on a relation. These functions provide
+  * the cache required by recovery conflict processing. (see storage/lmgr/lock.c)
+  * Function prototypes are in standby.h, not procarray.h for convenience.
+  */
+ void
+ ProcArraySetLatestRemovedXidCache(int partition, TransactionId latestRemovedXid)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 
+ 	arrayP->latestRemovedXidCache[partition] = latestRemovedXid;
+ }
+ 
+ TransactionId
+ ProcArrayGetLatestRemovedXidCache(int partition)
+ {
+ 	ProcArrayStruct *arrayP = procArray;
+ 
+ 	return arrayP->latestRemovedXidCache[partition];
+ }
*** a/src/backend/storage/ipc/standby.c
--- b/src/backend/storage/ipc/standby.c
***************
*** 27,32 ****
--- 27,33 ----
  #include "storage/procarray.h"
  #include "storage/sinvaladt.h"
  #include "storage/standby.h"
+ #include "tcop/tcopprot.h"
  #include "utils/ps_status.h"
  
  int		vacuum_defer_cleanup_age;
***************
*** 34,40 **** int		vacuum_defer_cleanup_age;
  static List *RecoveryLockList;
  
  static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
! 									   ProcSignalReason reason);
  static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
  static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
  static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
--- 35,43 ----
  static List *RecoveryLockList;
  
  static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
! 									   ProcSignalReason reason,
! 									   TransactionId latestRemovedXid,
! 									   Oid dbOid, Oid relOid);
  static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
  static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
  static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
***************
*** 157,165 **** WaitExceedsMaxStandbyDelay(void)
   */
  static void
  ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
! 									   ProcSignalReason reason)
  {
  	char		waitactivitymsg[100];
  
  	while (VirtualTransactionIdIsValid(*waitlist))
  	{
--- 160,171 ----
   */
  static void
  ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
! 									   ProcSignalReason reason,
! 									   TransactionId latestRemovedXid,
! 									   Oid dbOid, Oid relOid)
  {
  	char		waitactivitymsg[100];
+ 	VirtualTransactionId *lockconflicts = NULL;
  
  	while (VirtualTransactionIdIsValid(*waitlist))
  	{
***************
*** 203,221 **** ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  			if (WaitExceedsMaxStandbyDelay())
  			{
  				pid_t pid;
  
  				/*
! 				 * Now find out who to throw out of the balloon.
  				 */
! 				Assert(VirtualTransactionIdIsValid(*waitlist));
! 				pid = CancelVirtualTransaction(*waitlist, reason);
! 
! 				/*
! 				 * Wait awhile for it to die so that we avoid flooding an
! 				 * unresponsive backend when system is heavily loaded.
! 				 */
! 				if (pid != 0)
! 					pg_usleep(5000);
  			}
  		}
  
--- 209,279 ----
  			if (WaitExceedsMaxStandbyDelay())
  			{
  				pid_t pid;
+ 				bool cancel = true;
  
  				/*
! 				 * If it is a snapshot conflict decide whether we should resolve
! 				 * immediately or defer the resolution until later. This must be
! 				 * designed carefully so that it is a deferral in all cases, in
! 				 * particular we must not set the latestRemovedXid until the
! 				 * standby delay has reached max_standby_delay, which is now!
! 				 *
! 				 * If we had a snapshot conflict then we also check for lock conflicts.
! 				 * Lock conflicts disregard whether backends are holding or waiting for
! 				 * the lock, so we get everybody that has declared an intention on the
! 				 * lock. We are careful to acquire the lock partition lock in exclusive
! 				 * mode, so that no concurrent lock requestors slip by while we do this.
! 				 * While holding the lock partition lock we also record the latestRemovedXid
! 				 * so that anybody arriving after we release the lock will cancel
! 				 * themselves using the same errors they would have got here.
! 				 *
! 				 * If conflicts do exist yet don't conflict with the snapshot then we
! 				 * do not need to cancel. So we need to merge the two lists
! 				 * to determine who can be spared, for now.
! 				 *
! 				 * XXX O(N^2) at present, but neither list is long in the common case.
  				 */
! 				if (reason == PROCSIG_RECOVERY_CONFLICT_SNAPSHOT)
! 				{
! 					LOCKTAG			locktag;
! 					VirtualTransactionId *conflicts;
! 
! 					SET_LOCKTAG_RELATION(locktag, dbOid, relOid);
! 
! 					if (!lockconflicts)
! 						lockconflicts = GetLockConflicts(&locktag, AccessExclusiveLock,
! 															latestRemovedXid);
! 					conflicts = lockconflicts;
! 					cancel = false;
! 					while (VirtualTransactionIdIsValid(*conflicts))
! 					{
! 						if (waitlist->backendId == conflicts->backendId &&
! 							waitlist->localTransactionId == conflicts->localTransactionId)
! 						{
! 							cancel = true;
! 							break;
! 						}
! 						conflicts++;
! 					}
! 				}
! 
! 				if (cancel)
! 				{
! 					/*
! 					 * Now find out who to throw out of the balloon.
! 					 */
! 					Assert(VirtualTransactionIdIsValid(*waitlist));
! 					pid = CancelVirtualTransaction(*waitlist, reason);
! 
! 					/*
! 					 * Wait awhile for it to die so that we avoid flooding an
! 					 * unresponsive backend when system is heavily loaded.
! 					 */
! 					if (pid != 0)
! 						pg_usleep(5000);
! 				}
! 				else
! 					break;
  			}
  		}
  
***************
*** 232,238 **** ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
  }
  
  void
! ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid)
  {
  	VirtualTransactionId *backends;
  
--- 290,296 ----
  }
  
  void
! ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
  {
  	VirtualTransactionId *backends;
  
***************
*** 240,246 **** ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid)
  										 InvalidOid);
  
  	ResolveRecoveryConflictWithVirtualXIDs(backends,
! 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
  }
  
  void
--- 298,318 ----
  										 InvalidOid);
  
  	ResolveRecoveryConflictWithVirtualXIDs(backends,
! 										   PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
! 										   latestRemovedXid,
! 										   node.dbNode, node.relNode);
! }
! 
! /*
!  * ResolveRecoveryConflictWithRemovedTransactionId is called to resolve
!  * a deferred conflict.
!  *
!  * Treat as a self-interrupt, but don't bother actually sending the signal.
!  */
! void
! ResolveRecoveryConflictWithRemovedTransactionId(void)
! {
! 	RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
  }
  
  void
***************
*** 270,276 **** ResolveRecoveryConflictWithTablespace(Oid tsid)
  	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
  												InvalidOid);
  	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
! 										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE);
  }
  
  void
--- 342,350 ----
  	temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
  												InvalidOid);
  	ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
! 										   PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
! 										   InvalidTransactionId,
! 										   0, 0);
  }
  
  void
***************
*** 321,327 **** ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
  	while (!lock_acquired)
  	{
  		if (++num_attempts < 3)
! 			backends = GetLockConflicts(&locktag, AccessExclusiveLock);
  		else
  		{
  			backends = GetConflictingVirtualXIDs(InvalidTransactionId,
--- 395,403 ----
  	while (!lock_acquired)
  	{
  		if (++num_attempts < 3)
! 			backends = GetLockConflicts(&locktag,
! 										AccessExclusiveLock,
! 										InvalidTransactionId);
  		else
  		{
  			backends = GetConflictingVirtualXIDs(InvalidTransactionId,
***************
*** 330,336 **** ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
  		}
  
  		ResolveRecoveryConflictWithVirtualXIDs(backends,
! 											   PROCSIG_RECOVERY_CONFLICT_LOCK);
  
  		if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
  											!= LOCKACQUIRE_NOT_AVAIL)
--- 406,414 ----
  		}
  
  		ResolveRecoveryConflictWithVirtualXIDs(backends,
! 											   PROCSIG_RECOVERY_CONFLICT_LOCK,
! 											   InvalidTransactionId,
! 											   0, 0);
  
  		if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
  											!= LOCKACQUIRE_NOT_AVAIL)
*** a/src/backend/storage/lmgr/lock.c
--- b/src/backend/storage/lmgr/lock.c
***************
*** 35,40 ****
--- 35,41 ----
  #include "access/transam.h"
  #include "access/twophase.h"
  #include "access/twophase_rmgr.h"
+ #include "access/xact.h"
  #include "miscadmin.h"
  #include "pg_trace.h"
  #include "pgstat.h"
***************
*** 439,444 **** ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
--- 440,486 ----
  	return lockhash;
  }
  
+ /*
+  * SetRecoveryConflictLatestRemovedXid functions manage the latestRemovedXid cache.
+  * During Hot Standby we generate a list of conflicts and then resolve only those
+  * conflicts that have a clear and present lock conflict. We defer the conflict
+  * for other relations to some later time, hopefully never. The deferral mechanism
+  * is that we store the latestRemovedXid that applies to a relation behind the
+  * lock partition that applies for that relation. These routines exist to provide
+  * a level of modularity to allow us to alter the caching mechanism as needed.
+  *
+  * It is possible to defer the recovery conflict right up until the point that
+  * we access a particular data block. Doing that means we would have to cache
+  * latestRemovedXid for each data block, even those that have exited and re-entered
+  * cache. It would also mean we would need to re-check latestRemovedXid each time
+  * we access a data block. Both of those factors make that level of granularity
+  * more trouble than it would likely be worth in practice. So we check the
+  * latestRemovedXid once each time we lock a relation: smaller cache, fewer checks.
+  *
+  * We store the latestRemovedXid on the lock entry, giving easy access to the value
+  * that applies for that relation. If a lock is heavily used or held for a long
+  * duration then it will effectively have its own private cache. When the lock entry
+  * is removed, we lose the cached latestRemovedXid value.
+  *
+  * When a lock entry is created we set the value from the cache. The current cache
+  * is just a single value for each partition, though that could be extended to have
+  * multiple entries for frequently used relations. The current cache is stored on
+  * the header of the ProcArrayStruct as a convenient place to store shmem data
+  * and could be relocated elsewhere if required.
+  */
+ static void
+ LockSetRecoveryConflictLatestRemovedXidForPartition(int partition, TransactionId latestRemovedXid)
+ {
+ 	Assert(InHotStandby);
+ 
+ 	ProcArraySetLatestRemovedXidCache(partition, latestRemovedXid);
+ }
+ 
+ static TransactionId
+ LockGetRecoveryConflictLatestRemovedXidForPartition(int partition)
+ {
+ 	return ProcArrayGetLatestRemovedXidCache(partition);
+ }
  
  /*
   * LockAcquire -- Check for lock conflicts, sleep if conflict found,
***************
*** 632,637 **** LockAcquireExtended(const LOCKTAG *locktag,
--- 674,685 ----
  		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
  		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
  		LOCK_PRINT("LockAcquire: new", lock, lockmode);
+ 
+ 		/*
+ 		 * Start the latestRemovedXid from the partition's cached value
+ 		 */
+ 		if (TransactionStartedDuringRecovery())
+ 			lock->latestRemovedXid = LockGetRecoveryConflictLatestRemovedXidForPartition(partition);
  	}
  	else
  	{
***************
*** 642,647 **** LockAcquireExtended(const LOCKTAG *locktag,
--- 690,726 ----
  	}
  
  	/*
+ 	 * If latestRemovedXid is set, resolve any recovery conflict. We do this
+ 	 * here whether we wait or not. If our snapshot conflicts then we will
+ 	 * be cancelled, so there is no need for us to re-check this after we
+ 	 * wake from a wait for the lock.
+ 	 */
+ 	if (TransactionIdIsValid(lock->latestRemovedXid) &&
+ 		TransactionIdIsValid(MyProc->xmin) &&
+ 		TransactionIdFollowsOrEquals(lock->latestRemovedXid, MyProc->xmin))
+ 	{
+ 		if (lock->nRequested == 0)
+ 		{
+ 			/*
+ 			 * There are no other requestors of this lock, so garbage-collect
+ 			 * the lock object.  We *must* do this to avoid a permanent leak
+ 			 * of shared memory, because there won't be anything to cause
+ 			 * anyone to release the lock object later.
+ 			 */
+ 			Assert(SHMQueueEmpty(&(lock->procLocks)));
+ 			if (!hash_search_with_hash_value(LockMethodLockHash,
+ 											 (void *) &(lock->tag),
+ 											 hashcode,
+ 											 HASH_REMOVE,
+ 											 NULL))
+ 				elog(PANIC, "lock table corrupted");
+ 		}
+ 		LWLockRelease(partitionLock);
+ 		ResolveRecoveryConflictWithRemovedTransactionId();
+ 		/* Not reached */
+ 	}
+ 
+ 	/*
  	 * Create the hash key for the proclock table.
  	 */
  	proclocktag.myLock = lock;
***************
*** 1788,1796 **** LockReassignCurrentOwner(void)
   * uses of the result.
   */
  VirtualTransactionId *
! GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
  {
! 	VirtualTransactionId *vxids;
  	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
  	LockMethod	lockMethodTable;
  	LOCK	   *lock;
--- 1867,1875 ----
   * uses of the result.
   */
  VirtualTransactionId *
! GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, TransactionId latestRemovedXid)
  {
! 	static VirtualTransactionId *vxids;
  	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
  	LockMethod	lockMethodTable;
  	LOCK	   *lock;
***************
*** 1798,1803 **** GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
--- 1877,1883 ----
  	SHM_QUEUE  *procLocks;
  	PROCLOCK   *proclock;
  	uint32		hashcode;
+ 	int			partition;
  	LWLockId	partitionLock;
  	int			count = 0;
  
***************
*** 1812,1827 **** GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
  	 * need enough space for MaxBackends + a terminator, since prepared xacts
  	 * don't count.
  	 */
! 	vxids = (VirtualTransactionId *)
! 		palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));
  
  	/*
  	 * Look up the lock object matching the tag.
  	 */
  	hashcode = LockTagHashCode(locktag);
  	partitionLock = LockHashPartitionLock(hashcode);
  
! 	LWLockAcquire(partitionLock, LW_SHARED);
  
  	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
  												(void *) locktag,
--- 1892,1929 ----
  	 * need enough space for MaxBackends + a terminator, since prepared xacts
  	 * don't count.
  	 */
! 	if (!InHotStandby)
! 		vxids = (VirtualTransactionId *)
! 			palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));
! 	else
! 	{
! 		if (vxids == NULL)
! 		{
! 			vxids = (VirtualTransactionId *)
! 				malloc(sizeof(VirtualTransactionId) * (MaxBackends + 1));
! 			if (vxids == NULL)
! 				ereport(ERROR,
! 					(errcode(ERRCODE_OUT_OF_MEMORY),
! 					 errmsg("out of memory")));
! 
! 		}
! 	}
  
  	/*
  	 * Look up the lock object matching the tag.
  	 */
  	hashcode = LockTagHashCode(locktag);
+ 	partition = LockHashPartition(hashcode);
  	partitionLock = LockHashPartitionLock(hashcode);
  
! 	/*
! 	 * If we are setting latestRemovedXid we need to get exclusive lock
! 	 * to avoid race conditions.
! 	 */
! 	if (TransactionIdIsValid(latestRemovedXid))
! 		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
! 	else
! 		LWLockAcquire(partitionLock, LW_SHARED);
  
  	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
  												(void *) locktag,
***************
*** 1831,1836 **** GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
--- 1933,1944 ----
  	if (!lock)
  	{
  		/*
+ 		 * Set the latestRemovedXid for the partition.
+ 		 */
+ 		if (TransactionIdIsValid(latestRemovedXid))
+ 			LockSetRecoveryConflictLatestRemovedXidForPartition(partition, latestRemovedXid);
+ 
+ 		/*
  		 * If the lock object doesn't exist, there is nothing holding a lock
  		 * on this lockable object.
  		 */
***************
*** 1839,1844 **** GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
--- 1947,1958 ----
  	}
  
  	/*
+ 	 * Set the latestRemovedXid for the lock
+ 	 */
+ 	if (TransactionIdIsValid(latestRemovedXid))
+ 		lock->latestRemovedXid = latestRemovedXid;
+ 
+ 	/*
  	 * Examine each existing holder (or awaiter) of the lock.
  	 */
  	conflictMask = lockMethodTable->conflictTab[lockmode];
*** a/src/include/storage/lock.h
--- b/src/include/storage/lock.h
***************
*** 312,317 **** typedef struct LOCK
--- 312,323 ----
  	int			nRequested;		/* total of requested[] array */
  	int			granted[MAX_LOCKMODES]; /* counts of granted locks */
  	int			nGranted;		/* total of granted[] array */
+ 
+ 	/*
+ 	 * For Hot Standby, the xid limit for access to this table. Allows deferred
+ 	 * and selective conflict processing based upon the relation id.
+ 	 */
+ 	TransactionId latestRemovedXid;
  } LOCK;
  
  #define LOCK_LOCKMETHOD(lock) ((LOCKMETHODID) (lock).tag.locktag_lockmethodid)
***************
*** 488,494 **** extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
  extern void LockReleaseCurrentOwner(void);
  extern void LockReassignCurrentOwner(void);
  extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
! 				 LOCKMODE lockmode);
  extern void AtPrepare_Locks(void);
  extern void PostPrepare_Locks(TransactionId xid);
  extern int LockCheckConflicts(LockMethod lockMethodTable,
--- 494,500 ----
  extern void LockReleaseCurrentOwner(void);
  extern void LockReassignCurrentOwner(void);
  extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
! 				 LOCKMODE lockmode, TransactionId latestRemovedXid);
  extern void AtPrepare_Locks(void);
  extern void PostPrepare_Locks(TransactionId xid);
  extern int LockCheckConflicts(LockMethod lockMethodTable,
*** a/src/include/storage/proc.h
--- b/src/include/storage/proc.h
***************
*** 102,107 **** struct PGPROC
--- 102,112 ----
  	 */
  	bool		recoveryConflictPending;
  
+ 	TransactionId limitXmin;	/* The latest xid for which conflict processing
+ 								 * has been already performed for this proc.
+ 								 * Updated by snapshot conflicts, reset when
+ 								 * we take a new snapshot. */
+ 
  	/* Info about LWLock the process is currently waiting for, if any. */
  	bool		lwWaiting;		/* true if waiting for an LW lock */
  	bool		lwExclusive;	/* true if waiting for exclusive access */
*** a/src/include/storage/standby.h
--- b/src/include/storage/standby.h
***************
*** 29,34 **** extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
--- 29,38 ----
  extern void ResolveRecoveryConflictWithBufferPin(void);
  extern void SendRecoveryConflictWithBufferPin(void);
  
+ /* Prototypes here rather than in procarray.h */
+ extern void ProcArraySetLatestRemovedXidCache(int partition, TransactionId latestRemovedXid);
+ extern TransactionId ProcArrayGetLatestRemovedXidCache(int partition);
+ 
  /*
   * Standby Rmgr (RM_STANDBY_ID)
   *