To avoid complicating logic we store both, the toplevel and the subxids, in
->xip, first ->xcnt toplevel ones, and then ->subxcnt subxids.
Also skip logging any subxids if the snapshot is suboverflowed, they aren't
useful in that case anyway.

This allows to make some operations cheaper and it allows faster startup for
the future logical decoding feature because that doesn't care about
subtransactions/suboverflow'edness.
---
 src/backend/access/transam/xlog.c   |  2 ++
 src/backend/storage/ipc/procarray.c | 65 ++++++++++++++++++++++++-------------
 src/backend/storage/ipc/standby.c   |  8 +++--
 src/include/storage/standby.h       |  2 ++
 4 files changed, 52 insertions(+), 25 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1faf666..1749f46 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5629,6 +5629,7 @@ StartupXLOG(void)
 				 * subxids are listed with their parent prepared transactions.
 				 */
 				running.xcnt = nxids;
+				running.subxcnt = 0;
 				running.subxid_overflow = false;
 				running.nextXid = checkPoint.nextXid;
 				running.oldestRunningXid = oldestActiveXID;
@@ -7813,6 +7814,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 			 * with their parent prepared transactions.
 			 */
 			running.xcnt = nxids;
+			running.subxcnt = 0;
 			running.subxid_overflow = false;
 			running.nextXid = checkPoint.nextXid;
 			running.oldestRunningXid = oldestActiveXID;
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 8c0d7b0..a98358d 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -501,6 +501,13 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 	 * Remove stale transactions, if any.
 	 */
 	ExpireOldKnownAssignedTransactionIds(running->oldestRunningXid);
+
+	/*
+	 * Remove stale locks, if any.
+	 *
+	 * Locks are always assigned to the toplevel xid so we don't need to care
+	 * about subxcnt/subxids (and by extension not about ->suboverflowed).
+	 */
 	StandbyReleaseOldLocks(running->xcnt, running->xids);
 
 	/*
@@ -581,13 +588,13 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 	 * Allocate a temporary array to avoid modifying the array passed as
 	 * argument.
 	 */
-	xids = palloc(sizeof(TransactionId) * running->xcnt);
+	xids = palloc(sizeof(TransactionId) * (running->xcnt + running->subxcnt));
 
 	/*
 	 * Add to the temp array any xids which have not already completed.
 	 */
 	nxids = 0;
-	for (i = 0; i < running->xcnt; i++)
+	for (i = 0; i < running->xcnt + running->subxcnt; i++)
 	{
 		TransactionId xid = running->xids[i];
 
@@ -1627,15 +1634,13 @@ GetRunningTransactionData(void)
 	oldestRunningXid = ShmemVariableCache->nextXid;
 
 	/*
-	 * Spin over procArray collecting all xids and subxids.
+	 * Spin over procArray collecting all xids
 	 */
 	for (index = 0; index < arrayP->numProcs; index++)
 	{
 		int			pgprocno = arrayP->pgprocnos[index];
-		volatile PGPROC *proc = &allProcs[pgprocno];
 		volatile PGXACT *pgxact = &allPgXact[pgprocno];
 		TransactionId xid;
-		int			nxids;
 
 		/* Fetch xid just once - see GetNewTransactionId */
 		xid = pgxact->xid;
@@ -1652,30 +1657,46 @@ GetRunningTransactionData(void)
 		if (TransactionIdPrecedes(xid, oldestRunningXid))
 			oldestRunningXid = xid;
 
-		/*
-		 * Save subtransaction XIDs. Other backends can't add or remove
-		 * entries while we're holding XidGenLock.
-		 */
-		nxids = pgxact->nxids;
-		if (nxids > 0)
-		{
-			memcpy(&xids[count], (void *) proc->subxids.xids,
-				   nxids * sizeof(TransactionId));
-			count += nxids;
-			subcount += nxids;
+		if (pgxact->overflowed)
+			suboverflowed = true;
+	}
 
-			if (pgxact->overflowed)
-				suboverflowed = true;
+	/*
+	 * Spin over procArray collecting all subxids, but only if there hasn't
+	 * been a suboverflow.
+	 */
+	if (!suboverflowed)
+	{
+		for (index = 0; index < arrayP->numProcs; index++)
+		{
+			int			pgprocno = arrayP->pgprocnos[index];
+			volatile PGPROC *proc = &allProcs[pgprocno];
+			volatile PGXACT *pgxact = &allPgXact[pgprocno];
+			int			nxids;
 
 			/*
-			 * Top-level XID of a transaction is always less than any of its
-			 * subxids, so we don't need to check if any of the subxids are
-			 * smaller than oldestRunningXid
+			 * Save subtransaction XIDs. Other backends can't add or remove
+			 * entries while we're holding XidGenLock.
 			 */
+			nxids = pgxact->nxids;
+			if (nxids > 0)
+			{
+				memcpy(&xids[count], (void *) proc->subxids.xids,
+					   nxids * sizeof(TransactionId));
+				count += nxids;
+				subcount += nxids;
+
+				/*
+				 * Top-level XID of a transaction is always less than any of
+				 * its subxids, so we don't need to check if any of the subxids
+				 * are smaller than oldestRunningXid
+				 */
+			}
 		}
 	}
 
-	CurrentRunningXacts->xcnt = count;
+	CurrentRunningXacts->xcnt = count - subcount;
+	CurrentRunningXacts->subxcnt = subcount;
 	CurrentRunningXacts->subxid_overflow = suboverflowed;
 	CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
 	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 905d331..0cab243 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -775,6 +775,7 @@ standby_redo(XLogRecPtr lsn, XLogRecord *record)
 		RunningTransactionsData running;
 
 		running.xcnt = xlrec->xcnt;
+		running.subxcnt = xlrec->subxcnt;
 		running.subxid_overflow = xlrec->subxid_overflow;
 		running.nextXid = xlrec->nextXid;
 		running.latestCompletedXid = xlrec->latestCompletedXid;
@@ -942,6 +943,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 	XLogRecPtr	recptr;
 
 	xlrec.xcnt = CurrRunningXacts->xcnt;
+	xlrec.subxcnt = CurrRunningXacts->subxcnt;
 	xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
 	xlrec.nextXid = CurrRunningXacts->nextXid;
 	xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
@@ -957,7 +959,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 	{
 		rdata[0].next = &(rdata[1]);
 		rdata[1].data = (char *) CurrRunningXacts->xids;
-		rdata[1].len = xlrec.xcnt * sizeof(TransactionId);
+		rdata[1].len = (xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId);
 		rdata[1].buffer = InvalidBuffer;
 		lastrdata = 1;
 	}
@@ -976,8 +978,8 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 			 CurrRunningXacts->nextXid);
 	else
 		elog(trace_recovery(DEBUG2),
-			 "snapshot of %u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
-			 CurrRunningXacts->xcnt,
+			 "snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
+			 CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
 			 (uint32) (recptr >> 32), (uint32) recptr,
 			 CurrRunningXacts->oldestRunningXid,
 			 CurrRunningXacts->latestCompletedXid,
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 7024fc4..f917b89 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -68,6 +68,7 @@ typedef struct xl_standby_locks
 typedef struct xl_running_xacts
 {
 	int			xcnt;			/* # of xact ids in xids[] */
+	int			subxcnt;			/* # of subxact ids in xids[] */
 	bool		subxid_overflow;	/* snapshot overflowed, subxids missing */
 	TransactionId nextXid;		/* copy of ShmemVariableCache->nextXid */
 	TransactionId oldestRunningXid;		/* *not* oldestXmin */
@@ -98,6 +99,7 @@ extern void standby_desc(StringInfo buf, uint8 xl_info, char *rec);
 typedef struct RunningTransactionsData
 {
 	int			xcnt;			/* # of xact ids in xids[] */
+	int			subxcnt;			/* # of subxact ids in xids[] */
 	bool		subxid_overflow;	/* snapshot overflowed, subxids missing */
 	TransactionId nextXid;		/* copy of ShmemVariableCache->nextXid */
 	TransactionId oldestRunningXid;		/* *not* oldestXmin */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to