From 625b98a4030281eae7a1e23be973849200fe5c91 Mon Sep 17 00:00:00 2001
From: ChangAo Chen <cca5507@qq.com>
Date: Thu, 8 Aug 2024 12:21:07 +0800
Subject: [PATCH v2 1/2] Track transactions committed in BUILDING_SNAPSHOT.

Transactions committed in BUILDING_SNAPSHOT are useful for build
historic snapshot, but we didn't track them previously. This results
in a transaction taking an incorrect snapshot and logical decoding
being interrupted. So we must track these transactions.
---
 src/backend/replication/logical/decode.c    | 30 +++++++++++++--------
 src/backend/replication/logical/snapbuild.c |  7 +++++
 2 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index d687ceee33..267e886ca3 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -206,10 +206,11 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	uint8		info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
 
 	/*
-	 * If the snapshot isn't yet fully built, we cannot decode anything, so
-	 * bail out.
+	 * Before BUILDING_SNAPSHOT, the xlog is useless, so bail out. Note
+	 * that the xlog in BUILDING_SNAPSHOT is only useful for build the
+	 * snapshot and will not be decoded.
 	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT)
 		return;
 
 	switch (info)
@@ -282,9 +283,11 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			{
 				TransactionId xid;
 				xl_xact_invals *invals;
+				bool		has_snapshot;
 
 				xid = XLogRecGetXid(r);
 				invals = (xl_xact_invals *) XLogRecGetData(r);
+				has_snapshot = SnapBuildCurrentState(builder) >= SNAPBUILD_FULL_SNAPSHOT;
 
 				/*
 				 * Execute the invalidations for xid-less transactions,
@@ -293,7 +296,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 */
 				if (TransactionIdIsValid(xid))
 				{
-					if (!ctx->fast_forward)
+					if (!ctx->fast_forward && has_snapshot)
 						ReorderBufferAddInvalidations(reorder, xid,
 													  buf->origptr,
 													  invals->nmsgs,
@@ -301,7 +304,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 					ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
 													  buf->origptr);
 				}
-				else if (!ctx->fast_forward)
+				else if (!ctx->fast_forward && has_snapshot)
 					ReorderBufferImmediateInvalidation(ctx->reorder,
 													   invals->nmsgs,
 													   invals->msgs);
@@ -411,10 +414,12 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
 	/*
-	 * If we don't have snapshot or we are just fast-forwarding, there is no
-	 * point in decoding changes.
+	 * Before BUILDING_SNAPSHOT or we are just fast-forwarding, there is no
+	 * point in decoding changes. Note that we only handle XLOG_HEAP2_NEW_CID
+	 * which mark a transaction as catalog modifying in BUILDING_SNAPSHOT, it's
+	 * useful for build the snapshot.
 	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT ||
 		ctx->fast_forward)
 		return;
 
@@ -470,10 +475,12 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
 	/*
-	 * If we don't have snapshot or we are just fast-forwarding, there is no
-	 * point in decoding data changes.
+	 * Before BUILDING_SNAPSHOT or we are just fast-forwarding, there is no
+	 * point in decoding changes. Note that we only handle XLOG_HEAP_INPLACE
+	 * which mark a transaction as catalog modifying in BUILDING_SNAPSHOT, it's
+	 * useful for build the snapshot.
 	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT ||
 		ctx->fast_forward)
 		return;
 
@@ -1301,6 +1308,7 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 				  Oid txn_dbid, RepOriginId origin_id)
 {
 	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		SnapBuildCurrentState(ctx->snapshot_builder) < SNAPBUILD_CONSISTENT ||
 		(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
 		FilterByOrigin(ctx, origin_id))
 		return true;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index ae676145e6..5443b6dce8 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -836,6 +836,13 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 	 */
 	ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
 
+	/*
+	 * If we don't have snapshot, the transaction won't be
+	 * replayed, just return here.
+	 */
+	if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
+		return;
+
 	ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
 								 xlrec->target_locator, xlrec->target_tid,
 								 xlrec->cmin, xlrec->cmax,
-- 
2.34.1

