From 446767f492ad9a893dc5aee8b71ef629e45d5504 Mon Sep 17 00:00:00 2001
From: ChangAo Chen <cca5507@qq.com>
Date: Sat, 10 Aug 2024 17:00:21 +0800
Subject: [PATCH v3 1/2] Track transactions committed in BUILDING_SNAPSHOT.

The historic snapshot previously didn't track transactions
committed in BUILDING_SNAPSHOT, and this might result in a
transaction taking an incorrect snapshot and logical decoding
being interrupted. So we must track these transactions.

Because the historic snapshot only tracks catalog modifying
transactions, we also need handle the xlog that might mark a
transaction as catalog modifying during BUILDING_SNAPSHOT.
---
 src/backend/replication/logical/decode.c | 48 +++++++++++++++++++++---
 1 file changed, 43 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index d687ceee33..ec4a43d5a7 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -206,12 +206,17 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	uint8		info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
 
 	/*
-	 * If the snapshot isn't yet fully built, we cannot decode anything, so
-	 * bail out.
+	 * Before BUILDING_SNAPSHOT, we cannot decode anything because we don't
+	 * have snapshot and the transaction will not be tracked by snapshot
+	 * anyway(see SnapBuildCommitTxn() for details), so bail out.
 	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT)
 		return;
 
+	/*
+	 * Note that during BUILDING_SNAPSHOT, the xlog is only used to build
+	 * snapshot and will not be decoded because we don't have snapshot yet.
+	 */
 	switch (info)
 	{
 		case XLOG_XACT_COMMIT:
@@ -282,9 +287,11 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			{
 				TransactionId xid;
 				xl_xact_invals *invals;
+				bool		has_snapshot;
 
 				xid = XLogRecGetXid(r);
 				invals = (xl_xact_invals *) XLogRecGetData(r);
+				has_snapshot = SnapBuildCurrentState(builder) >= SNAPBUILD_FULL_SNAPSHOT;
 
 				/*
 				 * Execute the invalidations for xid-less transactions,
@@ -293,7 +300,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 */
 				if (TransactionIdIsValid(xid))
 				{
-					if (!ctx->fast_forward)
+					if (!ctx->fast_forward && has_snapshot)
 						ReorderBufferAddInvalidations(reorder, xid,
 													  buf->origptr,
 													  invals->nmsgs,
@@ -301,7 +308,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 					ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
 													  buf->origptr);
 				}
-				else if (!ctx->fast_forward)
+				else if (!ctx->fast_forward && has_snapshot)
 					ReorderBufferImmediateInvalidation(ctx->reorder,
 													   invals->nmsgs,
 													   invals->msgs);
@@ -416,7 +423,22 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	 */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
 		ctx->fast_forward)
+	{
+		/*
+		 * Note that during or after BUILDING_SNAPSHOT, we need handle the xlog
+		 * that might mark a transaction as catalog modifying because the snapshot
+		 * only tracks catalog modifying transactions. The transaction before
+		 * BUILDING_SNAPSHOT will not be tracked anyway(see SnapBuildCommitTxn()
+		 * for details), so just return.
+	 	 */
+		if (SnapBuildCurrentState(builder) >= SNAPBUILD_BUILDING_SNAPSHOT)
+		{
+			/* Currently only XLOG_HEAP2_NEW_CID means a catalog modifying */
+			if (info == XLOG_HEAP2_NEW_CID && TransactionIdIsValid(xid))
+				ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+		}
 		return;
+	}
 
 	switch (info)
 	{
@@ -475,7 +497,22 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	 */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
 		ctx->fast_forward)
+	{
+		/*
+		 * Note that during or after BUILDING_SNAPSHOT, we need handle the xlog
+		 * that might mark a transaction as catalog modifying because the snapshot
+		 * only tracks catalog modifying transactions. The transaction before
+		 * BUILDING_SNAPSHOT will not be tracked anyway(see SnapBuildCommitTxn()
+		 * for details), so just return.
+	 	 */
+		if (SnapBuildCurrentState(builder) >= SNAPBUILD_BUILDING_SNAPSHOT)
+		{
+			/* Currently only XLOG_HEAP_INPLACE means a catalog modifying */
+			if (info == XLOG_HEAP_INPLACE && TransactionIdIsValid(xid))
+				ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+		}
 		return;
+	}
 
 	switch (info)
 	{
@@ -1301,6 +1338,7 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 				  Oid txn_dbid, RepOriginId origin_id)
 {
 	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		SnapBuildCurrentState(ctx->snapshot_builder) < SNAPBUILD_CONSISTENT ||
 		(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
 		FilterByOrigin(ctx, origin_id))
 		return true;
-- 
2.34.1

