On 04.11.2011 00:43, Simon Riggs wrote:
On Thu, Nov 3, 2011 at 6:12 PM, Pavan Deolasee<pavan.deola...@gmail.com> wrote:
When PGPROC array is allocated, we also allocate another array of
PGPROC_MINIMAL structures of the same size. While accessing the
ProcArray, a simple pointer mathematic can get us the corresponding
PGPROC_MINIMAL structure. The only exception being the dummy PGPROC
for prepared transaction. A first cut version of the patch is
attached. It looks big, but most of the changes are cosmetic because
of added indirection. The patch also contains another change to keep
the ProcArray sorted by (PGPROC *) to preserve locality of references
while traversing the array.
This is very good.
If you look at your PGPROC_MINIMAL, its mostly transaction related
stuff, so I would rename it PGXACT or similar. Not sure why you talk
about pointer math, seems easy enough just to have two arrays
protected by one lock, and have each proc use the same offset in both
arrays.
Agreed, that seems more clean. The PGPROCs for prepared transactions are
currently allocated separately, so they're not currently in the same
array as all other PGPROCs, but that could be changed. Here's a WIP
patch for that. I kept the PGPROC_MINIMAL nomenclature for now, but I
agree with Simon's suggestion to rename it.
On 03.11.2011 20:12, Pavan Deolasee wrote:
> Its quite possible that the effect of the patch is more evident on the
> particular hardware that I am testing. But the approach nevertheless
> seems reasonable. It will very useful if someone else having access to
> a large box can test the effect of the patch.
I tested this on an 8-core x64 box, but couldn't see any measurable
difference in pgbench performance. I tried with and without -N and -S,
and --unlogged-tables, but no difference.
While looking at GetSnapshotData(), it occurred to me that when a PGPROC
entry does not have its xid set, ie. xid == InvalidTransactionId, it's
pointless to check the subxid array for that proc. If a transaction has
no xid, none of its subtransactions can have an xid either. That's a
trivial optimization, see attached. I tested this with "pgbench -S -M
prepared -c 500" on the 8-core box, and it seemed to make a 1-2%
difference (without the other patch). So, almost in the noise, but it
also doesn't cost us anything in terms of readability or otherwise.
--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 477982d..b72915b 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -113,7 +113,8 @@ int max_prepared_xacts = 0;
typedef struct GlobalTransactionData
{
- PGPROC proc; /* dummy proc */
+ GlobalTransaction next;
+ int pgprocno; /* dummy proc */
BackendId dummyBackendId; /* similar to backend id for backends */
TimestampTz prepared_at; /* time of preparation */
XLogRecPtr prepare_lsn; /* XLOG offset of prepare record */
@@ -207,7 +208,8 @@ TwoPhaseShmemInit(void)
sizeof(GlobalTransaction) * max_prepared_xacts));
for (i = 0; i < max_prepared_xacts; i++)
{
- gxacts[i].proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
+ gxacts[i].pgprocno = PreparedXactProcs[i].pgprocno;
+ gxacts[i].next = TwoPhaseState->freeGXacts;
TwoPhaseState->freeGXacts = &gxacts[i];
/*
@@ -243,6 +245,8 @@ MarkAsPreparing(TransactionId xid, const char *gid,
TimestampTz prepared_at, Oid owner, Oid databaseid)
{
GlobalTransaction gxact;
+ PGPROC *proc;
+ PGPROC_MINIMAL *proc_minimal;
int i;
if (strlen(gid) >= GIDSIZE)
@@ -274,7 +278,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
TwoPhaseState->numPrepXacts--;
TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
/* and put it back in the freelist */
- gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
+ gxact->next = TwoPhaseState->freeGXacts;
TwoPhaseState->freeGXacts = gxact;
/* Back up index count too, so we don't miss scanning one */
i--;
@@ -302,32 +306,36 @@ MarkAsPreparing(TransactionId xid, const char *gid,
errhint("Increase max_prepared_transactions (currently %d).",
max_prepared_xacts)));
gxact = TwoPhaseState->freeGXacts;
- TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->proc.links.next;
+ TwoPhaseState->freeGXacts = (GlobalTransaction) gxact->next;
- /* Initialize it */
- MemSet(&gxact->proc, 0, sizeof(PGPROC));
- SHMQueueElemInit(&(gxact->proc.links));
- gxact->proc.waitStatus = STATUS_OK;
+ proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ proc_minimal = &ProcGlobal->allProcs_Minimal[gxact->pgprocno];
+
+ /* Initialize the PGPROC entry */
+ MemSet(proc, 0, sizeof(PGPROC));
+ proc->pgprocno = gxact->pgprocno;
+ SHMQueueElemInit(&(proc->links));
+ proc->waitStatus = STATUS_OK;
/* We set up the gxact's VXID as InvalidBackendId/XID */
- gxact->proc.lxid = (LocalTransactionId) xid;
- gxact->proc.xid = xid;
- gxact->proc.xmin = InvalidTransactionId;
- gxact->proc.pid = 0;
- gxact->proc.backendId = InvalidBackendId;
- gxact->proc.databaseId = databaseid;
- gxact->proc.roleId = owner;
- gxact->proc.inCommit = false;
- gxact->proc.vacuumFlags = 0;
- gxact->proc.lwWaiting = false;
- gxact->proc.lwExclusive = false;
- gxact->proc.lwWaitLink = NULL;
- gxact->proc.waitLock = NULL;
- gxact->proc.waitProcLock = NULL;
+ proc->lxid = (LocalTransactionId) xid;
+ proc_minimal->xid = xid;
+ proc_minimal->xmin = InvalidTransactionId;
+ proc_minimal->inCommit = false;
+ proc_minimal->vacuumFlags = 0;
+ proc->pid = 0;
+ proc->backendId = InvalidBackendId;
+ proc->databaseId = databaseid;
+ proc->roleId = owner;
+ proc->lwWaiting = false;
+ proc->lwExclusive = false;
+ proc->lwWaitLink = NULL;
+ proc->waitLock = NULL;
+ proc->waitProcLock = NULL;
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
- SHMQueueInit(&(gxact->proc.myProcLocks[i]));
+ SHMQueueInit(&(proc->myProcLocks[i]));
/* subxid data must be filled later by GXactLoadSubxactData */
- gxact->proc.subxids.overflowed = false;
- gxact->proc.subxids.nxids = 0;
+ proc_minimal->overflowed = false;
+ proc_minimal->nxids = 0;
gxact->prepared_at = prepared_at;
/* initialize LSN to 0 (start of WAL) */
@@ -358,17 +366,19 @@ static void
GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
TransactionId *children)
{
+ PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ PGPROC_MINIMAL *proc_minimal = &ProcGlobal->allProcs_Minimal[gxact->pgprocno];
/* We need no extra lock since the GXACT isn't valid yet */
if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
{
- gxact->proc.subxids.overflowed = true;
+ proc_minimal->overflowed = true;
nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
}
if (nsubxacts > 0)
{
- memcpy(gxact->proc.subxids.xids, children,
+ memcpy(proc->subxids.xids, children,
nsubxacts * sizeof(TransactionId));
- gxact->proc.subxids.nxids = nsubxacts;
+ proc_minimal->nxids = nsubxacts;
}
}
@@ -389,7 +399,7 @@ MarkAsPrepared(GlobalTransaction gxact)
* Put it into the global ProcArray so TransactionIdIsInProgress considers
* the XID as still running.
*/
- ProcArrayAdd(&gxact->proc);
+ ProcArrayAdd(&ProcGlobal->allProcs[gxact->pgprocno]);
}
/*
@@ -406,6 +416,7 @@ LockGXact(const char *gid, Oid user)
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
/* Ignore not-yet-valid GIDs */
if (!gxact->valid)
@@ -436,7 +447,7 @@ LockGXact(const char *gid, Oid user)
* there may be some other issues as well. Hence disallow until
* someone gets motivated to make it work.
*/
- if (MyDatabaseId != gxact->proc.databaseId)
+ if (MyDatabaseId != proc->databaseId)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("prepared transaction belongs to another database"),
@@ -483,7 +494,7 @@ RemoveGXact(GlobalTransaction gxact)
TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
/* and put it back in the freelist */
- gxact->proc.links.next = (SHM_QUEUE *) TwoPhaseState->freeGXacts;
+ gxact->next = TwoPhaseState->freeGXacts;
TwoPhaseState->freeGXacts = gxact;
LWLockRelease(TwoPhaseStateLock);
@@ -518,8 +529,9 @@ TransactionIdIsPrepared(TransactionId xid)
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGPROC_MINIMAL *proc_minimal = &ProcGlobal->allProcs_Minimal[gxact->pgprocno];
- if (gxact->valid && gxact->proc.xid == xid)
+ if (gxact->valid && proc_minimal->xid == xid)
{
result = true;
break;
@@ -642,6 +654,8 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
while (status->array != NULL && status->currIdx < status->ngxacts)
{
GlobalTransaction gxact = &status->array[status->currIdx++];
+ PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ PGPROC_MINIMAL *proc_minimal = &ProcGlobal->allProcs_Minimal[gxact->pgprocno];
Datum values[5];
bool nulls[5];
HeapTuple tuple;
@@ -656,11 +670,11 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls));
- values[0] = TransactionIdGetDatum(gxact->proc.xid);
+ values[0] = TransactionIdGetDatum(proc_minimal->xid);
values[1] = CStringGetTextDatum(gxact->gid);
values[2] = TimestampTzGetDatum(gxact->prepared_at);
values[3] = ObjectIdGetDatum(gxact->owner);
- values[4] = ObjectIdGetDatum(gxact->proc.databaseId);
+ values[4] = ObjectIdGetDatum(proc->databaseId);
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
result = HeapTupleGetDatum(tuple);
@@ -711,10 +725,11 @@ TwoPhaseGetDummyProc(TransactionId xid)
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGPROC_MINIMAL *proc_minimal = &ProcGlobal->allProcs_Minimal[gxact->pgprocno];
- if (gxact->proc.xid == xid)
+ if (proc_minimal->xid == xid)
{
- result = &gxact->proc;
+ result = &ProcGlobal->allProcs[gxact->pgprocno];
break;
}
}
@@ -841,7 +856,9 @@ save_state_data(const void *data, uint32 len)
void
StartPrepare(GlobalTransaction gxact)
{
- TransactionId xid = gxact->proc.xid;
+ PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ PGPROC_MINIMAL *proc_minimal = &ProcGlobal->allProcs_Minimal[gxact->pgprocno];
+ TransactionId xid = proc_minimal->xid;
TwoPhaseFileHeader hdr;
TransactionId *children;
RelFileNode *commitrels;
@@ -865,7 +882,7 @@ StartPrepare(GlobalTransaction gxact)
hdr.magic = TWOPHASE_MAGIC;
hdr.total_len = 0; /* EndPrepare will fill this in */
hdr.xid = xid;
- hdr.database = gxact->proc.databaseId;
+ hdr.database = proc->databaseId;
hdr.prepared_at = gxact->prepared_at;
hdr.owner = gxact->owner;
hdr.nsubxacts = xactGetCommittedChildren(&children);
@@ -913,7 +930,8 @@ StartPrepare(GlobalTransaction gxact)
void
EndPrepare(GlobalTransaction gxact)
{
- TransactionId xid = gxact->proc.xid;
+ PGPROC_MINIMAL *proc_minimal = &ProcGlobal->allProcs_Minimal[gxact->pgprocno];
+ TransactionId xid = proc_minimal->xid;
TwoPhaseFileHeader *hdr;
char path[MAXPGPATH];
XLogRecData *record;
@@ -1021,7 +1039,7 @@ EndPrepare(GlobalTransaction gxact)
*/
START_CRIT_SECTION();
- MyProc->inCommit = true;
+ MyProcMinimal->inCommit = true;
gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
records.head);
@@ -1069,7 +1087,7 @@ EndPrepare(GlobalTransaction gxact)
* checkpoint starting after this will certainly see the gxact as a
* candidate for fsyncing.
*/
- MyProc->inCommit = false;
+ MyProcMinimal->inCommit = false;
END_CRIT_SECTION();
@@ -1242,6 +1260,8 @@ void
FinishPreparedTransaction(const char *gid, bool isCommit)
{
GlobalTransaction gxact;
+ PGPROC *proc;
+ PGPROC_MINIMAL *proc_minimal;
TransactionId xid;
char *buf;
char *bufptr;
@@ -1260,7 +1280,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
* try to commit the same GID at once.
*/
gxact = LockGXact(gid, GetUserId());
- xid = gxact->proc.xid;
+ proc = &ProcGlobal->allProcs[gxact->pgprocno];
+ proc_minimal = &ProcGlobal->allProcs_Minimal[gxact->pgprocno];
+ xid = proc_minimal->xid;
/*
* Read and validate the state file
@@ -1309,7 +1331,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
hdr->nsubxacts, children,
hdr->nabortrels, abortrels);
- ProcArrayRemove(&gxact->proc, latestXid);
+ ProcArrayRemove(proc, latestXid);
/*
* In case we fail while running the callbacks, mark the gxact invalid so
@@ -1540,10 +1562,11 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+ PGPROC_MINIMAL *proc_minimal = &ProcGlobal->allProcs_Minimal[gxact->pgprocno];
if (gxact->valid &&
XLByteLE(gxact->prepare_lsn, redo_horizon))
- xids[nxids++] = gxact->proc.xid;
+ xids[nxids++] = proc_minimal->xid;
}
LWLockRelease(TwoPhaseStateLock);
@@ -1972,7 +1995,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
START_CRIT_SECTION();
/* See notes in RecordTransactionCommit */
- MyProc->inCommit = true;
+ MyProcMinimal->inCommit = true;
/* Emit the XLOG commit record */
xlrec.xid = xid;
@@ -2037,7 +2060,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
TransactionIdCommitTree(xid, nchildren, children);
/* Checkpoint can proceed now */
- MyProc->inCommit = false;
+ MyProcMinimal->inCommit = false;
END_CRIT_SECTION();
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 61dcfed..7c986aa 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -54,7 +54,7 @@ GetNewTransactionId(bool isSubXact)
if (IsBootstrapProcessingMode())
{
Assert(!isSubXact);
- MyProc->xid = BootstrapTransactionId;
+ MyProcMinimal->xid = BootstrapTransactionId;
return BootstrapTransactionId;
}
@@ -208,20 +208,21 @@ GetNewTransactionId(bool isSubXact)
* TransactionId and int fetch/store are atomic.
*/
volatile PGPROC *myproc = MyProc;
+ volatile PGPROC_MINIMAL *myprocminimal = MyProcMinimal;
if (!isSubXact)
- myproc->xid = xid;
+ myprocminimal->xid = xid;
else
{
- int nxids = myproc->subxids.nxids;
+ int nxids = myprocminimal->nxids;
if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
{
myproc->subxids.xids[nxids] = xid;
- myproc->subxids.nxids = nxids + 1;
+ myprocminimal->nxids = nxids + 1;
}
else
- myproc->subxids.overflowed = true;
+ myprocminimal->overflowed = true;
}
}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c151d3b..e0f7419 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -981,7 +981,7 @@ RecordTransactionCommit(void)
* bit fuzzy, but it doesn't matter.
*/
START_CRIT_SECTION();
- MyProc->inCommit = true;
+ MyProcMinimal->inCommit = true;
SetCurrentTransactionStopTimestamp();
@@ -1155,7 +1155,7 @@ RecordTransactionCommit(void)
*/
if (markXidCommitted)
{
- MyProc->inCommit = false;
+ MyProcMinimal->inCommit = false;
END_CRIT_SECTION();
}
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index 32985a4..bde51bb 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -223,7 +223,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt, BufferAccessStrategy bstrategy)
* OK, let's do it. First let other backends know I'm in ANALYZE.
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
- MyProc->vacuumFlags |= PROC_IN_ANALYZE;
+ MyProcMinimal->vacuumFlags |= PROC_IN_ANALYZE;
LWLockRelease(ProcArrayLock);
/*
@@ -250,7 +250,7 @@ analyze_rel(Oid relid, VacuumStmt *vacstmt, BufferAccessStrategy bstrategy)
* because the vacuum flag is cleared by the end-of-xact code.
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
- MyProc->vacuumFlags &= ~PROC_IN_ANALYZE;
+ MyProcMinimal->vacuumFlags &= ~PROC_IN_ANALYZE;
LWLockRelease(ProcArrayLock);
}
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index f42504c..3542b3a 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -893,9 +893,9 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast, bool for_wraparound)
* which is probably Not Good.
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
- MyProc->vacuumFlags |= PROC_IN_VACUUM;
+ MyProcMinimal->vacuumFlags |= PROC_IN_VACUUM;
if (for_wraparound)
- MyProc->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
+ MyProcMinimal->vacuumFlags |= PROC_VACUUM_FOR_WRAPAROUND;
LWLockRelease(ProcArrayLock);
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index dd2d6ee..dc93b42 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -702,7 +702,7 @@ ProcessStandbyHSFeedbackMessage(void)
* safe, and if we're moving it backwards, well, the data is at risk
* already since a VACUUM could have just finished calling GetOldestXmin.)
*/
- MyProc->xmin = reply.xmin;
+ MyProcMinimal->xmin = reply.xmin;
}
/* Main loop of walsender process */
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 56c0bd8..bb8b832 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -192,7 +192,6 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
XLOGShmemInit();
CLOGShmemInit();
SUBTRANSShmemInit();
- TwoPhaseShmemInit();
MultiXactShmemInit();
InitBufferPool();
@@ -213,6 +212,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
InitProcGlobal();
CreateSharedProcArray();
CreateSharedBackendStatus();
+ TwoPhaseShmemInit();
/*
* Set up shared-inval messaging
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 1a48485..a4cf965 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -82,14 +82,17 @@ typedef struct ProcArrayStruct
TransactionId lastOverflowedXid;
/*
- * We declare procs[] as 1 entry because C wants a fixed-size array, but
+ * We declare pgprocnos[] as 1 entry because C wants a fixed-size array, but
* actually it is maxProcs entries long.
*/
- PGPROC *procs[1]; /* VARIABLE LENGTH ARRAY */
+ int pgprocnos[1]; /* VARIABLE LENGTH ARRAY */
} ProcArrayStruct;
static ProcArrayStruct *procArray;
+static PGPROC *allProcs;
+static PGPROC_MINIMAL *allProcs_Minimal;
+
/*
* Bookkeeping for tracking emulated transactions in recovery
*/
@@ -169,8 +172,8 @@ ProcArrayShmemSize(void)
/* Size of the ProcArray structure itself */
#define PROCARRAY_MAXPROCS (MaxBackends + max_prepared_xacts)
- size = offsetof(ProcArrayStruct, procs);
- size = add_size(size, mul_size(sizeof(PGPROC *), PROCARRAY_MAXPROCS));
+ size = offsetof(ProcArrayStruct, pgprocnos);
+ size = add_size(size, mul_size(sizeof(int), PROCARRAY_MAXPROCS));
/*
* During Hot Standby processing we have a data structure called
@@ -211,8 +214,8 @@ CreateSharedProcArray(void)
/* Create or attach to the ProcArray shared structure */
procArray = (ProcArrayStruct *)
ShmemInitStruct("Proc Array",
- add_size(offsetof(ProcArrayStruct, procs),
- mul_size(sizeof(PGPROC *),
+ add_size(offsetof(ProcArrayStruct, pgprocnos),
+ mul_size(sizeof(int),
PROCARRAY_MAXPROCS)),
&found);
@@ -231,6 +234,9 @@ CreateSharedProcArray(void)
procArray->lastOverflowedXid = InvalidTransactionId;
}
+ allProcs = ProcGlobal->allProcs;
+ allProcs_Minimal = ProcGlobal->allProcs_Minimal;
+
/* Create or attach to the KnownAssignedXids arrays too, if needed */
if (EnableHotStandby)
{
@@ -253,6 +259,7 @@ void
ProcArrayAdd(PGPROC *proc)
{
ProcArrayStruct *arrayP = procArray;
+ int index;
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -269,7 +276,28 @@ ProcArrayAdd(PGPROC *proc)
errmsg("sorry, too many clients already")));
}
- arrayP->procs[arrayP->numProcs] = proc;
+ /*
+ * Keep the procs array sorted by (PGPROC *) so that we can utilize
+ * locality of references much better. This is useful while traversing the
+ * ProcArray because there is a increased likelyhood of finding the next
+ * PGPROC structure in the cache.
+ *
+ * Since the occurance of adding/removing a proc is much lower than the
+ * access to the ProcArray itself, the overhead should be marginal
+ */
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ /*
+ * If we are the first PGPROC or if we have found our right position in
+ * the array, break
+ */
+ if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
+ break;
+ }
+
+ memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
+ (arrayP->numProcs - index) * sizeof (int));
+ arrayP->pgprocnos[index] = proc->pgprocno;
arrayP->numProcs++;
LWLockRelease(ProcArrayLock);
@@ -316,10 +344,12 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
for (index = 0; index < arrayP->numProcs; index++)
{
- if (arrayP->procs[index] == proc)
+ if (arrayP->pgprocnos[index] == proc->pgprocno)
{
- arrayP->procs[index] = arrayP->procs[arrayP->numProcs - 1];
- arrayP->procs[arrayP->numProcs - 1] = NULL; /* for debugging */
+ /* Keep the PGPROC array sorted. See notes above */
+ memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
+ (arrayP->numProcs - index - 1) * sizeof (int));
+ arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */
arrayP->numProcs--;
LWLockRelease(ProcArrayLock);
return;
@@ -349,6 +379,8 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
void
ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
{
+ PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[proc->pgprocno];
+
if (TransactionIdIsValid(latestXid))
{
/*
@@ -361,17 +393,17 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
- proc->xid = InvalidTransactionId;
+ proc_minimal->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId;
- proc->xmin = InvalidTransactionId;
+ proc_minimal->xmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */
- proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
- proc->inCommit = false; /* be sure this is cleared in abort */
+ proc_minimal->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+ proc_minimal->inCommit = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false;
/* Clear the subtransaction-XID cache too while holding the lock */
- proc->subxids.nxids = 0;
- proc->subxids.overflowed = false;
+ proc_minimal->nxids = 0;
+ proc_minimal->overflowed = false;
/* Also advance global latestCompletedXid while holding the lock */
if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
@@ -390,10 +422,10 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
Assert(!TransactionIdIsValid(proc->xid));
proc->lxid = InvalidLocalTransactionId;
- proc->xmin = InvalidTransactionId;
+ proc_minimal->xmin = InvalidTransactionId;
/* must be cleared with xid/xmin: */
- proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
- proc->inCommit = false; /* be sure this is cleared in abort */
+ proc_minimal->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+ proc_minimal->inCommit = false; /* be sure this is cleared in abort */
proc->recoveryConflictPending = false;
Assert(proc->subxids.nxids == 0);
@@ -413,24 +445,26 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
void
ProcArrayClearTransaction(PGPROC *proc)
{
+ PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[proc->pgprocno];
+
/*
* We can skip locking ProcArrayLock here, because this action does not
* actually change anyone's view of the set of running XIDs: our entry is
* duplicate with the gxact that has already been inserted into the
* ProcArray.
*/
- proc->xid = InvalidTransactionId;
+ proc_minimal->xid = InvalidTransactionId;
proc->lxid = InvalidLocalTransactionId;
- proc->xmin = InvalidTransactionId;
+ proc_minimal->xmin = InvalidTransactionId;
proc->recoveryConflictPending = false;
/* redundant, but just in case */
- proc->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
- proc->inCommit = false;
+ proc_minimal->vacuumFlags &= ~PROC_VACUUM_STATE_MASK;
+ proc_minimal->inCommit = false;
/* Clear the subtransaction-XID cache too */
- proc->subxids.nxids = 0;
- proc->subxids.overflowed = false;
+ proc_minimal->nxids = 0;
+ proc_minimal->overflowed = false;
}
/*
@@ -811,7 +845,9 @@ TransactionIdIsInProgress(TransactionId xid)
/* No shortcuts, gotta grovel through the array */
for (i = 0; i < arrayP->numProcs; i++)
{
- volatile PGPROC *proc = arrayP->procs[i];
+ int pgprocno = arrayP->pgprocnos[i];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
TransactionId pxid;
/* Ignore my own proc --- dealt with it above */
@@ -819,7 +855,7 @@ TransactionIdIsInProgress(TransactionId xid)
continue;
/* Fetch xid just once - see GetNewTransactionId */
- pxid = proc->xid;
+ pxid = proc_minimal->xid;
if (!TransactionIdIsValid(pxid))
continue;
@@ -844,7 +880,7 @@ TransactionIdIsInProgress(TransactionId xid)
/*
* Step 2: check the cached child-Xids arrays
*/
- for (j = proc->subxids.nxids - 1; j >= 0; j--)
+ for (j = proc_minimal->nxids - 1; j >= 0; j--)
{
/* Fetch xid just once - see GetNewTransactionId */
TransactionId cxid = proc->subxids.xids[j];
@@ -864,7 +900,7 @@ TransactionIdIsInProgress(TransactionId xid)
* we hold ProcArrayLock. So we can't miss an Xid that we need to
* worry about.)
*/
- if (proc->subxids.overflowed)
+ if (proc_minimal->overflowed)
xids[nxids++] = pxid;
}
@@ -965,10 +1001,13 @@ TransactionIdIsActive(TransactionId xid)
for (i = 0; i < arrayP->numProcs; i++)
{
- volatile PGPROC *proc = arrayP->procs[i];
+ int pgprocno = arrayP->pgprocnos[i];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
+ TransactionId pxid;
/* Fetch xid just once - see GetNewTransactionId */
- TransactionId pxid = proc->xid;
+ pxid = proc_minimal->xid;
if (!TransactionIdIsValid(pxid))
continue;
@@ -1060,9 +1099,11 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
- if (ignoreVacuum && (proc->vacuumFlags & PROC_IN_VACUUM))
+ if (ignoreVacuum && (proc_minimal->vacuumFlags & PROC_IN_VACUUM))
continue;
if (allDbs ||
@@ -1070,7 +1111,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
proc->databaseId == 0) /* always include WalSender */
{
/* Fetch xid just once - see GetNewTransactionId */
- TransactionId xid = proc->xid;
+ TransactionId xid = proc_minimal->xid;
/* First consider the transaction's own Xid, if any */
if (TransactionIdIsNormal(xid) &&
@@ -1084,7 +1125,7 @@ GetOldestXmin(bool allDbs, bool ignoreVacuum)
* have an Xmin but not (yet) an Xid; conversely, if it has an
* Xid, that could determine some not-yet-set Xmin.
*/
- xid = proc->xmin; /* Fetch just once */
+ xid = proc_minimal->xmin; /* Fetch just once */
if (TransactionIdIsNormal(xid) &&
TransactionIdPrecedes(xid, result))
result = xid;
@@ -1200,6 +1241,8 @@ GetSnapshotData(Snapshot snapshot)
int count = 0;
int subcount = 0;
bool suboverflowed = false;
+ static TransactionId *xmins = NULL;
+ int numProcs;
Assert(snapshot != NULL);
@@ -1235,6 +1278,15 @@ GetSnapshotData(Snapshot snapshot)
errmsg("out of memory")));
}
+ if (xmins == NULL)
+ {
+ xmins = malloc(procArray->maxProcs * sizeof(TransactionId));
+ if (xmins == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+
/*
* It is sufficient to get shared lock on ProcArrayLock, even if we are
* going to set MyProc->xmin.
@@ -1261,6 +1313,8 @@ GetSnapshotData(Snapshot snapshot)
if (!snapshot->takenDuringRecovery)
{
+ int *pgprocnos = arrayP->pgprocnos;
+
/*
* Spin over procArray checking xid, xmin, and subxids. The goal is
* to gather all active xids, find the lowest xmin, and try to record
@@ -1269,23 +1323,25 @@ GetSnapshotData(Snapshot snapshot)
* prepared transaction xids are held in KnownAssignedXids, so these
* will be seen without needing to loop through procs here.
*/
- for (index = 0; index < arrayP->numProcs; index++)
+ numProcs = arrayP->numProcs;
+ for (index = 0; index < numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = pgprocnos[index];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
TransactionId xid;
/* Ignore procs running LAZY VACUUM */
- if (proc->vacuumFlags & PROC_IN_VACUUM)
+ if (proc_minimal->vacuumFlags & PROC_IN_VACUUM)
+ {
+ xmins[index] = InvalidTransactionId;
continue;
+ }
/* Update globalxmin to be the smallest valid xmin */
- xid = proc->xmin; /* fetch just once */
- if (TransactionIdIsNormal(xid) &&
- TransactionIdPrecedes(xid, globalxmin))
- globalxmin = xid;
+ xmins[index] = proc_minimal->xmin; /* fetch just once */
/* Fetch xid just once - see GetNewTransactionId */
- xid = proc->xid;
+ xid = proc_minimal->xid;
/*
* If the transaction has been assigned an xid < xmax we add it to
@@ -1300,7 +1356,7 @@ GetSnapshotData(Snapshot snapshot)
{
if (TransactionIdFollowsOrEquals(xid, xmax))
continue;
- if (proc != MyProc)
+ if (proc_minimal != MyProcMinimal)
snapshot->xip[count++] = xid;
if (TransactionIdPrecedes(xid, xmin))
xmin = xid;
@@ -1321,16 +1377,17 @@ GetSnapshotData(Snapshot snapshot)
*
* Again, our own XIDs are not included in the snapshot.
*/
- if (!suboverflowed && proc != MyProc)
+ if (!suboverflowed && proc_minimal != MyProcMinimal)
{
- if (proc->subxids.overflowed)
+ if (proc_minimal->overflowed)
suboverflowed = true;
else
{
- int nxids = proc->subxids.nxids;
+ int nxids = proc_minimal->nxids;
if (nxids > 0)
{
+ volatile PGPROC *proc = &allProcs[pgprocno];
memcpy(snapshot->subxip + subcount,
(void *) proc->subxids.xids,
nxids * sizeof(TransactionId));
@@ -1342,6 +1399,7 @@ GetSnapshotData(Snapshot snapshot)
}
else
{
+ numProcs = 0;
/*
* We're in hot standby, so get XIDs from KnownAssignedXids.
*
@@ -1372,9 +1430,8 @@ GetSnapshotData(Snapshot snapshot)
suboverflowed = true;
}
- if (!TransactionIdIsValid(MyProc->xmin))
- MyProc->xmin = TransactionXmin = xmin;
-
+ if (!TransactionIdIsValid(MyProcMinimal->xmin))
+ MyProcMinimal->xmin = TransactionXmin = xmin;
LWLockRelease(ProcArrayLock);
/*
@@ -1382,6 +1439,14 @@ GetSnapshotData(Snapshot snapshot)
* different way of computing it than GetOldestXmin uses, but should give
* the same result.
*/
+ for (index = 0; index < numProcs; index++)
+ {
+ TransactionId xid = xmins[index];
+ if (TransactionIdIsNormal(xid) &&
+ TransactionIdPrecedes(xid, globalxmin))
+ globalxmin = xid;
+ }
+
if (TransactionIdPrecedes(xmin, globalxmin))
globalxmin = xmin;
@@ -1436,14 +1501,16 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
TransactionId xid;
/* Ignore procs running LAZY VACUUM */
- if (proc->vacuumFlags & PROC_IN_VACUUM)
+ if (proc_minimal->vacuumFlags & PROC_IN_VACUUM)
continue;
- xid = proc->xid; /* fetch just once */
+ xid = proc_minimal->xid; /* fetch just once */
if (xid != sourcexid)
continue;
@@ -1459,7 +1526,7 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
/*
* Likewise, let's just make real sure its xmin does cover us.
*/
- xid = proc->xmin; /* fetch just once */
+ xid = proc_minimal->xmin; /* fetch just once */
if (!TransactionIdIsNormal(xid) ||
!TransactionIdPrecedesOrEquals(xid, xmin))
continue;
@@ -1470,7 +1537,7 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
* GetSnapshotData first, we'll be overwriting a valid xmin here,
* so we don't check that.)
*/
- MyProc->xmin = TransactionXmin = xmin;
+ MyProcMinimal->xmin = TransactionXmin = xmin;
result = true;
break;
@@ -1562,12 +1629,14 @@ GetRunningTransactionData(void)
*/
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
TransactionId xid;
int nxids;
/* Fetch xid just once - see GetNewTransactionId */
- xid = proc->xid;
+ xid = proc_minimal->xid;
/*
* We don't need to store transactions that don't have a TransactionId
@@ -1585,7 +1654,7 @@ GetRunningTransactionData(void)
* Save subtransaction XIDs. Other backends can't add or remove
* entries while we're holding XidGenLock.
*/
- nxids = proc->subxids.nxids;
+ nxids = proc_minimal->nxids;
if (nxids > 0)
{
memcpy(&xids[count], (void *) proc->subxids.xids,
@@ -1593,7 +1662,7 @@ GetRunningTransactionData(void)
count += nxids;
subcount += nxids;
- if (proc->subxids.overflowed)
+ if (proc_minimal->overflowed)
suboverflowed = true;
/*
@@ -1653,11 +1722,12 @@ GetOldestActiveTransactionId(void)
*/
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
TransactionId xid;
/* Fetch xid just once - see GetNewTransactionId */
- xid = proc->xid;
+ xid = proc_minimal->xid;
if (!TransactionIdIsNormal(xid))
continue;
@@ -1709,12 +1779,14 @@ GetTransactionsInCommit(TransactionId **xids_p)
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
+ TransactionId pxid;
/* Fetch xid just once - see GetNewTransactionId */
- TransactionId pxid = proc->xid;
+ pxid = proc_minimal->xid;
- if (proc->inCommit && TransactionIdIsValid(pxid))
+ if (proc_minimal->inCommit && TransactionIdIsValid(pxid))
xids[nxids++] = pxid;
}
@@ -1744,12 +1816,14 @@ HaveTransactionsInCommit(TransactionId *xids, int nxids)
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
+ TransactionId pxid;
/* Fetch xid just once - see GetNewTransactionId */
- TransactionId pxid = proc->xid;
+ pxid = proc_minimal->xid;
- if (proc->inCommit && TransactionIdIsValid(pxid))
+ if (proc_minimal->inCommit && TransactionIdIsValid(pxid))
{
int i;
@@ -1792,7 +1866,7 @@ BackendPidGetProc(int pid)
for (index = 0; index < arrayP->numProcs; index++)
{
- PGPROC *proc = arrayP->procs[index];
+ PGPROC *proc = &allProcs[arrayP->pgprocnos[index]];
if (proc->pid == pid)
{
@@ -1833,9 +1907,11 @@ BackendXidGetPid(TransactionId xid)
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
- if (proc->xid == xid)
+ if (proc_minimal->xid == xid)
{
result = proc->pid;
break;
@@ -1901,18 +1977,20 @@ GetCurrentVirtualXIDs(TransactionId limitXmin, bool excludeXmin0,
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
if (proc == MyProc)
continue;
- if (excludeVacuum & proc->vacuumFlags)
+ if (excludeVacuum & proc_minimal->vacuumFlags)
continue;
if (allDbs || proc->databaseId == MyDatabaseId)
{
/* Fetch xmin just once - might change on us */
- TransactionId pxmin = proc->xmin;
+ TransactionId pxmin = proc_minimal->xmin;
if (excludeXmin0 && !TransactionIdIsValid(pxmin))
continue;
@@ -1996,7 +2074,9 @@ GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
/* Exclude prepared transactions */
if (proc->pid == 0)
@@ -2006,7 +2086,7 @@ GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbOid)
proc->databaseId == dbOid)
{
/* Fetch xmin just once - can't change on us, but good coding */
- TransactionId pxmin = proc->xmin;
+ TransactionId pxmin = proc_minimal->xmin;
/*
* We ignore an invalid pxmin because this means that backend has
@@ -2050,8 +2130,9 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
for (index = 0; index < arrayP->numProcs; index++)
{
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
VirtualTransactionId procvxid;
- PGPROC *proc = arrayP->procs[index];
GET_VXID_FROM_PGPROC(procvxid, *proc);
@@ -2104,7 +2185,9 @@ MinimumActiveBackends(int min)
*/
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
/*
* Since we're not holding a lock, need to check that the pointer is
@@ -2122,10 +2205,10 @@ MinimumActiveBackends(int min)
if (proc == MyProc)
continue; /* do not count myself */
+ if (proc_minimal->xid == InvalidTransactionId)
+ continue; /* do not count if no XID assigned */
if (proc->pid == 0)
continue; /* do not count prepared xacts */
- if (proc->xid == InvalidTransactionId)
- continue; /* do not count if no XID assigned */
if (proc->waitLock != NULL)
continue; /* do not count if blocked on a lock */
count++;
@@ -2150,7 +2233,8 @@ CountDBBackends(Oid databaseid)
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
if (proc->pid == 0)
continue; /* do not count prepared xacts */
@@ -2179,7 +2263,8 @@ CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conflictPending)
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
if (databaseid == InvalidOid || proc->databaseId == databaseid)
{
@@ -2217,7 +2302,8 @@ CountUserBackends(Oid roleid)
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
if (proc->pid == 0)
continue; /* do not count prepared xacts */
@@ -2277,7 +2363,9 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
for (index = 0; index < arrayP->numProcs; index++)
{
- volatile PGPROC *proc = arrayP->procs[index];
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGPROC_MINIMAL *proc_minimal = &allProcs_Minimal[pgprocno];
if (proc->databaseId != databaseId)
continue;
@@ -2291,7 +2379,7 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
else
{
(*nbackends)++;
- if ((proc->vacuumFlags & PROC_IS_AUTOVACUUM) &&
+ if ((proc_minimal->vacuumFlags & PROC_IS_AUTOVACUUM) &&
nautovacs < MAXAUTOVACPIDS)
autovac_pids[nautovacs++] = proc->pid;
}
@@ -2321,8 +2409,8 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
#define XidCacheRemove(i) \
do { \
- MyProc->subxids.xids[i] = MyProc->subxids.xids[MyProc->subxids.nxids - 1]; \
- MyProc->subxids.nxids--; \
+ MyProc->subxids.xids[i] = MyProc->subxids.xids[MyProcMinimal->nxids - 1]; \
+ MyProcMinimal->nxids--; \
} while (0)
/*
@@ -2361,7 +2449,7 @@ XidCacheRemoveRunningXids(TransactionId xid,
{
TransactionId anxid = xids[i];
- for (j = MyProc->subxids.nxids - 1; j >= 0; j--)
+ for (j = MyProcMinimal->nxids - 1; j >= 0; j--)
{
if (TransactionIdEquals(MyProc->subxids.xids[j], anxid))
{
@@ -2377,11 +2465,11 @@ XidCacheRemoveRunningXids(TransactionId xid,
* error during AbortSubTransaction. So instead of Assert, emit a
* debug warning.
*/
- if (j < 0 && !MyProc->subxids.overflowed)
+ if (j < 0 && !MyProcMinimal->overflowed)
elog(WARNING, "did not find subXID %u in MyProc", anxid);
}
- for (j = MyProc->subxids.nxids - 1; j >= 0; j--)
+ for (j = MyProcMinimal->nxids - 1; j >= 0; j--)
{
if (TransactionIdEquals(MyProc->subxids.xids[j], xid))
{
@@ -2390,7 +2478,7 @@ XidCacheRemoveRunningXids(TransactionId xid,
}
}
/* Ordinarily we should have found it, unless the cache has overflowed */
- if (j < 0 && !MyProc->subxids.overflowed)
+ if (j < 0 && !MyProcMinimal->overflowed)
elog(WARNING, "did not find subXID %u in MyProc", xid);
/* Also advance global latestCompletedXid while holding the lock */
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
index 7e7f6af..4fd7bd7 100644
--- a/src/backend/storage/lmgr/deadlock.c
+++ b/src/backend/storage/lmgr/deadlock.c
@@ -450,6 +450,7 @@ FindLockCycleRecurse(PGPROC *checkProc,
int *nSoftEdges) /* output argument */
{
PGPROC *proc;
+ PGPROC_MINIMAL *proc_minimal;
LOCK *lock;
PROCLOCK *proclock;
SHM_QUEUE *procLocks;
@@ -516,6 +517,7 @@ FindLockCycleRecurse(PGPROC *checkProc,
while (proclock)
{
proc = proclock->tag.myProc;
+ proc_minimal = &ProcGlobal->allProcs_Minimal[proc->pgprocno];
/* A proc never blocks itself */
if (proc != checkProc)
@@ -541,7 +543,7 @@ FindLockCycleRecurse(PGPROC *checkProc,
* vacuumFlag bit), but we don't do that here to avoid
* grabbing ProcArrayLock.
*/
- if (proc->vacuumFlags & PROC_IS_AUTOVACUUM)
+ if (proc_minimal->vacuumFlags & PROC_IS_AUTOVACUUM)
blocking_autovacuum_proc = proc;
/* This proc hard-blocks checkProc */
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index ed8344f..af45f9a 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -3182,9 +3182,10 @@ GetRunningTransactionLocks(int *nlocks)
proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
{
PGPROC *proc = proclock->tag.myProc;
+ PGPROC_MINIMAL *proc_minimal = &ProcGlobal->allProcs_Minimal[proc->pgprocno];
LOCK *lock = proclock->tag.myLock;
- accessExclusiveLocks[index].xid = proc->xid;
+ accessExclusiveLocks[index].xid = proc_minimal->xid;
accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index eda3a98..289569a 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
#include <sys/time.h>
#include "access/transam.h"
+#include "access/twophase.h"
#include "access/xact.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
@@ -57,6 +58,7 @@ bool log_lock_waits = false;
/* Pointer to this process's PGPROC struct, if any */
PGPROC *MyProc = NULL;
+PGPROC_MINIMAL *MyProcMinimal = NULL;
/*
* This spinlock protects the freelist of recycled PGPROC structures.
@@ -70,6 +72,7 @@ NON_EXEC_STATIC slock_t *ProcStructLock = NULL;
/* Pointers to shared-memory structures */
PROC_HDR *ProcGlobal = NULL;
NON_EXEC_STATIC PGPROC *AuxiliaryProcs = NULL;
+PGPROC *PreparedXactProcs = NULL;
/* If we are waiting for a lock, this points to the associated LOCALLOCK */
static LOCALLOCK *lockAwaited = NULL;
@@ -106,13 +109,19 @@ ProcGlobalShmemSize(void)
/* ProcGlobal */
size = add_size(size, sizeof(PROC_HDR));
- /* AuxiliaryProcs */
- size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
/* MyProcs, including autovacuum workers and launcher */
size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
+ /* AuxiliaryProcs */
+ size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
+ /* Prepared xacts */
+ size = add_size(size, mul_size(max_prepared_xacts, sizeof(PGPROC)));
/* ProcStructLock */
size = add_size(size, sizeof(slock_t));
+ size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC_MINIMAL)));
+ size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC_MINIMAL)));
+ size = add_size(size, mul_size(max_prepared_xacts, sizeof(PGPROC_MINIMAL)));
+
return size;
}
@@ -157,10 +166,11 @@ void
InitProcGlobal(void)
{
PGPROC *procs;
+ PGPROC_MINIMAL *procs_minimal;
int i,
j;
bool found;
- uint32 TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS;
+ uint32 TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts;
/* Create the ProcGlobal shared structure */
ProcGlobal = (PROC_HDR *)
@@ -195,14 +205,38 @@ InitProcGlobal(void)
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory")));
MemSet(procs, 0, TotalProcs * sizeof(PGPROC));
+
+ /*
+ * Also allocate a separate array of PROC_MINIMAL structures. We keep this
+ * out of band of the main PGPROC array to ensure the very heavily accessed
+ * members of the PGPROC structure are stored contiguously in the memory.
+ * This provides significant performance benefits, especially on a
+ * multiprocessor system by improving cache hit ratio.
+ *
+ * Note: We separate the members needed by GetSnapshotData since that's the
+ * most frequently accessed code path. There is one PROC_MINIMAL structure
+ * for every PGPROC structure.
+ */
+ procs_minimal = (PGPROC_MINIMAL *) ShmemAlloc(TotalProcs * sizeof(PGPROC_MINIMAL));
+ MemSet(procs_minimal, 0, TotalProcs * sizeof(PGPROC_MINIMAL));
+ ProcGlobal->allProcs_Minimal = procs_minimal;
+
for (i = 0; i < TotalProcs; i++)
{
/* Common initialization for all PGPROCs, regardless of type. */
- /* Set up per-PGPROC semaphore, latch, and backendLock */
- PGSemaphoreCreate(&(procs[i].sem));
- InitSharedLatch(&(procs[i].procLatch));
- procs[i].backendLock = LWLockAssign();
+ /*
+ * Set up per-PGPROC semaphore, latch, and backendLock. Prepared
+ * xact dummy PGPROCs don't need these though - they're never
+ * associated with a real process
+ */
+ if (i < MaxBackends + NUM_AUXILIARY_PROCS)
+ {
+ PGSemaphoreCreate(&(procs[i].sem));
+ InitSharedLatch(&(procs[i].procLatch));
+ procs[i].backendLock = LWLockAssign();
+ }
+ procs[i].pgprocno = i;
/*
* Newly created PGPROCs for normal backends or for autovacuum must
@@ -234,6 +268,7 @@ InitProcGlobal(void)
* auxiliary proceses.
*/
AuxiliaryProcs = &procs[MaxBackends];
+ PreparedXactProcs = &procs[MaxBackends + NUM_AUXILIARY_PROCS];
/* Create ProcStructLock spinlock, too */
ProcStructLock = (slock_t *) ShmemAlloc(sizeof(slock_t));
@@ -296,6 +331,7 @@ InitProcess(void)
(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
errmsg("sorry, too many clients already")));
}
+ MyProcMinimal = &ProcGlobal->allProcs_Minimal[MyProc->pgprocno];
/*
* Now that we have a PGPROC, mark ourselves as an active postmaster
@@ -313,18 +349,18 @@ InitProcess(void)
SHMQueueElemInit(&(MyProc->links));
MyProc->waitStatus = STATUS_OK;
MyProc->lxid = InvalidLocalTransactionId;
- MyProc->xid = InvalidTransactionId;
- MyProc->xmin = InvalidTransactionId;
+ MyProcMinimal->xid = InvalidTransactionId;
+ MyProcMinimal->xmin = InvalidTransactionId;
MyProc->pid = MyProcPid;
/* backendId, databaseId and roleId will be filled in later */
MyProc->backendId = InvalidBackendId;
MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid;
- MyProc->inCommit = false;
- MyProc->vacuumFlags = 0;
+ MyProcMinimal->inCommit = false;
+ MyProcMinimal->vacuumFlags = 0;
/* NB -- autovac launcher intentionally does not set IS_AUTOVACUUM */
if (IsAutoVacuumWorkerProcess())
- MyProc->vacuumFlags |= PROC_IS_AUTOVACUUM;
+ MyProcMinimal->vacuumFlags |= PROC_IS_AUTOVACUUM;
MyProc->lwWaiting = false;
MyProc->lwExclusive = false;
MyProc->lwWaitLink = NULL;
@@ -462,6 +498,7 @@ InitAuxiliaryProcess(void)
((volatile PGPROC *) auxproc)->pid = MyProcPid;
MyProc = auxproc;
+ MyProcMinimal = &ProcGlobal->allProcs_Minimal[auxproc->pgprocno];
SpinLockRelease(ProcStructLock);
@@ -472,13 +509,13 @@ InitAuxiliaryProcess(void)
SHMQueueElemInit(&(MyProc->links));
MyProc->waitStatus = STATUS_OK;
MyProc->lxid = InvalidLocalTransactionId;
- MyProc->xid = InvalidTransactionId;
- MyProc->xmin = InvalidTransactionId;
+ MyProcMinimal->xid = InvalidTransactionId;
+ MyProcMinimal->xmin = InvalidTransactionId;
MyProc->backendId = InvalidBackendId;
MyProc->databaseId = InvalidOid;
MyProc->roleId = InvalidOid;
- MyProc->inCommit = false;
- MyProc->vacuumFlags = 0;
+ MyProcMinimal->inCommit = false;
+ MyProcMinimal->vacuumFlags = 0;
MyProc->lwWaiting = false;
MyProc->lwExclusive = false;
MyProc->lwWaitLink = NULL;
@@ -1045,6 +1082,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
if (deadlock_state == DS_BLOCKED_BY_AUTOVACUUM && allow_autovacuum_cancel)
{
PGPROC *autovac = GetBlockingAutoVacuumPgproc();
+ PGPROC_MINIMAL *autovac_minimal = &ProcGlobal->allProcs_Minimal[autovac->pgprocno];
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@@ -1053,8 +1091,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
* wraparound.
*/
if ((autovac != NULL) &&
- (autovac->vacuumFlags & PROC_IS_AUTOVACUUM) &&
- !(autovac->vacuumFlags & PROC_VACUUM_FOR_WRAPAROUND))
+ (autovac_minimal->vacuumFlags & PROC_IS_AUTOVACUUM) &&
+ !(autovac_minimal->vacuumFlags & PROC_VACUUM_FOR_WRAPAROUND))
{
int pid = autovac->pid;
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 50fb780..1f4f5b4 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -577,7 +577,7 @@ static void
SnapshotResetXmin(void)
{
if (RegisteredSnapshots == 0 && ActiveSnapshot == NULL)
- MyProc->xmin = InvalidTransactionId;
+ MyProcMinimal->xmin = InvalidTransactionId;
}
/*
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 6e798b1..40651db 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -35,8 +35,6 @@
struct XidCache
{
- bool overflowed;
- int nxids;
TransactionId xids[PGPROC_MAX_CACHED_SUBXIDS];
};
@@ -86,27 +84,14 @@ struct PGPROC
LocalTransactionId lxid; /* local id of top-level transaction currently
* being executed by this proc, if running;
* else InvalidLocalTransactionId */
-
- TransactionId xid; /* id of top-level transaction currently being
- * executed by this proc, if running and XID
- * is assigned; else InvalidTransactionId */
-
- TransactionId xmin; /* minimal running XID as it was when we were
- * starting our xact, excluding LAZY VACUUM:
- * vacuum must not remove tuples deleted by
- * xid >= xmin ! */
-
int pid; /* Backend's process ID; 0 if prepared xact */
+ int pgprocno;
/* These fields are zero while a backend is still starting up: */
BackendId backendId; /* This backend's backend ID (if assigned) */
Oid databaseId; /* OID of database this backend is using */
Oid roleId; /* OID of role using this backend */
- bool inCommit; /* true if within commit critical section */
-
- uint8 vacuumFlags; /* vacuum-related flags, see above */
-
/*
* While in hot standby mode, shows that a conflict signal has been sent
* for the current transaction. Set/cleared while holding ProcArrayLock,
@@ -160,7 +145,35 @@ struct PGPROC
extern PGDLLIMPORT PGPROC *MyProc;
+extern PGDLLIMPORT struct PGPROC_MINIMAL *MyProcMinimal;
+
+/*
+ * A minimal part of the PGPROC. We store these members out of the main PGPROC
+ * structure since they are very heavily accessed members and usually in a loop
+ * for all active PGPROCs. Storing them in a separate array ensures that these
+ * members can be very effeciently accessed with minimum cache misses. On a
+ * large multiprocessor system, this can show a significant performance
+ * improvement.
+ */
+struct PGPROC_MINIMAL
+{
+ TransactionId xid; /* id of top-level transaction currently being
+ * executed by this proc, if running and XID
+ * is assigned; else InvalidTransactionId */
+ TransactionId xmin; /* minimal running XID as it was when we were
+ * starting our xact, excluding LAZY VACUUM:
+ * vacuum must not remove tuples deleted by
+ * xid >= xmin ! */
+
+ uint8 vacuumFlags; /* vacuum-related flags, see above */
+ bool overflowed;
+ bool inCommit; /* true if within commit critical section */
+
+ uint8 nxids;
+};
+
+typedef struct PGPROC_MINIMAL PGPROC_MINIMAL;
/*
* There is one ProcGlobal struct for the whole database cluster.
@@ -169,6 +182,8 @@ typedef struct PROC_HDR
{
/* Array of PGPROC structures (not including dummies for prepared txns) */
PGPROC *allProcs;
+ /* Array of PGPROC_MINIMAL structures (not including dummies for prepared txns */
+ PGPROC_MINIMAL *allProcs_Minimal;
/* Length of allProcs array */
uint32 allProcCount;
/* Head of list of free PGPROC structures */
@@ -186,6 +201,8 @@ typedef struct PROC_HDR
extern PROC_HDR *ProcGlobal;
+extern PGPROC *PreparedXactProcs;
+
/*
* We set aside some extra PGPROC structures for auxiliary processes,
* ie things that aren't full-fledged backends but need shmem access.
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 1a48485..4bd783f 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1304,8 +1304,8 @@ GetSnapshotData(Snapshot snapshot)
snapshot->xip[count++] = xid;
if (TransactionIdPrecedes(xid, xmin))
xmin = xid;
- }
+ /* XXX indent */
/*
* Save subtransaction XIDs if possible (if we've already
* overflowed, there's no point). Note that the subxact XIDs must
@@ -1338,6 +1338,7 @@ GetSnapshotData(Snapshot snapshot)
}
}
}
+ }
}
}
else
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers