On Sat, Sep 3, 2016 at 10:26 PM, Michael Paquier
<michael.paqu...@gmail.com> wrote:
> On Fri, Sep 2, 2016 at 5:06 AM, Simon Riggs <si...@2ndquadrant.com> wrote:
>> On 13 April 2016 at 15:31, Stas Kelvich <s.kelv...@postgrespro.ru> wrote:
>>
>>> Fixed patch attached. There already was infrastructure to skip currently
>>> held locks during replay of standby_redo() and I’ve extended that with
>>> check for prepared xids.
>>
>> Please confirm that everything still works on current HEAD for the new
>> CF, so review can start.
>
> The patch does not apply cleanly. Stas, could you rebase? I am
> switching the patch to "waiting on author" for now.

So, I have done the rebase myself and did an extra round of
review of the patch. Here are a couple of comments after this extra
look.

LockGXactByXid() is aimed to be used only in recovery, so it seems
appropriate to add an assertion on RecoveryInProgress(). Using this
routine in non-recovery code paths is risky because it assumes that a
PGXACT could be missing. That is fine in recovery, as prepared
transactions could have been moved to twophase files by a checkpoint,
but not in normal operation. We could also add an assertion of the kind
gxact->locking_backend == InvalidBackendId before locking the PGXACT,
but as this routine is normally only used by the startup process (in
non-standalone mode!), it's fine without it.
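Below is a minimal, self-contained sketch of the lookup contract described above. It uses simplified stand-in types. MockGXact, mock_in_recovery and lock_gxact_by_xid are hypothetical names, not backend APIs, and TwoPhaseStateLock handling is omitted. The sketch shows one way to guarantee a NULL result when no entry matches: keep the match in a separate variable instead of returning whatever the loop variable last pointed to.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical stand-in for a GXACT plus its PGXACT xid. */
typedef struct MockGXact
{
	TransactionId xid;			/* plays the role of PGXACT->xid */
	int			locking_backend;	/* 0 plays the role of InvalidBackendId */
} MockGXact;

/* Hypothetical stand-in for RecoveryInProgress(). */
static bool mock_in_recovery = true;

/*
 * Find the entry matching xid and mark it as locked by my_backend.
 * Returns NULL when nothing matches, e.g. because a checkpoint already
 * moved the prepared state to a twophase file.
 */
static MockGXact *
lock_gxact_by_xid(MockGXact *entries, int nentries, TransactionId xid,
				  int my_backend)
{
	MockGXact  *result = NULL;

	assert(mock_in_recovery);	/* recovery-only, like the proposed Assert() */

	for (int i = 0; i < nentries; i++)
	{
		MockGXact  *cur = &entries[i];

		if (cur->xid == xid)
		{
			cur->locking_backend = my_backend;
			result = cur;
			break;
		}
	}
	return result;				/* never a leftover loop variable */
}
```

With entries for xids 100 and 200, looking up 200 locks and returns the second entry. Looking up 300 returns NULL and leaves every entry unlocked.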

The handling of invalidation messages and removal of relfilenodes done
in FinishGXact can be grouped together, checking only once for
!RecoveryInProgress().

+ *
+ *     The same procedure happens during replication and crash recovery.
  *
"during WAL replay" is more generic and applies here.

+
+next_file:
+       continue;
+
That label can be removed and the jump to it replaced by a plain "continue;".
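This sketch shows the flag-plus-"continue" pattern. It has the same shape as the "Already recovered from WAL?" check in the rebased RecoverPreparedFromFiles(). Plain arrays stand in for TwoPhaseState and the pg_twophase directory scan, and count_files_to_recover is a hypothetical name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/*
 * Count on-disk xids that still need recovering, skipping any xid already
 * present in memory (i.e. already recovered from WAL). A flag set by the
 * inner loop plus "continue" in the outer loop replaces "goto next_file".
 */
static int
count_files_to_recover(const TransactionId *file_xids, int nfiles,
					   const TransactionId *mem_xids, int nmem)
{
	int			count = 0;

	for (int f = 0; f < nfiles; f++)
	{
		bool		recovered = false;

		for (int m = 0; m < nmem; m++)
		{
			if (file_xids[f] == mem_xids[m])
			{
				recovered = true;
				break;
			}
		}
		if (recovered)
			continue;			/* next file, no label needed */

		count++;				/* the real code would read the file here */
	}
	return count;
}
```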

+   /*
+    * At first check prepared tx that wasn't yet moved to disk.
+    */
+   LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+   for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+   {
+       GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+       PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
+       if (TransactionIdEquals(pgxact->xid, xid))
+       {
+           LWLockRelease(TwoPhaseStateLock);
+           return true;
+       }
+   }
+   LWLockRelease(TwoPhaseStateLock);
This overlaps with TwoPhaseGetGXact but I'd rather keep both things
separated: it does not seem worth complicating the existing interface,
and putting that in cache during recovery has no meaning.
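The lookup order used by StandbyTransactionIdIsPrepared() in this patch is: shared memory first, then the twophase file. The sketch below shows that order with a callback standing in for ReadTwoPhaseFile(). The names are hypothetical, and the TwoPhaseStateLock acquisition around the scan is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical stand-in for "ReadTwoPhaseFile() found a valid file". */
typedef bool (*file_lookup_fn) (TransactionId xid);

static bool
xid_is_prepared(TransactionId xid,
				const TransactionId *mem_xids, int nmem,
				file_lookup_fn file_has_state)
{
	assert(nmem >= 0);

	/* 1. Prepared xacts not yet moved to disk live in shared memory. */
	for (int i = 0; i < nmem; i++)
	{
		if (mem_xids[i] == xid)
			return true;
	}

	/* 2. Otherwise a checkpoint may have written a twophase file. */
	return file_has_state(xid);
}

/* Example file lookup: pretend only xid 42 has a state file on disk. */
static bool
only_42_on_disk(TransactionId xid)
{
	return xid == 42;
}
```

With xids 7 and 9 in memory and only 42 on disk, xids 9 and 42 are reported as prepared and xid 5 is not.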

I have also reworked the test format, and fixed many typos and grammar
problems in the patch as well as in the tests.

The result after review is attached. Perhaps a committer could take a look at it?
-- 
Michael
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 9f55adc..eb7c339 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -45,8 +45,8 @@
  *		  fsynced
  *		* If COMMIT happens after checkpoint then backend reads state data from
  *		  files
- *		* In case of crash replay will move data from xlog to files, if that
- *		  hasn't happened before. XXX TODO - move to shmem in replay also
+ *
+ *		The same procedure happens during WAL replay.
  *
  *-------------------------------------------------------------------------
  */
@@ -578,6 +578,45 @@ LockGXact(const char *gid, Oid user)
 }
 
 /*
+ * LockGXactByXid
+ *
+ * Find prepared transaction by xid and lock corresponding GXACT.
+ * This is used during recovery as an alternative to LockGXact(), and
+ * should only be used in recovery. No entries found means that a checkpoint
+ * has moved the searched prepared transaction data to a twophase file.
+ *
+ * Returns the transaction data if found, or NULL if nothing has been locked.
+ */
+static GlobalTransaction
+LockGXactByXid(TransactionId xid)
+{
+	int		i;
+	GlobalTransaction gxact = NULL;
+
+	Assert(RecoveryInProgress());
+
+	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
+	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction cur = TwoPhaseState->prepXacts[i];
+		PGXACT	   *pgxact = &ProcGlobal->allPgXact[cur->pgprocno];
+
+		/* gxact stays NULL unless a matching entry is found */
+		if (TransactionIdEquals(xid, pgxact->xid))
+		{
+			/* ok to lock it */
+			cur->locking_backend = MyBackendId;
+			MyLockedGxact = cur;
+			gxact = cur;
+			break;
+		}
+	}
+	LWLockRelease(TwoPhaseStateLock);
+
+	return gxact;
+}
+
+/*
  * RemoveGXact
  *		Remove the prepared transaction from the shared memory array.
  *
@@ -1241,9 +1280,9 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
  * Reads 2PC data from xlog. During checkpoint this data will be moved to
  * twophase files and ReadTwoPhaseFile should be used instead.
  *
- * Note clearly that this function accesses WAL during normal operation, similarly
- * to the way WALSender or Logical Decoding would do. It does not run during
- * crash recovery or standby processing.
+ * Note that this function accesses WAL not only during recovery but also
+ * during normal operation, similarly to the way WALSender or Logical
+ * Decoding would do.
  */
 static void
 XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
@@ -1252,8 +1291,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
 	XLogReaderState *xlogreader;
 	char	   *errormsg;
 
-	Assert(!RecoveryInProgress());
-
 	xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
 	if (!xlogreader)
 		ereport(ERROR,
@@ -1296,13 +1333,35 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
 	char	   *buf;
 	TwoPhaseFileHeader *hdr;
 	bool		result;
+	int			i;
 
 	Assert(TransactionIdIsValid(xid));
 
 	if (max_prepared_xacts <= 0)
 		return false;			/* nothing to do */
 
-	/* Read and validate file */
+	/*
+	 * First check if this prepared transaction has its information in
+	 * shared memory, and use it.
+	 */
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+		PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
+		if (TransactionIdEquals(pgxact->xid, xid))
+		{
+			LWLockRelease(TwoPhaseStateLock);
+			return true;
+		}
+	}
+	LWLockRelease(TwoPhaseStateLock);
+
+	/*
+	 * Nothing in shared memory? Then just read its corresponding twophase
+	 * file and validate it.
+	 */
 	buf = ReadTwoPhaseFile(xid, false);
 	if (buf == NULL)
 		return false;
@@ -1316,12 +1375,17 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
 }
 
 /*
- * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
+ * FinishGXact
+ *
+ * Do the actual finish of COMMIT/ABORT PREPARED. Caller is responsible
+ * for locking the transaction this routine is working on.
+ *
+ * This function can be called during replay to clean memory state for
+ * previously prepared xact. In that case actions are the same as in
+ * normal operations but without any writes to WAL or files.
  */
-void
-FinishPreparedTransaction(const char *gid, bool isCommit)
+static void FinishGXact(GlobalTransaction gxact, bool isCommit)
 {
-	GlobalTransaction gxact;
 	PGPROC	   *proc;
 	PGXACT	   *pgxact;
 	TransactionId xid;
@@ -1332,16 +1396,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	TransactionId *children;
 	RelFileNode *commitrels;
 	RelFileNode *abortrels;
-	RelFileNode *delrels;
-	int			ndelrels;
 	SharedInvalidationMessage *invalmsgs;
 	int			i;
 
-	/*
-	 * Validate the GID, and lock the GXACT to ensure that two backends do not
-	 * try to commit the same GID at once.
-	 */
-	gxact = LockGXact(gid, GetUserId());
 	proc = &ProcGlobal->allProcs[gxact->pgprocno];
 	pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 	xid = pgxact->xid;
@@ -1383,17 +1440,23 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	 * TransactionIdIsInProgress will stop saying the prepared xact is in
 	 * progress), then run the post-commit or post-abort callbacks. The
 	 * callbacks will release the locks the transaction held.
+	 *
+	 * In recovery nothing needs to happen here, as these routines generate
+	 * WAL records.
 	 */
-	if (isCommit)
-		RecordTransactionCommitPrepared(xid,
+	if (!RecoveryInProgress())
+	{
+		if (isCommit)
+			RecordTransactionCommitPrepared(xid,
 										hdr->nsubxacts, children,
 										hdr->ncommitrels, commitrels,
 										hdr->ninvalmsgs, invalmsgs,
 										hdr->initfileinval);
-	else
-		RecordTransactionAbortPrepared(xid,
+		else
+			RecordTransactionAbortPrepared(xid,
 									   hdr->nsubxacts, children,
 									   hdr->nabortrels, abortrels);
+	}
 
 	ProcArrayRemove(proc, latestXid);
 
@@ -1408,41 +1471,50 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	gxact->valid = false;
 
 	/*
-	 * We have to remove any files that were supposed to be dropped. For
-	 * consistency with the regular xact.c code paths, must do this before
-	 * releasing locks, so do it before running the callbacks.
-	 *
-	 * NB: this code knows that we couldn't be dropping any temp rels ...
+	 * Perform actions needed only during normal operation, but *not* recovery.
 	 */
-	if (isCommit)
-	{
-		delrels = commitrels;
-		ndelrels = hdr->ncommitrels;
-	}
-	else
+	if (!RecoveryInProgress())
 	{
-		delrels = abortrels;
-		ndelrels = hdr->nabortrels;
-	}
-	for (i = 0; i < ndelrels; i++)
-	{
-		SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);
+		RelFileNode *delrels;
+		int			ndelrels;
 
-		smgrdounlink(srel, false);
-		smgrclose(srel);
-	}
+		/*
+		 * We have to remove any files that were supposed to be dropped. For
+		 * consistency with the regular xact.c code paths, must do this before
+		 * releasing locks, so do it before running the callbacks.
+		 *
+		 * NB: this code knows that we couldn't be dropping any temp rels ...
+		 */
+		if (isCommit)
+		{
+			delrels = commitrels;
+			ndelrels = hdr->ncommitrels;
+		}
+		else
+		{
+			delrels = abortrels;
+			ndelrels = hdr->nabortrels;
+		}
+		for (i = 0; i < ndelrels; i++)
+		{
+			SMgrRelation srel = smgropen(delrels[i], InvalidBackendId);
 
-	/*
-	 * Handle cache invalidation messages.
-	 *
-	 * Relcache init file invalidation requires processing both before and
-	 * after we send the SI messages. See AtEOXact_Inval()
-	 */
-	if (hdr->initfileinval)
-		RelationCacheInitFilePreInvalidate();
-	SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
-	if (hdr->initfileinval)
-		RelationCacheInitFilePostInvalidate();
+			smgrdounlink(srel, false);
+			smgrclose(srel);
+		}
+
+		/*
+		 * Handle cache invalidation messages.
+		 *
+		 * Relcache init file invalidation requires processing both before and
+		 * after we send the SI messages. See AtEOXact_Inval()
+		 */
+		if (hdr->initfileinval)
+			RelationCacheInitFilePreInvalidate();
+		SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
+		if (hdr->initfileinval)
+			RelationCacheInitFilePostInvalidate();
+	}
 
 	/* And now do the callbacks */
 	if (isCommit)
@@ -1468,6 +1540,50 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 }
 
 /*
+ * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
+ */
+void
+FinishPreparedTransaction(const char *gid, bool isCommit)
+{
+	GlobalTransaction gxact;
+
+	/*
+	 * Validate the GID, and lock the GXACT to ensure that two backends do not
+	 * try to commit the same GID at once.
+	 */
+	gxact = LockGXact(gid, GetUserId());
+	FinishGXact(gxact, isCommit);
+}
+
+/*
+ * XlogRedoFinishPrepared()
+ *
+ * This function is called during recovery for WAL records working on COMMIT
+ * PREPARED or ABORT PREPARED. That function cleans up memory state that was
+ * created while replaying its corresponding PREPARE record if its information
+ * was not on disk in a twophase file.
+ */
+void
+XlogRedoFinishPrepared(TransactionId xid, bool isCommit)
+{
+	GlobalTransaction gxact;
+
+	Assert(RecoveryInProgress());
+
+	gxact = LockGXactByXid(xid);
+
+	/*
+	 * If requested xid was not found that means that the PREPARE record was
+	 * moved to a twophase file because of a checkpoint or a restart point.
+	 * There is nothing else to do in this case.
+	 */
+	if (!gxact)
+		return;
+
+	FinishGXact(gxact, isCommit);
+}
+
+/*
  * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
  */
 static void
@@ -1690,7 +1806,47 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 	TransactionId *xids = NULL;
 	int			nxids = 0;
 	int			allocsize = 0;
+	int			i;
+
+	/*
+	 * We need to check the PGXACT array for prepared transactions that don't
+	 * have any state file, in case of a slave restart with the master being off.
+	 */
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+		PGXACT	   *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
+		if (!gxact->valid)
+			continue;
+
+		if (TransactionIdPrecedes(pgxact->xid, result))
+			result = pgxact->xid;
+
+		if (xids_p)
+		{
+			if (nxids == allocsize)
+			{
+				if (nxids == 0)
+				{
+					allocsize = 10;
+					xids = palloc(allocsize * sizeof(TransactionId));
+				}
+				else
+				{
+					allocsize = allocsize * 2;
+					xids = repalloc(xids, allocsize * sizeof(TransactionId));
+				}
+			}
+			xids[nxids++] = pgxact->xid;
+		}
+	}
+	LWLockRelease(TwoPhaseStateLock);
 
+	/*
+	 * And now scan files in pg_twophase directory
+	 */
 	cldir = AllocateDir(TWOPHASE_DIR);
 	while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
 	{
@@ -1701,7 +1857,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 			char	   *buf;
 			TwoPhaseFileHeader *hdr;
 			TransactionId *subxids;
-			int			i;
 
 			xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
 
@@ -1809,102 +1964,105 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
 }
 
 /*
- * StandbyRecoverPreparedTransactions
+ * RecoverPreparedFromBuffer
  *
- * Scan the pg_twophase directory and setup all the required information to
- * allow standby queries to treat prepared transactions as still active.
- * This is never called at the end of recovery - we use
- * RecoverPreparedTransactions() at that point.
+ * Parse data in given buffer (that can be a pointer to WAL record holding
+ * this information or data read from a twophase file) and build the
+ * shared-memory state for that prepared transaction.
  *
- * Currently we simply call SubTransSetParent() for any subxids of prepared
- * transactions. If overwriteOK is true, it's OK if some XIDs have already
- * been marked in pg_subtrans.
+ * Caller is responsible for calling MarkAsPrepared() on the returned gxact.
  */
-void
-StandbyRecoverPreparedTransactions(bool overwriteOK)
+static GlobalTransaction
+RecoverPreparedFromBuffer(char *buf, bool forceOverwriteOK)
 {
-	DIR		   *cldir;
-	struct dirent *clde;
+	char			*bufptr;
+	const char		*gid;
+	TransactionId	*subxids;
+	bool			overwriteOK = false;
+	int				i;
+	GlobalTransaction gxact;
+	TwoPhaseFileHeader	*hdr;
 
-	cldir = AllocateDir(TWOPHASE_DIR);
-	while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
-	{
-		if (strlen(clde->d_name) == 8 &&
-			strspn(clde->d_name, "0123456789ABCDEF") == 8)
-		{
-			TransactionId xid;
-			char	   *buf;
-			TwoPhaseFileHeader *hdr;
-			TransactionId *subxids;
-			int			i;
+	/* Deconstruct header */
+	hdr = (TwoPhaseFileHeader *) buf;
+	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
+	gid = (const char *) bufptr;
+	bufptr += MAXALIGN(hdr->gidlen);
+	subxids = (TransactionId *) bufptr;
+	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
 
-			xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
+	/*
+	 * It's possible that SubTransSetParent has been set before, if
+	 * the prepared transaction generated xid assignment records. Test
+	 * here must match one used in AssignTransactionId().
+	 */
+	if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
+						 XLogLogicalInfoActive()))
+		overwriteOK = true;
 
-			/* Already processed? */
-			if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
-			{
-				ereport(WARNING,
-						(errmsg("removing stale two-phase state file \"%s\"",
-								clde->d_name)));
-				RemoveTwoPhaseFile(xid, true);
-				continue;
-			}
+	/*
+	 * Caller can also force overwriteOK.
+	 */
+	if (forceOverwriteOK)
+		overwriteOK = true;
 
-			/* Read and validate file */
-			buf = ReadTwoPhaseFile(xid, true);
-			if (buf == NULL)
-			{
-				ereport(WARNING,
-					  (errmsg("removing corrupt two-phase state file \"%s\"",
-							  clde->d_name)));
-				RemoveTwoPhaseFile(xid, true);
-				continue;
-			}
+	/*
+	 * Reconstruct subtrans state for the transaction --- needed
+	 * because pg_subtrans is not preserved over a restart.  Note that
+	 * we are linking all the subtransactions directly to the
+	 * top-level XID; there may originally have been a more complex
+	 * hierarchy, but there's no need to restore that exactly.
+	 */
+	for (i = 0; i < hdr->nsubxacts; i++)
+		SubTransSetParent(subxids[i], hdr->xid, overwriteOK);
 
-			/* Deconstruct header */
-			hdr = (TwoPhaseFileHeader *) buf;
-			if (!TransactionIdEquals(hdr->xid, xid))
-			{
-				ereport(WARNING,
-					  (errmsg("removing corrupt two-phase state file \"%s\"",
-							  clde->d_name)));
-				RemoveTwoPhaseFile(xid, true);
-				pfree(buf);
-				continue;
-			}
+	/*
+	 * Recreate its GXACT and dummy PGPROC
+	 */
+	gxact = MarkAsPreparing(hdr->xid, gid,
+							hdr->prepared_at,
+							hdr->owner, hdr->database);
+	GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
 
-			/*
-			 * Examine subtransaction XIDs ... they should all follow main
-			 * XID.
-			 */
-			subxids = (TransactionId *)
-				(buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));
-			for (i = 0; i < hdr->nsubxacts; i++)
-			{
-				TransactionId subxid = subxids[i];
+	/*
+	 * Recover other state (notably locks) using resource managers
+	 */
+	ProcessRecords(bufptr, hdr->xid, twophase_recover_callbacks);
 
-				Assert(TransactionIdFollows(subxid, xid));
-				SubTransSetParent(xid, subxid, overwriteOK);
-			}
-		}
-	}
-	FreeDir(cldir);
+	/*
+	 * Release locks held by the standby process after we process each
+	 * prepared transaction. As a result, we don't need too many
+	 * additional locks at any one time.
+	 */
+	if (InHotStandby)
+		StandbyReleaseLockTree(hdr->xid, hdr->nsubxacts, subxids);
+
+	/*
+	 * We're done with recovering this transaction. Clear
+	 * MyLockedGxact, like we do in PrepareTransaction() during normal
+	 * operation.
+	 */
+	PostPrepare_Twophase();
+
+	return gxact;
 }
 
 /*
- * RecoverPreparedTransactions
+ * RecoverPreparedFromFiles
  *
  * Scan the pg_twophase directory and reload shared-memory state for each
  * prepared transaction (reacquire locks, etc).  This is run during database
  * startup.
  */
 void
-RecoverPreparedTransactions(void)
+RecoverPreparedFromFiles(bool forceOverwriteOK)
 {
 	char		dir[MAXPGPATH];
 	DIR		   *cldir;
 	struct dirent *clde;
-	bool		overwriteOK = false;
 
 	snprintf(dir, MAXPGPATH, "%s", TWOPHASE_DIR);
 
@@ -1916,15 +2074,30 @@ RecoverPreparedTransactions(void)
 		{
 			TransactionId xid;
 			char	   *buf;
-			char	   *bufptr;
-			TwoPhaseFileHeader *hdr;
-			TransactionId *subxids;
 			GlobalTransaction gxact;
-			const char *gid;
 			int			i;
+			bool		recovered = false;
+			PGXACT	   *pgxact;
 
 			xid = (TransactionId) strtoul(clde->d_name, NULL, 16);
 
+			/* Already recovered from WAL? */
+			LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+			for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+			{
+				gxact = TwoPhaseState->prepXacts[i];
+				pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
+
+				if (TransactionIdEquals(xid, pgxact->xid))
+				{
+					recovered = true;
+					break;
+				}
+			}
+			LWLockRelease(TwoPhaseStateLock);
+			if (recovered)
+				continue;
+
 			/* Already processed? */
 			if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
 			{
@@ -1949,73 +2122,39 @@ RecoverPreparedTransactions(void)
 			ereport(LOG,
 					(errmsg("recovering prepared transaction %u", xid)));
 
-			/* Deconstruct header */
-			hdr = (TwoPhaseFileHeader *) buf;
-			Assert(TransactionIdEquals(hdr->xid, xid));
-			bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
-			gid = (const char *) bufptr;
-			bufptr += MAXALIGN(hdr->gidlen);
-			subxids = (TransactionId *) bufptr;
-			bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
-			bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
-			bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
-			bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
-
-			/*
-			 * It's possible that SubTransSetParent has been set before, if
-			 * the prepared transaction generated xid assignment records. Test
-			 * here must match one used in AssignTransactionId().
-			 */
-			if (InHotStandby && (hdr->nsubxacts >= PGPROC_MAX_CACHED_SUBXIDS ||
-								 XLogLogicalInfoActive()))
-				overwriteOK = true;
-
-			/*
-			 * Reconstruct subtrans state for the transaction --- needed
-			 * because pg_subtrans is not preserved over a restart.  Note that
-			 * we are linking all the subtransactions directly to the
-			 * top-level XID; there may originally have been a more complex
-			 * hierarchy, but there's no need to restore that exactly.
-			 */
-			for (i = 0; i < hdr->nsubxacts; i++)
-				SubTransSetParent(subxids[i], xid, overwriteOK);
-
-			/*
-			 * Recreate its GXACT and dummy PGPROC
-			 */
-			gxact = MarkAsPreparing(xid, gid,
-									hdr->prepared_at,
-									hdr->owner, hdr->database);
+			gxact = RecoverPreparedFromBuffer(buf, forceOverwriteOK);
 			gxact->ondisk = true;
-			GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
 			MarkAsPrepared(gxact);
 
-			/*
-			 * Recover other state (notably locks) using resource managers
-			 */
-			ProcessRecords(bufptr, xid, twophase_recover_callbacks);
-
-			/*
-			 * Release locks held by the standby process after we process each
-			 * prepared transaction. As a result, we don't need too many
-			 * additional locks at any one time.
-			 */
-			if (InHotStandby)
-				StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
-
-			/*
-			 * We're done with recovering this transaction. Clear
-			 * MyLockedGxact, like we do in PrepareTransaction() during normal
-			 * operation.
-			 */
-			PostPrepare_Twophase();
-
 			pfree(buf);
 		}
 	}
 	FreeDir(cldir);
 }
 
+
+/*
+ * RecoverPreparedFromXLOG
+ *
+ * To avoid creating twophase state files during replay, we register the
+ * state of prepared transactions in shared memory, the same way as during
+ * normal operation. If replay reaches the COMMIT PREPARED record for a
+ * transaction before a checkpoint or restartpoint happens, no files are
+ * used at all, limiting the I/O impact of such operations during
+ * recovery.
+ */
+void
+RecoverPreparedFromXLOG(XLogReaderState *record)
+{
+	GlobalTransaction gxact;
+
+	gxact = RecoverPreparedFromBuffer((char *) XLogRecGetData(record), false);
+	gxact->prepare_start_lsn = record->ReadRecPtr;
+	gxact->prepare_end_lsn = record->EndRecPtr;
+	MarkAsPrepared(gxact);
+}
+
+
 /*
  *	RecordTransactionCommitPrepared
  *
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e11b229..6a40425 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5602,7 +5602,7 @@ xact_redo(XLogReaderState *record)
 			Assert(TransactionIdIsValid(parsed.twophase_xid));
 			xact_redo_commit(&parsed, parsed.twophase_xid,
 							 record->EndRecPtr, XLogRecGetOrigin(record));
-			RemoveTwoPhaseFile(parsed.twophase_xid, false);
+			XlogRedoFinishPrepared(parsed.twophase_xid, true);
 		}
 	}
 	else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
@@ -5622,14 +5622,12 @@ xact_redo(XLogReaderState *record)
 		{
 			Assert(TransactionIdIsValid(parsed.twophase_xid));
 			xact_redo_abort(&parsed, parsed.twophase_xid);
-			RemoveTwoPhaseFile(parsed.twophase_xid, false);
+			XlogRedoFinishPrepared(parsed.twophase_xid, false);
 		}
 	}
 	else if (info == XLOG_XACT_PREPARE)
 	{
-		/* the record contents are exactly the 2PC file */
-		RecreateTwoPhaseFile(XLogRecGetXid(record),
-						  XLogRecGetData(record), XLogRecGetDataLen(record));
+		RecoverPreparedFromXLOG(record);
 	}
 	else if (info == XLOG_XACT_ASSIGNMENT)
 	{
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2189c22..613097f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6704,7 +6704,7 @@ StartupXLOG(void)
 
 				ProcArrayApplyRecoveryInfo(&running);
 
-				StandbyRecoverPreparedTransactions(false);
+				RecoverPreparedFromFiles(false);
 			}
 		}
 
@@ -7463,7 +7463,7 @@ StartupXLOG(void)
 	TrimMultiXact();
 
 	/* Reload shared-memory state for prepared transactions */
-	RecoverPreparedTransactions();
+	RecoverPreparedFromFiles(false);
 
 	/*
 	 * Shutdown the recovery environment. This must occur after
@@ -9377,7 +9377,7 @@ xlog_redo(XLogReaderState *record)
 
 			ProcArrayApplyRecoveryInfo(&running);
 
-			StandbyRecoverPreparedTransactions(true);
+			RecoverPreparedFromFiles(true);
 		}
 
 		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 547f1a8..5ea2530 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -608,7 +608,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
 	/* Already processed? */
 	if (!TransactionIdIsValid(xid) ||
 		TransactionIdDidCommit(xid) ||
-		TransactionIdDidAbort(xid))
+		TransactionIdDidAbort(xid) ||
+		StandbyTransactionIdIsPrepared(xid))
 		return;
 
 	elog(trace_recovery(DEBUG4),
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index b7ce0c6..416ef5e 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -17,6 +17,7 @@
 #include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
+#include "access/xlogreader.h"
 
 /*
  * GlobalTransactionData is defined in twophase.c; other places have no
@@ -46,8 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
 
 extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
 							int *nxids_p);
-extern void StandbyRecoverPreparedTransactions(bool overwriteOK);
-extern void RecoverPreparedTransactions(void);
+extern void RecoverPreparedFromFiles(bool overwriteOK);
+extern void RecoverPreparedFromXLOG(XLogReaderState *record);
 
 extern void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
 extern void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
@@ -56,4 +57,5 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
 
 extern void FinishPreparedTransaction(const char *gid, bool isCommit);
 
+extern void XlogRedoFinishPrepared(TransactionId xid, bool isCommit);
 #endif   /* TWOPHASE_H */
diff --git a/src/test/recovery/t/008_twophase.pl b/src/test/recovery/t/008_twophase.pl
new file mode 100644
index 0000000..3c203cd
--- /dev/null
+++ b/src/test/recovery/t/008_twophase.pl
@@ -0,0 +1,249 @@
+# Tests dedicated to two-phase commit in recovery
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 12;
+
+# Setup master node
+my $node_master = get_new_node("master");
+$node_master->init(allows_streaming => 1);
+$node_master->append_conf('postgresql.conf', qq(
+	max_prepared_transactions = 10
+));
+$node_master->start;
+$node_master->backup('master_backup');
+$node_master->psql('postgres', "create table t(id int)");
+
+# Setup slave node
+my $node_slave = get_new_node('slave');
+$node_slave->init_from_backup($node_master, 'master_backup', has_streaming => 1);
+$node_slave->start;
+
+# Switch to synchronous replication
+$node_master->append_conf('postgresql.conf', qq(
+	synchronous_standby_names = '*'
+));
+$node_master->psql('postgres', "select pg_reload_conf()");
+
+my $psql_out = '';
+my $psql_rc = '';
+
+###############################################################################
+# Check that we can commit and abort tx after soft restart.
+# Here a checkpoint happens before shutdown and no WAL replay will occur at
+# next startup. In this case postgres re-creates shared-memory state from
+# twophase files.
+###############################################################################
+
+$node_master->psql('postgres', "
+	begin;
+	insert into t values (42);
+	prepare transaction 'x';
+	begin;
+	insert into t values (142);
+	prepare transaction 'y';");
+$node_master->stop;
+$node_master->start;
+
+$psql_rc = $node_master->psql('postgres', "commit prepared 'x'");
+is($psql_rc, '0', 'Commit prepared transaction after restart.');
+
+$psql_rc = $node_master->psql('postgres', "rollback prepared 'y'");
+is($psql_rc, '0', 'Rollback prepared transaction after restart.');
+
+###############################################################################
+# Check that we can commit and abort after hard restart.
+# At next startup, WAL replay will re-create shared memory state for prepared
+# transactions using dedicated WAL records.
+###############################################################################
+
+$node_master->psql('postgres', "
+	checkpoint;
+	begin;
+	insert into t values (42);
+	prepare transaction 'x';
+	begin;
+	insert into t values (142);
+	prepare transaction 'y';");
+$node_master->teardown_node;
+$node_master->start;
+
+$psql_rc = $node_master->psql('postgres', "commit prepared 'x'");
+is($psql_rc, '0', 'Commit prepared tx after teardown.');
+
+$psql_rc = $node_master->psql('postgres', "rollback prepared 'y'");
+is($psql_rc, '0', 'Rollback prepared transaction after teardown.');
+
+###############################################################################
+# Check that WAL replay can handle several transactions with the same GID.
+###############################################################################
+
+$node_master->psql('postgres', "
+	checkpoint;
+	begin;
+	insert into t values (42);
+	prepare transaction 'x';
+	commit prepared 'x';
+	begin;
+	insert into t values (42);
+	prepare transaction 'x';");
+$node_master->teardown_node;
+$node_master->start;
+
+$psql_rc = $node_master->psql('postgres', "commit prepared 'x'");
+is($psql_rc, '0', 'Replay several transactions with same GID.');
+
+###############################################################################
+# Check that WAL replay cleans up its shared memory state and releases locks
+# while replaying transaction commits.
+###############################################################################
+
+$node_master->psql('postgres', "
+	begin;
+	insert into t values (42);
+	prepare transaction 'x';
+	commit prepared 'x';");
+$node_master->teardown_node;
+$node_master->start;
+$psql_rc = $node_master->psql('postgres', "begin;
+	insert into t values (42);
+	-- This prepare can fail due to a conflicting GID or lock conflicts if
+	-- replay did not fully clean up its state on the previous commit.
+	prepare transaction 'x';");
+is($psql_rc, '0', "Cleanup of shared memory state for 2PC commit");
+
+$node_master->psql('postgres', "commit prepared 'x'");
+
+###############################################################################
+# Check that WAL replay cleans up its shared memory state on a running slave.
+###############################################################################
+
+$node_master->psql('postgres', "
+	begin;
+	insert into t values (42);
+	prepare transaction 'x';
+	commit prepared 'x';");
+$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts;",
+				  stdout => \$psql_out);
+is($psql_out, '0',
+   "Cleanup of shared memory state on running standby without checkpoint.");
+
+###############################################################################
+# Same as the previous case, but force a checkpoint on the slave between
+# prepare and commit so that on-disk twophase files are used.
+###############################################################################
+
+$node_master->psql('postgres', "
+	begin;
+	insert into t values (42);
+	prepare transaction 'x';");
+$node_slave->psql('postgres', "checkpoint;");
+$node_master->psql('postgres', "commit prepared 'x';");
+$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts;",
+				  stdout => \$psql_out);
+is($psql_out, '0',
+   "Cleanup of shared memory state on running standby after checkpoint.");
+
+###############################################################################
+# Check that prepared transactions can be committed on a promoted slave.
+###############################################################################
+
+$node_master->psql('postgres', "
+	begin;
+	insert into t values (42);
+	prepare transaction 'x';");
+$node_master->teardown_node;
+$node_slave->promote;
+$node_slave->poll_query_until('postgres', "SELECT pg_is_in_recovery() <> true");
+
+$psql_rc = $node_slave->psql('postgres', "commit prepared 'x';");
+is($psql_rc, '0', "Restore of prepared transaction on promoted slave.");
+
+# change roles
+($node_master, $node_slave) = ($node_slave, $node_master);
+$node_slave->enable_streaming($node_master);
+$node_slave->append_conf('recovery.conf', qq(
+recovery_target_timeline='latest'
+));
+$node_slave->start;
+
+###############################################################################
+# Check that prepared transactions are replayed after a soft restart of the
+# standby while the master is down. Since the standby knows that the master is
+# down, it uses a different code path at startup to ensure that the status of
+# transactions is consistent.
+###############################################################################
+
+$node_master->psql('postgres', "
+	begin;
+	insert into t values (42);
+	prepare transaction 'x';");
+$node_master->stop;
+$node_slave->restart;
+$node_slave->promote;
+$node_slave->poll_query_until('postgres', "SELECT pg_is_in_recovery() <> true");
+
+$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts",
+				  stdout => \$psql_out);
+is($psql_out, '1',
+   "Restore prepared transactions from files with master down.");
+
+# restore state
+($node_master, $node_slave) = ($node_slave, $node_master);
+$node_slave->enable_streaming($node_master);
+$node_slave->append_conf('recovery.conf', qq(
+recovery_target_timeline='latest'
+));
+$node_slave->start;
+$node_master->psql('postgres', "commit prepared 'x'");
+
+###############################################################################
+# Check that prepared transactions are correctly replayed after a hard restart
+# of the slave while the master is down.
+###############################################################################
+
+$node_master->psql('postgres', "
+	begin;
+	insert into t values (242);
+	prepare transaction 'x';
+	");
+$node_master->stop;
+$node_slave->teardown_node;
+$node_slave->start;
+$node_slave->promote;
+$node_slave->poll_query_until('postgres',
+							  "SELECT pg_is_in_recovery() <> true");
+
+$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts",
+				  stdout => \$psql_out);
+is($psql_out, '1',
+   "Restore prepared transactions from records with master down.");
+
+# restore state
+($node_master, $node_slave) = ($node_slave, $node_master);
+$node_slave->enable_streaming($node_master);
+$node_slave->append_conf('recovery.conf', qq(
+recovery_target_timeline='latest'
+));
+$node_slave->start;
+$node_master->psql('postgres', "commit prepared 'x'");
+
+
+###############################################################################
+# Check for a lock conflict between a prepared transaction containing DDL and
+# replay of an XLOG_STANDBY_LOCK WAL record.
+###############################################################################
+
+$node_master->psql('postgres', "
+	begin;
+	create table t2(id int);
+	prepare transaction 'x';
+	-- The checkpoint will issue an XLOG_STANDBY_LOCK record that can conflict
+	-- with the lock held by the 'create table' statement.
+	checkpoint;
+	commit prepared 'x';");
+
+$node_slave->psql('postgres', "select count(*) from pg_prepared_xacts",
+				  stdout => \$psql_out);
+is($psql_out, '0', "Replay prepared transaction with DDL.");
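
Side note on the test itself: the role-swap sequence (swap $node_master and $node_slave, call enable_streaming, append recovery_target_timeline='latest' to recovery.conf, then start) appears three times verbatim. It could be factored into a small helper. Below is a minimal sketch, assuming only the PostgresNode methods the test already calls; the name swap_roles is hypothetical and not part of the patch.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of the patch: swap the roles of the two
# nodes and re-attach the new slave to the new master, following the
# latest timeline. Uses only methods the test already calls on its
# PostgresNode objects (enable_streaming, append_conf, start).
sub swap_roles
{
	my ($master_ref, $slave_ref) = @_;

	# Exchange the node objects held in the caller's variables.
	($$master_ref, $$slave_ref) = ($$slave_ref, $$master_ref);

	# Stream from the promoted node and follow its new timeline.
	$$slave_ref->enable_streaming($$master_ref);
	$$slave_ref->append_conf('recovery.conf', qq(
recovery_target_timeline='latest'
));
	$$slave_ref->start;
	return;
}
```

Each of the three copies would then reduce to swap_roles(\$node_master, \$node_slave); passing references keeps the swap visible to the caller's variables.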
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)