On Fri, 2010-04-16 at 11:29 +0300, Heikki Linnakangas wrote:
> Simon Riggs wrote:
> > On Tue, 2010-04-13 at 21:09 +0300, Heikki Linnakangas wrote:
> >> A quick fix would be to check if there's any entries in the hash table
> >> before scanning it. That would eliminate the overhead when there's no
> >> in-progress transactions in the master. But as soon as there's even one,
> >> the overhead comes back.
> >
> > Any fix should be fairly quick because of the way its modularised - with
> > something like this in mind.
> >
> > I'll try a circular buffer implementation, with fastpath.
>
> I started experimenting with a sorted array based implementation on
> Tuesday but got carried away with other stuff. I now got back to that
> and cleaned it up.
>
> How does the attached patch look like? It's probably similar to what you
> had in mind.
It looks like a second version of what I'm working on and about to
publish. I'll take that as a compliment!
My patch is attached here also, for discussion.
The two patches look same in their main parts, though I have quite a few
extra tweaks in there, which you can read about in comments. One tweak I
don't have is the use of the presence array that allows a sensible
bsearch, so I'll to alter my patch to use that idea but keep the rest of
my code.
--
Simon Riggs www.2ndQuadrant.com
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 3cc9bdd..d432c9d 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1200,6 +1200,9 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
Assert(TransactionIdIsValid(xid));
+ if (max_prepared_xacts <= 0)
+ return false; /* nothing to do */
+
/* Read and validate file */
buf = ReadTwoPhaseFile(xid, false);
if (buf == NULL)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3adbee2..9245727 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6453,6 +6453,12 @@ CheckRecoveryConsistency(void)
}
}
+bool
+XLogConsistentState(void)
+{
+ return reachedMinRecoveryPoint;
+}
+
/*
* Is the system still in recovery?
*
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 88169b4..050c547 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -68,6 +68,10 @@ typedef struct ProcArrayStruct
* xids */
int maxKnownAssignedXids; /* allocated size of known assigned
* xids */
+ int tailKnownAssignedXids; /* current tail of known assigned
+ * xids */
+ int headKnownAssignedXids; /* current head of known assigned
+ * xids */
/*
* Highest subxid that overflowed KnownAssignedXids array. Similar to
@@ -87,7 +91,7 @@ static ProcArrayStruct *procArray;
/*
* Bookkeeping for tracking emulated transactions in recovery
*/
-static HTAB *KnownAssignedXidsHash;
+static TransactionId *KnownAssignedXids;
static TransactionId latestObservedXid = InvalidTransactionId;
/*
@@ -142,9 +146,12 @@ static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
TransactionId xmax);
static bool KnownAssignedXidsExist(TransactionId xid);
-static void KnownAssignedXidsAdd(TransactionId *xids, int nxids);
-static void KnownAssignedXidsRemove(TransactionId xid);
-static void KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts);
+static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid, bool have_lock);
+static void KnownAssignedXidsRemove(TransactionId xid, int *index);
+static void KnownAssignedXidsRemoveMany(TransactionId xid);
+static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
+ TransactionId *subxids);
+static void KnownAssignedXidsCompress(bool full);
static void KnownAssignedXidsDisplay(int trace_level);
/*
@@ -204,10 +211,13 @@ CreateSharedProcArray(void)
procArray->numKnownAssignedXids = 0;
procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
procArray->lastOverflowedXid = InvalidTransactionId;
+ procArray->tailKnownAssignedXids = 0;
+ procArray->headKnownAssignedXids = 0;
}
if (XLogRequestRecoveryConnections)
{
+#ifdef KNOWNASSIGNED_USE_HASH
/* Create or attach to the KnownAssignedXids hash table */
HASHCTL info;
@@ -223,6 +233,12 @@ CreateSharedProcArray(void)
HASH_ELEM | HASH_FUNCTION);
if (!KnownAssignedXidsHash)
elog(FATAL, "could not initialize known assigned xids hash table");
+#else
+ KnownAssignedXids = (TransactionId *)
+ ShmemInitStruct("KnownAssignedXids",
+ mul_size(sizeof(TransactionId), TOTAL_MAX_CACHED_SUBXIDS),
+ &found);
+#endif
}
}
@@ -417,6 +433,8 @@ void
ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
{
snapshotOldestActiveXid = oldestActiveXid;
+
+// KnownAssignedXidsTest();
}
/*
@@ -544,10 +562,10 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
continue;
- KnownAssignedXidsAdd(&xid, 1);
+ KnownAssignedXidsAdd(xid, xid, true);
}
- KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
+ KnownAssignedXidsDisplay(LOG);
/*
* Update lastOverflowedXid if the snapshot had overflown. We don't know
@@ -617,8 +635,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
/*
* Remove from known-assigned-xacts.
*/
- for (i = 0; i < nsubxids; i++)
- KnownAssignedXidsRemove(subxids[i]);
+ KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
/*
* Advance lastOverflowedXid when required.
@@ -2230,45 +2247,35 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
*/
if (TransactionIdFollows(xid, latestObservedXid))
{
- TransactionId next_expected_xid = latestObservedXid;
-
- TransactionIdAdvance(next_expected_xid);
+ TransactionId next_expected_xid;
/*
- * Locking requirement is currently higher than for xid assignment in
- * normal running. However, we only get called here for new high xids
- * - so on a multi-processor where it is common that xids arrive out
- * of order the average number of locks per assignment will actually
- * reduce. So not too worried about this locking.
- *
- * XXX It does seem possible that we could add a whole range of
- * numbers atomically to KnownAssignedXids, if we use a sorted list
- * for KnownAssignedXids. But that design also increases the length of
- * time we hold lock when we process commits/aborts, so on balance
- * don't worry about this.
+ * Extend clog and subtrans like we do in GetNewTransactionId()
+ * during normal operation using individual extend steps.
+ * Typical case requires almost no activity.
*/
- LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-
+ next_expected_xid = latestObservedXid;
+ TransactionIdAdvance(next_expected_xid);
while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
{
- if (TransactionIdPrecedes(next_expected_xid, xid))
- elog(trace_recovery(DEBUG4),
- "recording unobserved xid %u (latestObservedXid %u)",
- next_expected_xid, latestObservedXid);
- KnownAssignedXidsAdd(&next_expected_xid, 1);
-
- /*
- * Extend clog and subtrans like we do in GetNewTransactionId()
- * during normal operation
- */
ExtendCLOG(next_expected_xid);
ExtendSUBTRANS(next_expected_xid);
TransactionIdAdvance(next_expected_xid);
}
- LWLockRelease(ProcArrayLock);
+ /*
+ * Add the new xids onto the KnownAssignedXids array. Note that
+ * we don't need ProcArrayLock to do this, mimicing the locking
+ * behaviour when assigning an xid in normal running.
+ */
+ next_expected_xid = latestObservedXid;
+ TransactionIdAdvance(next_expected_xid);
+ KnownAssignedXidsAdd(next_expected_xid, xid, false);
+ /*
+ * Now we can advance latestObservedXid
+ */
latestObservedXid = xid;
}
@@ -2285,7 +2292,6 @@ void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
TransactionId *subxids)
{
- int i;
TransactionId max_xid;
if (standbyState == STANDBY_DISABLED)
@@ -2298,10 +2304,8 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
- if (TransactionIdIsValid(xid))
- KnownAssignedXidsRemove(xid);
- for (i = 0; i < nsubxids; i++)
- KnownAssignedXidsRemove(subxids[i]);
+ KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
+
/* Like in ProcArrayRemove, advance latestCompletedXid */
if (TransactionIdFollowsOrEquals(max_xid,
@@ -2315,7 +2319,7 @@ void
ExpireAllKnownAssignedTransactionIds(void)
{
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
- KnownAssignedXidsRemoveMany(InvalidTransactionId, false);
+ KnownAssignedXidsRemoveMany(InvalidTransactionId);
LWLockRelease(ProcArrayLock);
}
@@ -2323,7 +2327,7 @@ void
ExpireOldKnownAssignedTransactionIds(TransactionId xid)
{
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
- KnownAssignedXidsRemoveMany(xid, true);
+ KnownAssignedXidsRemoveMany(xid);
LWLockRelease(ProcArrayLock);
}
@@ -2349,7 +2353,7 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid)
* having the standby process changes quickly so that it can provide
* high availability. So we choose to implement as a hash table.
*/
-
+#ifdef KNOWNASSIGNED_USE_HASH
/*
* Add xids into KnownAssignedXids.
*
@@ -2563,3 +2567,772 @@ KnownAssignedXidsDisplay(int trace_level)
pfree(buf.data);
}
+#else
+/*
+ * Circular buffer implementation =========================================
+ *
+ * We use a circular virtual array to store xids. Xids are always added in
+ * sequence on the right-hand side (head) of the array allowing us to
+ * maintain the list in sorted sequence from tail to head. When we add xids
+ * we move the head of the array atomically, so we don't need ProcArrayLock.
+ *
+ * Maintaing a contiguous array is costly, so we allow gaps to form in the
+ * sequence so we can avoid compressing the contents after each removal.
+ *
+ * Removing xids from the array requires ExclusiveLock for the usual
+ * transactional reasons. That allows us to left trim the list while only
+ * holding ProcArrayLock in Shared mode by atomically updating the tail.
+ * That can be useful since older xids are naturally removed from the tail
+ * and in a steady state we seldom need to explicitly compress the array.
+ * With carefully chosen compression we can also say that xids never move
+ * right and that compression always maintains the sorted order.
+ *
+ * If we have a maximum of M slots, with N xids currently spread across
+ * S elements then we have N <= S <= M always.
+ *
+ * * Adding 1 xid is O(1) and is lock free
+ * * Taking snapshots is O(S) and is similar to normal running
+ * * Pruning is mostly O(1)
+ * * Locating an xid is essentially same as Removing
+ * * Removing 1 xid is best case O(1) though can be O(S) in worst case.
+ * Worst case behaviour depends upon how empty we let the array become
+ * so compressing when S >> N and/or S is large reduces worst case.
+ * May need a tunable parameter to control this behaviour.
+ *
+ * In comparison, using a hash table for KnownAssignedXids would mean
+ * * Adding would be slightly slower and would require locks
+ * * Removing would be slightly slower than best case but more consistent
+ * * Taking snapshots would be O(M) (i.e. >> O(S))
+ * * Pruning is O(M)
+ *
+ * Removing a large tree of X values a naive approach would give O(N^2)
+ * behaviour. To avoid those costs we retain index of last searched location,
+ * so we can begin next scan from last location. That is not possible using
+ * hash table.
+ *
+ * Explicit compression becomes a more likely requirement if the current
+ * contents of the array grows in size.
+ * Compression requires ProcArrayLock in Exclusive mode.
+ *
+ * 0 <= head < max
+ * 0 <= tail < max
+ *
+ * procArray->tailKnownAssignedXids points to current tail of virtual array,
+ * which should be a valid xid if array contains at least one valid xid
+ *
+ * procArray->headKnownAssignedXids points to next insertion point of
+ * virtual array, so is always invalid
+ *
+ * if head == tail && N = 0 then array is empty, otherwise fully spread out
+ */
+
+#define KnownAssignedXidsIncrementIndex(index) \
+{ \
+ index++; \
+ if (index >= procArray->maxKnownAssignedXids) \
+ index = 0; \
+}
+
+#define KnownAssignedXidsGetNumElements(nelements, head, tail) \
+{ \
+ if (head == tail) \
+ { \
+ if (procArray->numKnownAssignedXids == 0) \
+ nelements = 0; \
+ else \
+ nelements = procArray->maxKnownAssignedXids; \
+ } \
+ else if (head > tail) \
+ nelements = head - tail + 1; \
+ else \
+ nelements = procArray->maxKnownAssignedXids - (tail - head) + 1; \
+}
+
+/*
+ * Compress KnownAssignedXids by removing any gaps in virtual array.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode.
+ */
+static void
+KnownAssignedXidsCompress(bool full)
+{
+ int head = procArray->headKnownAssignedXids;
+ int tail = procArray->tailKnownAssignedXids;
+ int current_index;
+ int compress_index;
+ int nelements;
+
+ if (procArray->numKnownAssignedXids == 0)
+ {
+ Assert(head = tail);
+ return;
+ }
+
+ KnownAssignedXidsGetNumElements(nelements, head, tail);
+
+ if (!full)
+ {
+ /*
+ * If we can choose how much to compress, use a heuristic to
+ * avoid compressing too often or not often enough.
+ *
+ * Heuristic is if we have a large enough current spread and
+ * less than 50% of the elements are currently in use, then
+ * compress. This should ensure we compress fairly infrequently.
+ * We could compress less often though the virtual array would
+ * spread out more and snapshots would become more expensive.
+ */
+ if (nelements < 4 * MaxConnections ||
+ nelements < 2 * procArray->numKnownAssignedXids)
+ return;
+ }
+
+ elog(LOG, "Compressing KnownAssignedXids... (num=%u tail=%u head=%u nelements=%u) %s",
+ procArray->numKnownAssignedXids,
+ procArray->tailKnownAssignedXids,
+ procArray->headKnownAssignedXids,
+ nelements,
+ (full ? "full" : ""));
+
+ /*
+ * Left trim the virtual array up to the leftmost valid xid.
+ */
+ current_index = tail;
+ do
+ {
+ /* Skip any gaps in the array */
+ if (TransactionIdIsValid(KnownAssignedXids[current_index]))
+ break;
+
+ KnownAssignedXidsIncrementIndex(current_index);
+
+ } while (current_index != head);
+
+ if (current_index == head)
+ procArray->headKnownAssignedXids = procArray->tailKnownAssignedXids;
+ else
+ {
+ tail = current_index;
+
+ /*
+ * We compress the virtual array by reading the array from tail
+ * to head, filling gaps as we proceed. Rely on implicit cache line
+ * buffering rather than fiddling with memcpy(). This method allows
+ * us to optimise search by realising that xids never move right
+ * in the virtual array when being compressed.
+ */
+ compress_index = tail;
+ do
+ {
+ if (TransactionIdIsValid(KnownAssignedXids[current_index]))
+ {
+ if (current_index != compress_index)
+ KnownAssignedXids[compress_index] = KnownAssignedXids[current_index];
+
+ KnownAssignedXidsIncrementIndex(compress_index);
+ KnownAssignedXidsIncrementIndex(current_index);
+ }
+ else
+ KnownAssignedXidsIncrementIndex(current_index);
+
+ } while (current_index != head);
+
+ procArray->headKnownAssignedXids = compress_index;
+ }
+}
+
+/*
+ * Add xids into KnownAssignedXids.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode
+ */
+static void
+KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid, bool have_lock)
+{
+ TransactionId next_xid;
+ int head = procArray->headKnownAssignedXids;
+ int tail = procArray->tailKnownAssignedXids;
+ int nxids = 1;
+ int i;
+
+ Assert(head >= 0 && head < procArray->maxKnownAssignedXids);
+ Assert(tail >= 0 && tail < procArray->maxKnownAssignedXids);
+ Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));
+
+ /*
+ * Calculate how many slots we need so we can test for overflow.
+ */
+ if (to_xid >= from_xid)
+ nxids = to_xid - from_xid + 1;
+ else
+ {
+ next_xid = from_xid;
+ while (TransactionIdPrecedes(next_xid, to_xid))
+ {
+ nxids++;
+ TransactionIdAdvance(next_xid);
+ }
+ }
+ if (procArray->numKnownAssignedXids + nxids > procArray->maxKnownAssignedXids)
+ elog(ERROR, "too many KnownAssignedXids");
+
+ next_xid = from_xid;
+ for (i = 0; i < nxids; i++)
+ {
+ /*
+ * If we've run out of space in the array then we
+ * need to perform an emergency compression. Can
+ * only test for this once we've advanced head.
+ */
+ if (head == tail && procArray->numKnownAssignedXids > 0)
+ {
+ if (!have_lock)
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ KnownAssignedXidsCompress(true);
+ head = procArray->headKnownAssignedXids;
+ tail = procArray->tailKnownAssignedXids;
+ if (head++ >= procArray->maxKnownAssignedXids)
+ head = 0;
+ if (!have_lock)
+ LWLockRelease(ProcArrayLock);
+ }
+
+ /* insert the xid */
+ KnownAssignedXids[head] = next_xid;
+ procArray->numKnownAssignedXids++;
+
+ /* increment next_xid */
+ TransactionIdAdvance(next_xid);
+
+ /* move head to next insertion point */
+ KnownAssignedXidsIncrementIndex(head);
+ }
+
+ /*
+ * Set the new head of the ProcArray last, so it is atomic
+ */
+ procArray->headKnownAssignedXids = head;
+}
+
+/*
+ * KnownAssignedXidsSearch
+ *
+ * Searches KnownAssignedXids for a specific xid and optionally removes it.
+ *
+ * Must be called while holding ProcArrayLock in shared or exclusive mode.
+ * Exclusive lock must be held for remove = true
+ *
+ * Tail can be moved as far forward as first valid xid in shared mode.
+ * While holding shared lock we cannot remove xids, so no other caller
+ * would move tail to a different place than we do. We don't try to trim
+ * the element we are removing.
+ *
+ * Search algorithm is specifically designed to meet the needs of most
+ * frequent search patterns. Xids tend to be removed from lower end of
+ * virtual array, so we begin with a linear search from tail until we find
+ * the first valid xid. If xid matches, or array is empty we exit quickly.
+ * Next, we attempt to predict the array offset that should contain
+ * the xid, if it is present, knowing that the array is laid out one xid
+ * per array element. If this works we access the xid directly and
+ * we're done. If not, we search between left and right index.
+ */
+#define RESULT_NOT_FOUND -1
+static bool
+KnownAssignedXidsSearch(TransactionId xid, bool remove, int *index)
+{
+ TransactionId knownXid;
+ int head, i, tail, newtail;
+ int result_index = RESULT_NOT_FOUND; /* set >= 0 if found */
+
+ if (procArray->numKnownAssignedXids == 0)
+ return false;
+
+ /*
+ * Fetch the tail just once, since it may change while we loop,
+ * when remove is false.
+ */
+ tail = procArray->tailKnownAssignedXids;
+
+ /*
+ * Avoid O(N^2) behaviour for removal of a tree of xids by restarting
+ * search from last location for each xid in tree.
+ */
+ if (*index > RESULT_NOT_FOUND)
+ i = *index;
+ else
+ i = tail;
+
+ newtail = tail;
+
+ /*
+ * Fetch the head just once, since it may change while we loop.
+ * We need only go as far as head at the start of the loop, since
+ * we are certain that an xid cannot enter and then leave the
+ * array without us knowing, so if we skip a few higher xids they
+ * will look like they were running anyway.
+ */
+ head = procArray->headKnownAssignedXids;
+
+ Assert(head >= 0 && head < procArray->maxKnownAssignedXids);
+ Assert(tail >= 0 && tail < procArray->maxKnownAssignedXids);
+
+ /*
+ * Locate the first valid xid in the array
+ */
+ do
+ {
+ knownXid = KnownAssignedXids[i];
+
+ /* Skip any gaps in the array */
+ if (TransactionIdIsValid(knownXid))
+ break;
+
+ newtail = i;
+
+ KnownAssignedXidsIncrementIndex(i);
+
+ } while (i != head);
+
+ /*
+ * If the array is non-empty, search for the xid
+ */
+ if (i != head)
+ {
+ TransactionId left_xid = knownXid;
+ int left_index = i;
+
+ /*
+ * Check to see whether xid matches, or desired xid
+ * is somewhere to the right.
+ */
+ if (left_xid == xid)
+ result_index = left_index;
+ else if (TransactionIdFollowsOrEquals(xid, left_xid))
+ {
+ int xid_gap;
+ int nelements;
+
+ /*
+ * xid is somewhere to right. We know that xids
+ * are laid out one by one across the array, so
+ * we can predict where the xid should be iff
+ * the array has not been compressed.
+ */
+
+ KnownAssignedXidsGetNumElements(nelements, head, left_index);
+
+ /*
+ * We already saw first element, so if there is more than one
+ * element we can continue searching to the right.
+ */
+ if (nelements > 1)
+ {
+ TransactionId current_xid;
+ int right_index;
+
+ /*
+ * Calculate the difference between the xid in the current
+ * array element and the xid we are searching for. This will
+ * give us the offset to the element that may contain xid.
+ */
+ if (xid > left_xid)
+ xid_gap = (int) xid - left_xid;
+ else
+ xid_gap = (int) (INT_MAX - left_xid) + xid - FirstNormalTransactionId;
+
+ /*
+ * If the xid_gap is too wide then we know the array has been
+ * compressed, so no point in predicting index. Just use
+ * head as our right_index in that case.
+ */
+ if (xid_gap <= nelements)
+ {
+ /*
+ * Access the xid directly, if possible.
+ */
+ right_index = left_index + xid_gap;
+ if (right_index >= procArray->maxKnownAssignedXids)
+ right_index -= procArray->maxKnownAssignedXids;
+ current_xid = KnownAssignedXids[right_index];
+
+ if (current_xid == xid)
+ result_index = right_index;
+ else
+ {
+ if (TransactionIdIsValid(current_xid) &&
+ TransactionIdPrecedes(current_xid, xid))
+ elog(ERROR, "KnownAssignedXids (num=%u tail=%u head=%u) left [%u]=%u right [%u]=%u search %u",
+ procArray->numKnownAssignedXids,
+ procArray->tailKnownAssignedXids,
+ procArray->headKnownAssignedXids,
+ left_index, left_xid,
+ right_index, current_xid,
+ xid);
+ }
+ }
+ else
+ right_index = head;
+
+ /*
+ * If we've not yet found our xid, then either the xid
+ * has been removed already or it lies somewhere
+ * between left_index and right_index in the virtual array.
+ * Latter state can only happen as a result of a compress
+ * action, so search space should be small enough to
+ * make linear search acceptable.
+ *
+ * We could use a binary search to locate the xid though
+ * because of the gaps we cannot use bsearch(). It is also
+ * a quirky algorithm and too complex for practicality.
+ *
+ * We use two properties to improve the algorithm: xids never
+ * move right when compression occurs and the array is sorted
+ * even after compression. We search left (i.e. backwards)
+ * from right_index until we hit left_index, or we find
+ * a valid xid that precedes our xid.
+ */
+ if (result_index == RESULT_NOT_FOUND)
+ {
+ i = right_index;
+ while (i != left_index)
+ {
+ i--;
+ if (i < 0)
+ i = procArray->maxKnownAssignedXids - 1;
+
+ current_xid = KnownAssignedXids[i];
+
+ if (current_xid == xid)
+ {
+ result_index = i;
+ break;
+ }
+ else if (TransactionIdPrecedes(current_xid, xid))
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /*
+ * If we can simply prune the list, then do so.
+ */
+ if (*index == RESULT_NOT_FOUND && newtail != tail)
+ {
+ /*
+ * Move the tail of the buffer to the leftmost valid xid.
+ * This action happens atomically, so no problem with locking.
+ * It is possible that someone else has already done this, but
+ * it's not possible that they moved it to a different place than
+ * we will do because xids can only be removed while holding
+ * ExclusiveLock, which would conflict with us.
+ */
+ procArray->tailKnownAssignedXids = newtail;
+ }
+
+ /*
+ * Remove xid, if required and return result
+ */
+ if (result_index == RESULT_NOT_FOUND)
+ return false;
+ else
+ {
+ *index = result_index;
+ if (remove)
+ KnownAssignedXids[result_index] = InvalidTransactionId;
+ return true;
+ }
+}
+
+static bool
+KnownAssignedXidsExist(TransactionId xid)
+{
+ int index;
+
+ Assert(TransactionIdIsValid(xid));
+
+ return KnownAssignedXidsSearch(xid, false, &index);
+}
+
+/*
+ * Remove one xid from anywhere in KnownAssignedXids.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode
+ */
+static void
+KnownAssignedXidsRemove(TransactionId xid, int *index)
+{
+ Assert(TransactionIdIsValid(xid));
+
+ elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
+
+ if (KnownAssignedXidsSearch(xid, true, index))
+ procArray->numKnownAssignedXids--;
+ else
+ {
+ int i;
+ bool found = false;
+
+ if (XLogConsistentState())
+ {
+ for (i = 0; i < procArray->maxKnownAssignedXids; i++)
+ {
+ if (KnownAssignedXids[i] == xid)
+ {
+ KnownAssignedXids[i] = InvalidTransactionId;
+ elog(LOG, "found error in search for xid=%u", xid);
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ {
+ elog(LOG, "failed to remove xid=%u", xid);
+ KnownAssignedXidsDisplay(LOG);
+ }
+ }
+ }
+
+ Assert(procArray->numKnownAssignedXids >= 0);
+
+ /*
+ * We can fail to find an xid if the xid came from a subtransaction that
+ * aborts, though the xid hadn't yet been reported and no WAL records have
+ * been written using the subxid. In that case the abort record will
+ * contain that subxid and we haven't seen it before.
+ *
+ * If we fail to find it for other reasons it might be a problem, but it
+ * isn't much use to log that it happened, since we can't divine much from
+ * just an isolated xid value.
+ */
+}
+
+/*
+ * KnownAssignedXidsRemoveTree
+ *
+ * Must be called while holding ProcArrayLock in exclusive mode.
+ */
+static void
+KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
+ TransactionId *subxids)
+{
+ int i;
+ int index = RESULT_NOT_FOUND;
+
+ if (TransactionIdIsValid(xid))
+ KnownAssignedXidsRemove(xid, &index);
+
+ for (i = 0; i < nsubxids; i++)
+ KnownAssignedXidsRemove(subxids[i], &index);
+
+ KnownAssignedXidsCompress(false);
+}
+
+/*
+ * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
+ * We filter out anything higher than xmax.
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+static int
+KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
+{
+ TransactionId xtmp = InvalidTransactionId;
+
+ return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
+}
+
+/*
+ * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus we reduce *xmin
+ * to the lowest xid value seen if not already lower.
+ *
+ * Must be called while holding ProcArrayLock, in shared mode or higher.
+ */
+static int
+KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
+ TransactionId xmax)
+{
+ TransactionId knownXid;
+ int count = 0;
+ bool can_move_tail = true;
+ int head, i, tail, newtail;
+
+ Assert(TransactionIdIsValid(xmax));
+
+ if (procArray->numKnownAssignedXids == 0)
+ return 0;
+
+ /*
+ * Fetch the tail just once, since it may change while we loop.
+ */
+ tail = procArray->tailKnownAssignedXids;
+ i = tail;
+ newtail = tail;
+
+ /*
+ * Fetch the head just once, since it may change while we loop.
+ * We need only go as far as head at the start of the loop, since
+ * we are certain that an xid cannot enter and then leave the
+ * array without us knowing, so if we skip a few higher xids they
+ * will look like they were running anyway.
+ */
+ head = procArray->headKnownAssignedXids;
+
+ Assert(head >= 0 && head < procArray->maxKnownAssignedXids);
+ Assert(tail >= 0 && tail < procArray->maxKnownAssignedXids);
+
+ do
+ {
+ knownXid = KnownAssignedXids[i];
+
+ /* Skip any gaps in the array */
+ if (TransactionIdIsValid(knownXid))
+ {
+ /* Update xmin if required */
+ if (count == 0 &&
+ TransactionIdPrecedes(knownXid, *xmin))
+ *xmin = knownXid;
+
+ /*
+ * Filter out anything higher than xmax, relying on
+ * sorted property of array.
+ */
+ if (TransactionIdPrecedes(xmax, knownXid))
+ break;
+
+ /* Add knownXid onto output array */
+ *xarray = knownXid;
+ xarray++;
+ count++;
+ can_move_tail = false;
+ }
+ else if (can_move_tail)
+ newtail = i;
+
+ KnownAssignedXidsIncrementIndex(i);
+
+ } while (i != head);
+
+ /*
+ * If we can simply prune the list, then do so.
+ */
+ if (newtail != tail)
+ {
+ /*
+ * Move the tail of the buffer to the leftmost valid xid.
+ * This action happens atomically, so no problem with locking.
+ * It is possible that someone else has already done this, but
+ * it's not possible that they moved it to a different place than
+ * we will do because xids can only be removed while holding
+ * ExclusiveLock, which would conflict with us.
+ */
+ procArray->tailKnownAssignedXids = newtail;
+ }
+
+ return count;
+}
+
+/*
+ * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
+ * then clear the whole table.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode.
+ */
+static void
+KnownAssignedXidsRemoveMany(TransactionId removeXid)
+{
+ TransactionId knownXid;
+ int count = 0;
+ int head, i;
+
+ if (procArray->numKnownAssignedXids == 0)
+ return;
+
+ if (!TransactionIdIsValid(removeXid))
+ {
+ elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
+ procArray->numKnownAssignedXids = 0;
+ procArray->headKnownAssignedXids = procArray->tailKnownAssignedXids;
+ KnownAssignedXids[procArray->tailKnownAssignedXids] = InvalidTransactionId;
+ return;
+ }
+
+ Assert(TransactionIdIsValid(removeXid));
+ elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
+
+ i = procArray->tailKnownAssignedXids;
+ head = procArray->headKnownAssignedXids;
+
+ Assert(head >= 0 && head < procArray->maxKnownAssignedXids);
+
+ do
+ {
+ knownXid = KnownAssignedXids[i];
+
+ if (TransactionIdFollowsOrEquals(knownXid, removeXid))
+ break;
+
+ if (TransactionIdIsValid(knownXid) &&
+ !StandbyTransactionIdIsPrepared(knownXid))
+ {
+ KnownAssignedXids[i] = InvalidTransactionId;
+ count++;
+ }
+
+ KnownAssignedXidsIncrementIndex(i);
+
+ } while (i != head);
+
+ procArray->numKnownAssignedXids -= count;
+ Assert(procArray->numKnownAssignedXids >= 0);
+
+ KnownAssignedXidsCompress(false);
+}
+
+/*
+ * Display KnownAssignedXids to provide debug trail
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+static void
+KnownAssignedXidsDisplay(int trace_level)
+{
+ TransactionId knownXid;
+ StringInfoData buf;
+ int head, i;
+ int nxids = 0;
+
+ i = procArray->tailKnownAssignedXids;
+ head = procArray->headKnownAssignedXids;
+
+ initStringInfo(&buf);
+
+ if (procArray->numKnownAssignedXids > 0)
+ {
+ do
+ {
+ knownXid = KnownAssignedXids[i];
+
+ if (TransactionIdIsValid(knownXid))
+ {
+ nxids++;
+ appendStringInfo(&buf, "[%u]=%u ", i, knownXid);
+ }
+
+ KnownAssignedXidsIncrementIndex(i);
+
+ } while (i != head);
+ }
+
+ elog(trace_level, "%d KnownAssignedXids (num=%u tail=%u head=%u) %s",
+ nxids,
+ procArray->numKnownAssignedXids,
+ procArray->tailKnownAssignedXids,
+ procArray->headKnownAssignedXids,
+ buf.data);
+
+ pfree(buf.data);
+}
+#endif
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0295a61..df7b009 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -278,6 +278,7 @@ extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);
extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg);
+extern bool XLogConsistentState(void);
extern bool RecoveryInProgress(void);
extern bool XLogInsertAllowed(void);
extern TimestampTz GetLatestXLogTime(void);
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers