From b35711f5ef0941f79127acbec55a8ede5e27a5eb Mon Sep 17 00:00:00 2001
From: ChangAo Chen <cca5507@qq.com>
Date: Tue, 13 Aug 2024 11:45:07 +0800
Subject: [PATCH v4] Track transactions committed in BUILDING_SNAPSHOT.

The historic snapshot previously didn't track transactions committed
in BUILDING_SNAPSHOT, and this might result in a transaction taking
an incorrect snapshot and logical decoding being interrupted. So we
need track these transactions.

We also need handle the xlog that marks a transaction as containing
catalog changes in BUILDING_SNAPSHOT because the historic snapshot
only tracks catalog modifying transactions.
---
 src/backend/replication/logical/decode.c | 45 +++++++++++++++++++++---
 1 file changed, 40 insertions(+), 5 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index d687ceee33..6df558ad18 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -206,12 +206,16 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	uint8		info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
 
 	/*
-	 * If the snapshot isn't yet fully built, we cannot decode anything, so
-	 * bail out.
+	 * If the snapshot hasn't started building yet, the transaction won't be
+	 * decoded or tracked by the snapshot, so bail out.
 	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT)
 		return;
 
+	/*
+	 * Note that if the snapshot isn't yet fully built, the xlog is only used
+	 * to build the snapshot and won't be decoded.
+	 */
 	switch (info)
 	{
 		case XLOG_XACT_COMMIT:
@@ -282,9 +286,11 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			{
 				TransactionId xid;
 				xl_xact_invals *invals;
+				bool		has_snapshot;
 
 				xid = XLogRecGetXid(r);
 				invals = (xl_xact_invals *) XLogRecGetData(r);
+				has_snapshot = SnapBuildCurrentState(builder) >= SNAPBUILD_FULL_SNAPSHOT;
 
 				/*
 				 * Execute the invalidations for xid-less transactions,
@@ -293,7 +299,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 */
 				if (TransactionIdIsValid(xid))
 				{
-					if (!ctx->fast_forward)
+					if (!ctx->fast_forward && has_snapshot)
 						ReorderBufferAddInvalidations(reorder, xid,
 													  buf->origptr,
 													  invals->nmsgs,
@@ -301,7 +307,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 					ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
 													  buf->origptr);
 				}
-				else if (!ctx->fast_forward)
+				else if (!ctx->fast_forward && has_snapshot)
 					ReorderBufferImmediateInvalidation(ctx->reorder,
 													   invals->nmsgs,
 													   invals->msgs);
@@ -407,6 +413,8 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
 	SnapBuild  *builder = ctx->snapshot_builder;
+	/* True if the xlog marks the transaction as containing catalog changes */
+	bool	set_catalog_changes = (info == XLOG_HEAP2_NEW_CID);
 
 	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
@@ -416,7 +424,19 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	 */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
 		ctx->fast_forward)
+	{
+		/*
+		 * If the transaction contains catalog changes, we need mark it in
+		 * reorder buffer before return as the snapshot only tracks catalog
+		 * modifying transactions. The transaction before BUILDING_SNAPSHOT
+		 * won't be tracked anyway(see SnapBuildCommitTxn), so skip it.
+		 */
+		if (set_catalog_changes && TransactionIdIsValid(xid) &&
+			SnapBuildCurrentState(builder) >= SNAPBUILD_BUILDING_SNAPSHOT)
+			ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+
 		return;
+	}
 
 	switch (info)
 	{
@@ -466,6 +486,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
 	SnapBuild  *builder = ctx->snapshot_builder;
+	/* True if the xlog marks the transaction as containing catalog changes */
+	bool	set_catalog_changes = (info == XLOG_HEAP_INPLACE);
 
 	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
@@ -475,7 +497,19 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	 */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
 		ctx->fast_forward)
+	{
+		/*
+		 * If the transaction contains catalog changes, we need mark it in
+		 * reorder buffer before return as the snapshot only tracks catalog
+		 * modifying transactions. The transaction before BUILDING_SNAPSHOT
+		 * won't be tracked anyway(see SnapBuildCommitTxn), so skip it.
+		 */
+		if (set_catalog_changes && TransactionIdIsValid(xid) &&
+			SnapBuildCurrentState(builder) >= SNAPBUILD_BUILDING_SNAPSHOT)
+			ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+
 		return;
+	}
 
 	switch (info)
 	{
@@ -1301,6 +1335,7 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 				  Oid txn_dbid, RepOriginId origin_id)
 {
 	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		SnapBuildCurrentState(ctx->snapshot_builder) < SNAPBUILD_CONSISTENT ||
 		(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
 		FilterByOrigin(ctx, origin_id))
 		return true;
-- 
2.34.1

