From a6c8b8e7d8bd2f59e7ef45eaf521629dd2963085 Mon Sep 17 00:00:00 2001
From: nkey <nkey@toloka.ai>
Date: Wed, 3 Sep 2025 19:08:55 +0200
Subject: [PATCH v12 2/2] Fix logical replication conflict detection during
 tuple lookup

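During logical replication, tuple lookups that scan with SNAPSHOT_DIRTY
could fail to detect conflicts with concurrent transactions.

Replace the SNAPSHOT_DIRTY scans in RelationFindReplTupleByIndex() and
RelationFindReplTupleSeq() with scans that use a snapshot obtained from
GetLatestSnapshot(). The snapshot is retaken and pushed as the active
snapshot on every retry, and the same snapshot is then used for the
subsequent table_tuple_lock() call.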
---
 src/backend/executor/execReplication.c | 63 ++++++++------------------
 1 file changed, 18 insertions(+), 45 deletions(-)

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index b409d4ecbf5..0de40aec733 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -186,8 +186,6 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 	ScanKeyData skey[INDEX_MAX_KEYS];
 	int			skey_attoff;
 	IndexScanDesc scan;
-	SnapshotData snap;
-	TransactionId xwait;
 	Relation	idxrel;
 	bool		found;
 	TypeCacheEntry **eq = NULL;
@@ -198,17 +196,17 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 
 	isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
 
-	InitDirtySnapshot(snap);
-
 	/* Build scan key. */
 	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
 
-	/* Start an index scan. */
-	scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
+	/* Start an index scan. SnapshotAny will be replaced below. */
+	scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
 
 retry:
 	found = false;
-
+	PushActiveSnapshot(GetLatestSnapshot());
+	/* Update the actual scan snapshot each retry */
+	scan->xs_snapshot = GetActiveSnapshot();
 	index_rescan(scan, skey, skey_attoff, NULL, 0);
 
 	/* Try to find the tuple */
@@ -229,19 +227,6 @@ retry:
 
 		ExecMaterializeSlot(outslot);
 
-		xwait = TransactionIdIsValid(snap.xmin) ?
-			snap.xmin : snap.xmax;
-
-		/*
-		 * If the tuple is locked, wait for locking transaction to finish and
-		 * retry.
-		 */
-		if (TransactionIdIsValid(xwait))
-		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
-		}
-
 		/* Found our tuple and it's not locked */
 		found = true;
 		break;
@@ -253,8 +238,6 @@ retry:
 		TM_FailureData tmfd;
 		TM_Result	res;
 
-		PushActiveSnapshot(GetLatestSnapshot());
-
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 							   outslot,
 							   GetCurrentCommandId(false),
@@ -263,13 +246,15 @@ retry:
 							   0 /* don't follow updates */ ,
 							   &tmfd);
 
-		PopActiveSnapshot();
-
 		if (should_refetch_tuple(res, &tmfd))
+		{
+			PopActiveSnapshot();
 			goto retry;
+		}
 	}
 
 	index_endscan(scan);
+	PopActiveSnapshot();
 
 	/* Don't release lock until commit. */
 	index_close(idxrel, NoLock);
@@ -370,9 +355,7 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 {
 	TupleTableSlot *scanslot;
 	TableScanDesc scan;
-	SnapshotData snap;
 	TypeCacheEntry **eq;
-	TransactionId xwait;
 	bool		found;
 	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
 
@@ -380,13 +363,15 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 
 	eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
 
-	/* Start a heap scan. */
-	InitDirtySnapshot(snap);
-	scan = table_beginscan(rel, &snap, 0, NULL);
+	/* Start a heap scan. SnapshotAny will be replaced below. */
+	scan = table_beginscan(rel, SnapshotAny, 0, NULL);
 	scanslot = table_slot_create(rel, NULL);
 
 retry:
 	found = false;
+	PushActiveSnapshot(GetLatestSnapshot());
+	/* Update the actual scan snapshot each retry */
+	scan->rs_snapshot = GetActiveSnapshot();
 
 	table_rescan(scan, NULL);
 
@@ -399,19 +384,6 @@ retry:
 		found = true;
 		ExecCopySlot(outslot, scanslot);
 
-		xwait = TransactionIdIsValid(snap.xmin) ?
-			snap.xmin : snap.xmax;
-
-		/*
-		 * If the tuple is locked, wait for locking transaction to finish and
-		 * retry.
-		 */
-		if (TransactionIdIsValid(xwait))
-		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
-		}
-
 		/* Found our tuple and it's not locked */
 		break;
 	}
@@ -422,8 +394,6 @@ retry:
 		TM_FailureData tmfd;
 		TM_Result	res;
 
-		PushActiveSnapshot(GetLatestSnapshot());
-
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 							   outslot,
 							   GetCurrentCommandId(false),
@@ -432,13 +402,16 @@ retry:
 							   0 /* don't follow updates */ ,
 							   &tmfd);
 
-		PopActiveSnapshot();
 
 		if (should_refetch_tuple(res, &tmfd))
+		{
+			PopActiveSnapshot();
 			goto retry;
+		}
 	}
 
 	table_endscan(scan);
+	PopActiveSnapshot();
 	ExecDropSingleTupleTableSlot(scanslot);
 
 	return found;
-- 
2.43.0

