On Sun, 2010-04-25 at 13:51 -0400, Tom Lane wrote:
> Simon Riggs <si...@2ndquadrant.com> writes:
> > On Sun, 2010-04-25 at 13:33 -0400, Tom Lane wrote:
> >> If you like I'll have a go at rewriting the comments for this patch,
> >> because I am currently thinking that the problem is not so much with
> >> the code as with the poor explanation of what it's doing.  Sometimes
> >> the author is too close to the code to understand why other people
> >> have a hard time understanding it.
> 
> > That would help me, thank you.
> 
> OK.  You said you were currently working some more on the patch, so
> I'll wait for v3 and then work on it.

v3 attached

Changes:
* Strange locking in KnownAssignedXidsAdd() moved to RecordKnown...
* KnownAssignedXidsAdd() reordered, assert-ish code added
* Tail movement during snapshots no longer possible
* Tail movement during xid removal added to KnownAssignedXidsSearch()
* Major comment hacking

Little bit rough, definitely needs a re-read of all comments, so good
time to send over.

-- 
 Simon Riggs           www.2ndQuadrant.com
*** a/src/backend/access/transam/twophase.c
--- b/src/backend/access/transam/twophase.c
***************
*** 1200,1205 **** StandbyTransactionIdIsPrepared(TransactionId xid)
--- 1200,1208 ----
  
  	Assert(TransactionIdIsValid(xid));
  
+ 	if (max_prepared_xacts <= 0)
+ 		return false;					/* nothing to do */
+ 
  	/* Read and validate file */
  	buf = ReadTwoPhaseFile(xid, false);
  	if (buf == NULL)
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 6454,6459 **** CheckRecoveryConsistency(void)
--- 6454,6465 ----
  	}
  }
  
+ bool
+ XLogConsistentState(void)
+ {
+ 	return reachedMinRecoveryPoint;
+ }
+ 
  /*
   * Is the system still in recovery?
   *
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 52,57 ****
--- 52,58 ----
  #include "access/twophase.h"
  #include "miscadmin.h"
  #include "storage/procarray.h"
+ #include "storage/spin.h"
  #include "storage/standby.h"
  #include "utils/builtins.h"
  #include "utils/snapmgr.h"
***************
*** 64,73 **** typedef struct ProcArrayStruct
  	int			numProcs;		/* number of valid procs entries */
  	int			maxProcs;		/* allocated size of procs array */
  
! 	int			numKnownAssignedXids;	/* current number of known assigned
! 										 * xids */
! 	int			maxKnownAssignedXids;	/* allocated size of known assigned
! 										 * xids */
  
  	/*
  	 * Highest subxid that overflowed KnownAssignedXids array. Similar to
--- 65,84 ----
  	int			numProcs;		/* number of valid procs entries */
  	int			maxProcs;		/* allocated size of procs array */
  
! 	/*
! 	 * Known assigned xids handling
! 	 */
! 	int			maxKnownAssignedXids;	/* allocated size */
! 
! 	/*
! 	 * Callers must hold either ProcArrayLock in Exclusive mode or
! 	 * ProcArrayLock in Shared mode *and* known_assigned_xids_lck
! 	 * to update these values.
! 	 */
! 	int			numKnownAssignedXids;	/* current # valid entries */
! 	int			tailKnownAssignedXids;	/* current tail */
! 	int			headKnownAssignedXids;	/* current head */
! 	slock_t		known_assigned_xids_lck;	/* shared protection lock */
  
  	/*
  	 * Highest subxid that overflowed KnownAssignedXids array. Similar to
***************
*** 87,93 **** static ProcArrayStruct *procArray;
  /*
   * Bookkeeping for tracking emulated transactions in recovery
   */
! static HTAB *KnownAssignedXidsHash;
  static TransactionId latestObservedXid = InvalidTransactionId;
  
  /*
--- 98,105 ----
  /*
   * Bookkeeping for tracking emulated transactions in recovery
   */
! static TransactionId *KnownAssignedXids;
! static bool *KnownAssignedXidsValid;
  static TransactionId latestObservedXid = InvalidTransactionId;
  
  /*
***************
*** 142,150 **** static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
  static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
  							   TransactionId xmax);
  static bool KnownAssignedXidsExist(TransactionId xid);
! static void KnownAssignedXidsAdd(TransactionId *xids, int nxids);
  static void KnownAssignedXidsRemove(TransactionId xid);
! static void KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts);
  static void KnownAssignedXidsDisplay(int trace_level);
  
  /*
--- 154,166 ----
  static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
  							   TransactionId xmax);
  static bool KnownAssignedXidsExist(TransactionId xid);
! static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
! 								bool exclusive_lock);
  static void KnownAssignedXidsRemove(TransactionId xid);
! static void KnownAssignedXidsRemoveMany(TransactionId xid);
! static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
! 									  TransactionId *subxids);
! static void KnownAssignedXidsCompress(bool full);
  static void KnownAssignedXidsDisplay(int trace_level);
  
  /*
***************
*** 204,228 **** CreateSharedProcArray(void)
  		procArray->numKnownAssignedXids = 0;
  		procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
  		procArray->lastOverflowedXid = InvalidTransactionId;
  	}
  
  	if (XLogRequestRecoveryConnections)
  	{
! 		/* Create or attach to the KnownAssignedXids hash table */
! 		HASHCTL		info;
! 
! 		MemSet(&info, 0, sizeof(info));
! 		info.keysize = sizeof(TransactionId);
! 		info.entrysize = sizeof(TransactionId);
! 		info.hash = tag_hash;
! 
! 		KnownAssignedXidsHash = ShmemInitHash("KnownAssignedXids Hash",
! 											  TOTAL_MAX_CACHED_SUBXIDS,
! 											  TOTAL_MAX_CACHED_SUBXIDS,
! 											  &info,
! 											  HASH_ELEM | HASH_FUNCTION);
! 		if (!KnownAssignedXidsHash)
! 			elog(FATAL, "could not initialize known assigned xids hash table");
  	}
  }
  
--- 220,240 ----
  		procArray->numKnownAssignedXids = 0;
  		procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
  		procArray->lastOverflowedXid = InvalidTransactionId;
+ 		procArray->tailKnownAssignedXids = 0;
+ 		procArray->headKnownAssignedXids = 0;
+ 		SpinLockInit(&procArray->known_assigned_xids_lck);
  	}
  
  	if (XLogRequestRecoveryConnections)
  	{
! 		KnownAssignedXids = (TransactionId *)
! 				ShmemInitStruct("KnownAssignedXids",
! 						mul_size(sizeof(TransactionId), TOTAL_MAX_CACHED_SUBXIDS),
! 						&found);
! 		KnownAssignedXidsValid = (bool *)
! 				ShmemInitStruct("KnownAssignedXidsValid",
! 						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
! 						&found);
  	}
  }
  
***************
*** 544,550 **** ProcArrayApplyRecoveryInfo(RunningTransactions running)
  		if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
  			continue;
  
! 		KnownAssignedXidsAdd(&xid, 1);
  	}
  
  	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
--- 556,562 ----
  		if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
  			continue;
  
! 		KnownAssignedXidsAdd(xid, xid, true);
  	}
  
  	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
***************
*** 617,624 **** ProcArrayApplyXidAssignment(TransactionId topxid,
  	/*
  	 * Remove from known-assigned-xacts.
  	 */
! 	for (i = 0; i < nsubxids; i++)
! 		KnownAssignedXidsRemove(subxids[i]);
  
  	/*
  	 * Advance lastOverflowedXid when required.
--- 629,635 ----
  	/*
  	 * Remove from known-assigned-xacts.
  	 */
! 	KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
  
  	/*
  	 * Advance lastOverflowedXid when required.
***************
*** 2128,2134 **** DisplayXidCache(void)
   *
   * During recovery we do not fret too much about the distinction between
   * top-level xids and subtransaction xids. We hold both together in
!  * a hash table called KnownAssignedXids. In backends, this is copied into
   * snapshots in GetSnapshotData(), taking advantage
   * of the fact that XidInMVCCSnapshot() doesn't care about the distinction
   * either. Subtransaction xids are effectively treated as top-level xids
--- 2139,2145 ----
   *
   * During recovery we do not fret too much about the distinction between
   * top-level xids and subtransaction xids. We hold both together in
!  * a data structure called KnownAssignedXids. In backends, this is copied into
   * snapshots in GetSnapshotData(), taking advantage
   * of the fact that XidInMVCCSnapshot() doesn't care about the distinction
   * either. Subtransaction xids are effectively treated as top-level xids
***************
*** 2156,2164 **** DisplayXidCache(void)
   * transaction are not visible to backends in the standby.
   * We prune KnownAssignedXids when XLOG_XACT_RUNNING_XACTS arrives, to
   * ensure we do not overflow.
-  *
-  * If we are in STANDBY_SNAPSHOT_PENDING state, then we may try to remove
-  * xids that are not present.
   */
  void
  RecordKnownAssignedTransactionIds(TransactionId xid)
--- 2167,2172 ----
***************
*** 2196,2240 **** RecordKnownAssignedTransactionIds(TransactionId xid)
  	 */
  	if (TransactionIdFollows(xid, latestObservedXid))
  	{
! 		TransactionId next_expected_xid = latestObservedXid;
! 
! 		TransactionIdAdvance(next_expected_xid);
  
  		/*
! 		 * Locking requirement is currently higher than for xid assignment in
! 		 * normal running. However, we only get called here for new high xids
! 		 * - so on a multi-processor where it is common that xids arrive out
! 		 * of order the average number of locks per assignment will actually
! 		 * reduce. So not too worried about this locking.
! 		 *
! 		 * XXX It does seem possible that we could add a whole range of
! 		 * numbers atomically to KnownAssignedXids, if we use a sorted list
! 		 * for KnownAssignedXids. But that design also increases the length of
! 		 * time we hold lock when we process commits/aborts, so on balance
! 		 * don't worry about this.
  		 */
! 		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
! 
  		while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
  		{
- 			if (TransactionIdPrecedes(next_expected_xid, xid))
- 				elog(trace_recovery(DEBUG4),
- 					 "recording unobserved xid %u (latestObservedXid %u)",
- 						next_expected_xid, latestObservedXid);
- 			KnownAssignedXidsAdd(&next_expected_xid, 1);
- 
- 			/*
- 			 * Extend clog and subtrans like we do in GetNewTransactionId()
- 			 * during normal operation
- 			 */
  			ExtendCLOG(next_expected_xid);
  			ExtendSUBTRANS(next_expected_xid);
  
  			TransactionIdAdvance(next_expected_xid);
  		}
  
  		LWLockRelease(ProcArrayLock);
  
  		latestObservedXid = xid;
  	}
  
--- 2204,2239 ----
  	 */
  	if (TransactionIdFollows(xid, latestObservedXid))
  	{
! 		TransactionId next_expected_xid;
  
  		/*
! 		 * Extend clog and subtrans like we do in GetNewTransactionId()
! 		 * during normal operation using individual extend steps.
! 		 * Typical case requires almost no activity.
  		 */
! 		next_expected_xid = latestObservedXid;
! 		TransactionIdAdvance(next_expected_xid);
  		while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
  		{
  			ExtendCLOG(next_expected_xid);
  			ExtendSUBTRANS(next_expected_xid);
  
  			TransactionIdAdvance(next_expected_xid);
  		}
  
+ 		/*
+ 		 * Add the new xids onto the KnownAssignedXids array. Note that
+ 		 * we only need ProcArrayLock in shared mode to do this.
+ 		 */
+ 		next_expected_xid = latestObservedXid;
+ 		TransactionIdAdvance(next_expected_xid);
+ 		LWLockAcquire(ProcArrayLock, LW_SHARED);
+ 		KnownAssignedXidsAdd(next_expected_xid, xid, false);
  		LWLockRelease(ProcArrayLock);
  
+ 		/*
+ 		 * Now we can advance latestObservedXid
+ 		 */
  		latestObservedXid = xid;
  	}
  
***************
*** 2251,2257 **** void
  ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
  									  TransactionId *subxids)
  {
- 	int			i;
  	TransactionId max_xid;
  
  	if (standbyState == STANDBY_DISABLED)
--- 2250,2255 ----
***************
*** 2264,2273 **** ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
  	 */
  	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
  
! 	if (TransactionIdIsValid(xid))
! 		KnownAssignedXidsRemove(xid);
! 	for (i = 0; i < nsubxids; i++)
! 		KnownAssignedXidsRemove(subxids[i]);
  
  	/* Like in ProcArrayRemove, advance latestCompletedXid */
  	if (TransactionIdFollowsOrEquals(max_xid,
--- 2262,2268 ----
  	 */
  	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
  
! 	KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
  
  	/* Like in ProcArrayRemove, advance latestCompletedXid */
  	if (TransactionIdFollowsOrEquals(max_xid,
***************
*** 2281,2287 **** void
  ExpireAllKnownAssignedTransactionIds(void)
  {
  	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
! 	KnownAssignedXidsRemoveMany(InvalidTransactionId, false);
  	LWLockRelease(ProcArrayLock);
  }
  
--- 2276,2282 ----
  ExpireAllKnownAssignedTransactionIds(void)
  {
  	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
! 	KnownAssignedXidsRemoveMany(InvalidTransactionId);
  	LWLockRelease(ProcArrayLock);
  }
  
***************
*** 2289,2379 **** void
  ExpireOldKnownAssignedTransactionIds(TransactionId xid)
  {
  	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
! 	KnownAssignedXidsRemoveMany(xid, true);
  	LWLockRelease(ProcArrayLock);
  }
  
  /*
   * Private module functions to manipulate KnownAssignedXids
   *
!  * There are 3 main users of the KnownAssignedXids data structure:
   *
!  *	 * backends taking snapshots
   *	 * startup process adding new knownassigned xids
   *	 * startup process removing xids as transactions end
!  *
!  * If we make KnownAssignedXids a simple sorted array then the first two
!  * operations are fast, but the last one is at least O(N). If we make
!  * KnownAssignedXids a hash table then the last two operations are fast,
!  * though we have to do more work at snapshot time. Doing more work at
!  * commit could slow down taking snapshots anyway because of lwlock
!  * contention. Scanning the hash table is O(N) on the max size of the array,
!  * so performs poorly in comparison when we have very low numbers of
!  * write transactions to process. But at least it is constant overhead
!  * and a sequential memory scan will utilise hardware memory readahead
!  * to give much improved performance. In any case the emphasis must be on
!  * having the standby process changes quickly so that it can provide
!  * high availability. So we choose to implement as a hash table.
   */
  
  /*
!  * Add xids into KnownAssignedXids.
   *
!  * Must be called while holding ProcArrayLock in Exclusive mode
   */
  static void
! KnownAssignedXidsAdd(TransactionId *xids, int nxids)
  {
! 	TransactionId *result;
! 	bool		found;
! 	int			i;
  
! 	for (i = 0; i < nxids; i++)
  	{
! 		Assert(TransactionIdIsValid(xids[i]));
  
! 		elog(trace_recovery(DEBUG4), "adding KnownAssignedXid %u", xids[i]);
  
! 		procArray->numKnownAssignedXids++;
! 		if (procArray->numKnownAssignedXids > procArray->maxKnownAssignedXids)
  		{
! 			KnownAssignedXidsDisplay(LOG);
! 			LWLockRelease(ProcArrayLock);
! 			elog(ERROR, "too many KnownAssignedXids (%u)", procArray->maxKnownAssignedXids);
  		}
  
! 		result = (TransactionId *) hash_search(KnownAssignedXidsHash, &xids[i], HASH_ENTER,
! 											   &found);
  
! 		if (!result)
  		{
! 			LWLockRelease(ProcArrayLock);
! 			ereport(ERROR,
! 					(errcode(ERRCODE_OUT_OF_MEMORY),
! 					 errmsg("out of shared memory")));
  		}
  
! 		if (found)
  		{
- 			KnownAssignedXidsDisplay(LOG);
  			LWLockRelease(ProcArrayLock);
! 			elog(ERROR, "found duplicate KnownAssignedXid %u", xids[i]);
  		}
  	}
  }
  
  /*
!  * Is an xid present in KnownAssignedXids?
   *
!  * Must be called while holding ProcArrayLock in shared mode
   */
  static bool
  KnownAssignedXidsExist(TransactionId xid)
  {
! 	bool		found;
  
! 	(void) hash_search(KnownAssignedXidsHash, &xid, HASH_FIND, &found);
! 	return found;
  }
  
  /*
--- 2284,2632 ----
  ExpireOldKnownAssignedTransactionIds(TransactionId xid)
  {
  	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
! 	KnownAssignedXidsRemoveMany(xid);
  	LWLockRelease(ProcArrayLock);
  }
  
  /*
   * Private module functions to manipulate KnownAssignedXids
   *
!  * There are 5 main users of the KnownAssignedXids data structure:
   *
!  *	 * backends taking snapshots - all valid xids are copied out
!  *	 * backends seeking to determine presence of an xid
   *	 * startup process adding new knownassigned xids
   *	 * startup process removing xids as transactions end
!  *	 * startup process left pruning array when special WAL records arrive
!  *
!  * We use a sorted array to store xids, always adding xids at head of array.
!  * The array grows until it hits the right end and then we compress the array
!  * so it starts at zeroth element again.
!  *
!  * Maintaining a compact array of valid xids is costly, so we allow gaps to
!  * form in the sequence so we can avoid compressing the contents after each
!  * removal. Gaps are represented by tracking the validity of xids in a
!  * separate array of booleans.
!  *
!  * If we have a maximum of M slots, with N xids currently spread across
!  * S elements then we have N <= S <= M always.
!  *
!  * Keeping track of both head and tail allows a simple binary search for
!  * both searching and removing. Since the array never wraps, we have
!  * head >= tail always. When xids wrap we can continue to use this
!  * technique though this only works because the number of xids we
!  * store is less than half the maximum number of xids, so we only get
!  * a single wrap. That is different from general user indexes where
!  * bsearch is not possible because the number of rows is not sufficiently
!  * limited.
!  *
!  * Only the Startup process ever modifies the array, other backends just
!  * read it or search it.  When we add xids we first add the new values beyond
!  * the head and then move the head of the array. That allows us to move the
!  * head of the array while holding ProcArrayLock in share mode.
!  * That's not the end of the story though: on CPUs with weak-memory ordering
!  * we must use spinlocks to protect the head, since we can't guarantee the
!  * order of writes to memory. Rule is that updates of head and tail of the
!  * array require ProcArrayLock in exclusive mode or (shared mode and
!  * known_assigned_xids_lck spinlock).
!  *
!  * Algorithmic analysis
!  *  * Adding new xids is ~O(1) using shared locks (exclusive for compress)
!  *  * Removing an xid tree of k xids is k*O(logS) using exclusive locks
!  *  * Taking snapshots is O(S) and is similar to normal running
!  *  * Locating an xid is essentially same as Removing
!  *  * Pruning is O(1) in vast majority of cases
!  *
!  * In comparison, using a hash table for KnownAssignedXids would mean that
!  * taking snapshots would be O(M). If we can maintain S << M then the
!  * sorted array technique will deliver significantly faster snapshots.
!  * If we try to keep S too small then we increase cost of removal
!  * dramatically, so there is an optimal point for any workload mix. We
!  * use a heuristic to decide when to compress the array, though trimming
!  * also helps reduce frequency of compressing. The heuristic requires us
!  * to track the number of currently valid xids, though updating that does
!  * not increase the number of lock accesses required.
!  *
!  *  procArray->tailKnownAssignedXids points to first currently valid xid
!  *  procArray->headKnownAssignedXids points to next xid insertion point,
!  *  which may be one off to right of array
   */
+ #define RESULT_NOT_FOUND	-1
  
  /*
!  * Compress KnownAssignedXids by removing any gaps in virtual array.
   *
!  * Must be called while holding ProcArrayLock in Exclusive mode.
   */
  static void
! KnownAssignedXidsCompress(bool full)
  {
! 	/* use volatile pointer to prevent code rearrangement */
! 	volatile ProcArrayStruct *pArray = procArray;
  
! 	int	head, tail;
! 	int	compress_index;
! 	int	i;
! 
! 	/* no spinlock required when we hold ProcArrayLock exclusive mode */
! 	head = pArray->headKnownAssignedXids;
! 	tail = pArray->tailKnownAssignedXids;
! 
! 	if (!full)
  	{
! 		int	nelements = head - tail;
  
! 		/*
! 		 * If we can choose how much to compress, use a heuristic to
! 		 * avoid compressing too often or not often enough.
! 		 *
! 		 * Heuristic is if we have a large enough current spread and
! 		 * less than 50% of the elements are currently in use, then
! 		 * compress. This should ensure we compress fairly infrequently.
! 		 * We could compress less often though the virtual array would
! 		 * spread out more and snapshots would become more expensive.
! 		 */
! 		if (nelements < 4 * PROCARRAY_MAXPROCS ||
! 			nelements < 2 * pArray->numKnownAssignedXids)
! 			return;
! 	}
  
! 	/*
! 	 * Copy the valid values from tail to head down to the 0th element.
! 	 * Slots beyond the new head are left alone since adding xids overwrites
! 	 * them; the slot at head is cleared only if it lies within the array.
! 	 */
! 	compress_index = 0;
! 	for (i = tail; i < head; i++)
! 	{
! 		if (KnownAssignedXidsValid[i])
  		{
! 			if (i != compress_index)
! 			{
! 				KnownAssignedXids[compress_index] = KnownAssignedXids[i];
! 				KnownAssignedXidsValid[compress_index] = true;
! 			}
! 			compress_index++;
  		}
+ 	}
+ 	pArray->tailKnownAssignedXids = 0;
+ 	pArray->headKnownAssignedXids = compress_index;
+ 	if (compress_index < pArray->maxKnownAssignedXids)
+ 		KnownAssignedXidsValid[compress_index] = false;
+ }
+ 
+ /*
+  * Add xids into KnownAssignedXids at the head of the array.
+  *
+  * Called while holding ProcArrayLock in shared mode or exclusive mode.
+  * In shared mode, we may need to drop the lock and reacquire it in
+  * exclusive mode if the array must be compressed to make room.
+  * ProcArrayLock is still held at exit (possibly now in exclusive mode).
+  *
+  * Only caller at present is Startup process, though locking is
+  * sufficient to allow multiple callers, if necessary.
+  */
+ static void
+ KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid, bool exclusive_lock)
+ {
+ 	/* use volatile pointer to prevent code rearrangement */
+ 	volatile ProcArrayStruct *pArray = procArray;
  
! 	TransactionId	next_xid;
! 	int			head, tail;
! 	int			nxids = 1;
! 	int			i;
  
! 	Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));
! 
! 	/*
! 	 * Calculate how many slots we need so we can test for overflow.
! 	 */
! 	if (to_xid >= from_xid)
! 		nxids = to_xid - from_xid + 1;
! 	else
! 	{
! 		next_xid = from_xid;
! 		while (TransactionIdPrecedes(next_xid, to_xid))
  		{
! 			nxids++;
! 			TransactionIdAdvance(next_xid);
  		}
+ 	}
+ 
+ 	/* spinlock is essential on machines with weak memory ordering! */
+ 	SpinLockAcquire(&pArray->known_assigned_xids_lck);
+ 	head = pArray->headKnownAssignedXids;
+ 	tail = pArray->tailKnownAssignedXids;
+ 	SpinLockRelease(&pArray->known_assigned_xids_lck);
  
! 	Assert(head >= 0 && head <= pArray->maxKnownAssignedXids);
! 	Assert(tail >= 0 && tail < pArray->maxKnownAssignedXids);
! 
! 	/*
! 	 * The array is sorted in TransactionId sequence. TransactionIds
! 	 * are modulo integers, so wrap when they get to INT_MAX. Our
! 	 * sense of ordering only works if the values of all array entries
! 	 * are within one half of the cycle. This rule is expressed by
! 	 * the assertion that the xid at the head follows the xid at the tail.
! 	 * TransactionId wrap-around is specifically prevented on the
! 	 * primary, so this can only fail if there is a flaw in the way
! 	 * KnownAssignedXids are managed on the standby.
! 	 */
! 	if (head > tail && TransactionIdFollowsOrEquals(KnownAssignedXids[tail], to_xid))
! 	{
! 		KnownAssignedXidsDisplay(LOG);
! 		elog(ERROR, "tail follows head in KnownAssignedXids");
! 	}
! 
! 	/*
! 	 * If our xids won't fit easily then grab exclusive lock and compress
! 	 */
! 	if (head + nxids > pArray->maxKnownAssignedXids)
! 	{
! 		if (!exclusive_lock)
  		{
  			LWLockRelease(ProcArrayLock);
! 			LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
  		}
+ 
+ 		KnownAssignedXidsCompress(true);
+ 		head = pArray->headKnownAssignedXids;
+ 	}
+ 
+ 	/*
+ 	 * If we still won't fit then we're out of memory
+ 	 */
+ 	if (head + nxids > pArray->maxKnownAssignedXids)
+ 		elog(ERROR, "too many KnownAssignedXids");
+ 
+ 	next_xid = from_xid;
+ 	for (i = 0; i < nxids; i++)
+ 	{
+ 		/* Insert the xid at head, since it already
+ 		 * points at the next insertion point
+ 		 */
+ 		KnownAssignedXids[head] = next_xid;
+ 		KnownAssignedXidsValid[head] = true;
+ 
+ 		/* increment next_xid */
+ 		TransactionIdAdvance(next_xid);
+ 		head++;
+ 	}
+ 
+ 	/*
+ 	 * Update shared values
+ 	 */
+ 	{
+ 		/* spinlock is essential on machines with weak memory ordering! */
+ 		SpinLockAcquire(&pArray->known_assigned_xids_lck);
+ 		pArray->numKnownAssignedXids += nxids;
+ 		pArray->headKnownAssignedXids = head;
+ 		SpinLockRelease(&pArray->known_assigned_xids_lck);
  	}
  }
  
  /*
!  * KnownAssignedXidsSearch
!  *
!  * Searches KnownAssignedXids for a specific xid and optionally removes it.
!  *
!  * Use binary search to locate value, then test validity. Callers expect the
!  * value to be present so we don't bother to test against head and tail.
   *
!  * Must be called while holding ProcArrayLock in shared or exclusive mode.
!  * Exclusive lock must be held for remove = true. If we remove the tail we
!  * reset the tail value to the next valid value, which may result in an
!  * empty array.
   */
  static bool
+ KnownAssignedXidsSearch(TransactionId xid, bool remove)
+ {
+ 	/* use volatile pointer to prevent code rearrangement */
+ 	volatile ProcArrayStruct *pArray = procArray;
+ 
+ 	int	first, last;
+ 	int	head = 0;
+ 	int tail = 0;
+ 	int	result_index = RESULT_NOT_FOUND;
+ 	int 			mid_index;
+ 	TransactionId	mid_xid;
+ 
+ 	if (remove)
+ 	{
+ 		tail = first = pArray->tailKnownAssignedXids;
+ 		head = pArray->headKnownAssignedXids;
+ 		last = head - 1;
+ 	}
+ 	else
+ 	{
+ 		/* spinlock is essential on machines with weak memory ordering! */
+ 		SpinLockAcquire(&pArray->known_assigned_xids_lck);
+ 		first = pArray->tailKnownAssignedXids;
+ 		last = pArray->headKnownAssignedXids - 1;
+ 		SpinLockRelease(&pArray->known_assigned_xids_lck);
+ 	}
+ 
+ 	while (first <= last)
+ 	{
+ 		mid_index = first + (last - first) / 2;
+ 		mid_xid = KnownAssignedXids[mid_index];
+ 
+ 		if (xid == mid_xid)
+ 		{
+ 			result_index = mid_index;
+ 			break;
+ 		}
+ 		else if (TransactionIdPrecedes(xid, mid_xid))
+ 			last = mid_index - 1;
+ 		else
+ 			first = mid_index + 1;
+ 	}
+ 
+ 	if (result_index == RESULT_NOT_FOUND)
+ 		return false;
+ 	else
+ 	{
+ 		if (KnownAssignedXidsValid[result_index])
+ 		{
+ 			if (remove)
+ 			{
+ 				KnownAssignedXidsValid[result_index] = false;
+ 
+ 				/*
+ 				 * If we're removing the tail element then sweep
+ 				 * up from the tail to the next valid element, if any
+ 				 */
+ 				if (result_index == tail)
+ 				{
+ 					while (!KnownAssignedXidsValid[tail])
+ 					{
+ 						tail++;
+ 						if (tail >= head)
+ 						{
+ 							pArray->headKnownAssignedXids = 0;
+ 							pArray->tailKnownAssignedXids = 0;
+ 							return true;
+ 
+ 						}
+ 					}
+ 					pArray->tailKnownAssignedXids = tail;
+ 				}
+ 			}
+ 
+ 			return true;
+ 		}
+ 		else
+ 			return false;
+ 	}
+ }
+ 
+ static bool
  KnownAssignedXidsExist(TransactionId xid)
  {
! 	Assert(TransactionIdIsValid(xid));
  
! 	return KnownAssignedXidsSearch(xid, false);
  }
  
  /*
***************
*** 2384,2414 **** KnownAssignedXidsExist(TransactionId xid)
  static void
  KnownAssignedXidsRemove(TransactionId xid)
  {
! 	bool		found;
  
  	Assert(TransactionIdIsValid(xid));
  
  	elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
  
! 	(void) hash_search(KnownAssignedXidsHash, &xid, HASH_REMOVE, &found);
  
! 	if (found)
! 		procArray->numKnownAssignedXids--;
! 	Assert(procArray->numKnownAssignedXids >= 0);
  
  	/*
  	 * We can fail to find an xid if the xid came from a subtransaction that
  	 * aborts, though the xid hadn't yet been reported and no WAL records have
  	 * been written using the subxid. In that case the abort record will
  	 * contain that subxid and we haven't seen it before.
- 	 *
- 	 * If we fail to find it for other reasons it might be a problem, but it
- 	 * isn't much use to log that it happened, since we can't divine much from
- 	 * just an isolated xid value.
  	 */
  }
  
  /*
   * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
   * We filter out anything higher than xmax.
   *
--- 2637,2707 ----
  static void
  KnownAssignedXidsRemove(TransactionId xid)
  {
! 	/* use volatile pointer to prevent code rearrangement */
! 	volatile ProcArrayStruct *pArray = procArray;
  
  	Assert(TransactionIdIsValid(xid));
  
  	elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
  
! 	if (KnownAssignedXidsSearch(xid, true))
! 		pArray->numKnownAssignedXids--;
! 	else
! 	{
! 		int 	i;
! 		bool	found = false;
! 
! #ifdef USE_ASSERT_CHECKING
! 		for (i = 0; i < pArray->maxKnownAssignedXids; i++)
! 		{
! 			if (KnownAssignedXids[i] == xid && KnownAssignedXidsValid[i])
! 			{
! 				KnownAssignedXidsDisplay(LOG);
! 				elog(ERROR, "found error in search for xid=%u", xid);
! 				found = true;
! 				break;
! 			}
! 		}
! #endif
! 		if (!found && XLogConsistentState())
! 				elog(ERROR, "remove could not locate xid=%u", xid);
! 	}
  
! 	if (pArray->numKnownAssignedXids < 0)
! 	{
! 		KnownAssignedXidsDisplay(LOG);
! 		Assert(pArray->numKnownAssignedXids >= 0);
! 	}
  
  	/*
  	 * We can fail to find an xid if the xid came from a subtransaction that
  	 * aborts, though the xid hadn't yet been reported and no WAL records have
  	 * been written using the subxid. In that case the abort record will
  	 * contain that subxid and we haven't seen it before.
  	 */
  }
  
  /*
+  * KnownAssignedXidsRemoveTree
+  *
+  * Must be called while holding ProcArrayLock in exclusive mode.
+  */
+ static void
+ KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
+ 									  TransactionId *subxids)
+ {
+ 	int 	i;
+ 
+ 	if (TransactionIdIsValid(xid))
+ 		KnownAssignedXidsRemove(xid);
+ 
+ 	for (i = 0; i < nsubxids; i++)
+ 		KnownAssignedXidsRemove(subxids[i]);
+ 
+ 	KnownAssignedXidsCompress(false);
+ }
+ 
+ /*
   * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
   * We filter out anything higher than xmax.
   *
***************
*** 2426,2457 **** KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
   * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus we reduce *xmin
   * to the lowest xid value seen if not already lower.
   *
!  * Must be called while holding ProcArrayLock (in shared mode)
   */
  static int
  KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
  							   TransactionId xmax)
  {
! 	HASH_SEQ_STATUS status;
! 	TransactionId *knownXid;
  	int			count = 0;
  
! 	hash_seq_init(&status, KnownAssignedXidsHash);
! 	while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
  	{
! 		/*
! 		 * Filter out anything higher than xmax
! 		 */
! 		if (TransactionIdPrecedes(xmax, *knownXid))
! 			continue;
  
! 		*xarray = *knownXid;
! 		xarray++;
! 		count++;
  
! 		/* update xmin if required */
! 		if (TransactionIdPrecedes(*knownXid, *xmin))
! 			*xmin = *knownXid;
  	}
  
  	return count;
--- 2719,2779 ----
   * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus we reduce *xmin
   * to the lowest xid value seen if not already lower.
   *
!  * Must be called while holding ProcArrayLock, in shared mode or higher.
   */
  static int
  KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
  							   TransactionId xmax)
  {
! 	/* use volatile pointer to prevent code rearrangement */
! 	volatile ProcArrayStruct *pArray = procArray;
! 
! 	TransactionId knownXid;
  	int			count = 0;
+ 	int			head, tail;
+ 	int			i;
+ 
+ 	Assert(TransactionIdIsValid(xmax));
  
! 	/*
! 	 * Fetch head just once, since it may change while we loop.
! 	 * We need only go as far as head at the start of the loop, since
! 	 * we are certain that an xid cannot enter and then leave the
! 	 * array without us knowing, so if we skip a few higher xids they
! 	 * will look like they were running anyway.
! 	 */
  	{
! 		/* spinlock is essential on machines with weak memory ordering! */
! 		SpinLockAcquire(&pArray->known_assigned_xids_lck);
! 		head = pArray->headKnownAssignedXids;
! 		tail = pArray->tailKnownAssignedXids;
! 		SpinLockRelease(&pArray->known_assigned_xids_lck);
! 	}
  
! 	for (i = tail; i < head; i++)
! 	{
! 		/* Skip any gaps in the array */
! 		if (KnownAssignedXidsValid[i])
! 		{
! 			knownXid = KnownAssignedXids[i];
! 
! 			/* Update xmin if required */
! 			if (count == 0 &&
! 				TransactionIdPrecedes(knownXid, *xmin))
! 				*xmin = knownXid;
  
! 			/*
! 			 * Filter out anything higher than xmax, relying on
! 			 * sorted property of array.
! 			 */
! 			if (TransactionIdPrecedes(xmax, knownXid))
! 				break;
! 
! 			/* Add knownXid onto output array */
! 			*xarray = knownXid;
! 			xarray++;
! 			count++;
! 		}
  	}
  
  	return count;
***************
*** 2464,2499 **** KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
   * Must be called while holding ProcArrayLock in Exclusive mode.
   */
  static void
! KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts)
  {
! 	TransactionId *knownXid;
! 	HASH_SEQ_STATUS status;
  
! 	if (TransactionIdIsValid(xid))
! 		elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", xid);
! 	else
! 		elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
  
! 	hash_seq_init(&status, KnownAssignedXidsHash);
! 	while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
  	{
! 		TransactionId removeXid = *knownXid;
! 		bool		found;
  
! 		if (!TransactionIdIsValid(xid) || TransactionIdPrecedes(removeXid, xid))
  		{
! 			if (keepPreparedXacts && StandbyTransactionIdIsPrepared(removeXid))
! 				continue;
! 			else
  			{
! 				(void) hash_search(KnownAssignedXidsHash, &removeXid,
! 								   HASH_REMOVE, &found);
! 				if (found)
! 					procArray->numKnownAssignedXids--;
! 				Assert(procArray->numKnownAssignedXids >= 0);
  			}
  		}
  	}
  }
  
  /*
--- 2786,2835 ----
   * Must be called while holding ProcArrayLock in Exclusive mode.
   */
  static void
! KnownAssignedXidsRemoveMany(TransactionId removeXid)
  {
! 	/* use volatile pointer to prevent code rearrangement */
! 	volatile ProcArrayStruct *pArray = procArray;
  
! 	TransactionId	knownXid;
! 	int				count = 0;
! 	int				head, tail, i;
  
! 	if (!TransactionIdIsValid(removeXid))	/* invalid xid: discard everything */
  	{
! 		elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
! 		pArray->numKnownAssignedXids = 0;
! 		pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;	/* head == tail: nothing left to scan */
! 		return;
! 	}
! 
! 	Assert(TransactionIdIsValid(removeXid));	/* always true here: the invalid case returned above */
! 	elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
  
! 	tail = pArray->tailKnownAssignedXids;
! 	head = pArray->headKnownAssignedXids;
! 	/* Scan the slots in [tail, head), skipping those already marked invalid */
! 	for (i = tail; i < head; i++)
! 	{
! 		if (KnownAssignedXidsValid[i])
  		{
! 			knownXid = KnownAssignedXids[i];
! 			/* Stop at the first xid that does not precede removeXid */
! 			if (TransactionIdFollowsOrEquals(knownXid, removeXid))
! 				break;
! 			/* Keep the entry when StandbyTransactionIdIsPrepared() reports true */
! 			if (!StandbyTransactionIdIsPrepared(knownXid))
  			{
! 				KnownAssignedXidsValid[i] = false;	/* mark the slot invalid in place */
! 				count++;
  			}
  		}
  	}
+ 
+ 	pArray->numKnownAssignedXids -= count;	/* count = slots invalidated by the loop above */
+ 	Assert(pArray->numKnownAssignedXids >= 0);
+ 
+ 	KnownAssignedXidsCompress(false);
  }
  
  /*
***************
*** 2504,2531 **** KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts)
  static void
  KnownAssignedXidsDisplay(int trace_level)
  {
! 	HASH_SEQ_STATUS status;
! 	TransactionId *knownXid;
! 	StringInfoData buf;
! 	TransactionId *xids;
! 	int			nxids;
! 	int			i;
  
! 	xids = palloc(sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS);
! 	nxids = 0;
  
! 	hash_seq_init(&status, KnownAssignedXidsHash);
! 	while ((knownXid = (TransactionId *) hash_seq_search(&status)) != NULL)
! 		xids[nxids++] = *knownXid;
! 
! 	qsort(xids, nxids, sizeof(TransactionId), xidComparator);
  
  	initStringInfo(&buf);
  
! 	for (i = 0; i < nxids; i++)
! 		appendStringInfo(&buf, "%u ", xids[i]);
  
! 	elog(trace_level, "%d KnownAssignedXids %s", nxids, buf.data);
  
  	pfree(buf.data);
  }
--- 2840,2872 ----
  static void
  KnownAssignedXidsDisplay(int trace_level)
  {
! 	/* use volatile pointer to prevent code rearrangement */
! 	volatile ProcArrayStruct *pArray = procArray;
  
! 	StringInfoData	buf;
! 	int				head, tail, i;
! 	int				nxids = 0;
  
! 	tail = pArray->tailKnownAssignedXids;	/* first slot to examine */
! 	head = pArray->headKnownAssignedXids;	/* one past the last slot to examine */
  
  	initStringInfo(&buf);
  
! 	for (i = tail; i < head; i++)
! 	{
! 		if (KnownAssignedXidsValid[i])	/* skip slots marked invalid */
! 		{
! 			nxids++;
! 			appendStringInfo(&buf, "[%d]=%u ", i, KnownAssignedXids[i]);	/* i is an int, hence %d */
! 		}
! 	}
  
! 	elog(trace_level, "%d KnownAssignedXids (num=%d tail=%d head=%d) %s",
! 		nxids,	/* valid entries counted by the loop above */
! 		pArray->numKnownAssignedXids,	/* NOTE(review): %d assumes these counters are int, like the locals they are copied into - confirm */
! 		pArray->tailKnownAssignedXids,
! 		pArray->headKnownAssignedXids,
! 		buf.data);
  
  	pfree(buf.data);
  }
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 278,283 **** extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);
--- 278,284 ----
  
  extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg);
  
+ extern bool XLogConsistentState(void);
  extern bool RecoveryInProgress(void);
  extern bool XLogInsertAllowed(void);
  extern TimestampTz GetLatestXLogTime(void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to