From 21d3b1ae822e82d87988d60b1d7c8a2e003565c0 Mon Sep 17 00:00:00 2001
From: nkey <nkey@toloka.ai>
Date: Wed, 3 Sep 2025 19:08:55 +0200
Subject: [PATCH v18 2/2] Fix logical replication conflict detection during
 tuple lookup

SNAPSHOT_DIRTY scans could miss conflict detection with concurrent transactions during logical replication.
Replace SNAPSHOT_DIRTY scan with the GetLatestSnapshot in RelationFindReplTupleByIndex and RelationFindReplTupleSeq.
---
 src/backend/catalog/index.c            |  2 +-
 src/backend/executor/execReplication.c | 64 ++++++++------------------
 2 files changed, 19 insertions(+), 47 deletions(-)

diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 9407c357f27..05d21a3ccfe 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -1733,7 +1733,7 @@ index_concurrently_swap(Oid newIndexId, Oid oldIndexId, const char *oldName)
 
 			tgForm->tgconstrindid = newIndexId;
 
-			CatalogTupleUpdate(pg_trigger, &triggerTuple->t_self, triggerTuple);
+			CatalogTupleUpdate(pg_trigger, &triggerTuple->t_self,  triggerTuple);
 
 			heap_freetuple(triggerTuple);
 		}
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index b2ca5cbf117..82629aa33d6 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -187,8 +187,6 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 	ScanKeyData skey[INDEX_MAX_KEYS];
 	int			skey_attoff;
 	IndexScanDesc scan;
-	SnapshotData snap;
-	TransactionId xwait;
 	Relation	idxrel;
 	bool		found;
 	TypeCacheEntry **eq = NULL;
@@ -199,18 +197,18 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 
 	isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
 
-	InitDirtySnapshot(snap);
-
 	/* Build scan key. */
 	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
 
-	/* Start an index scan. */
+	/* Start an index scan. SnapshotAny will be replaced below. */
 	scan = index_beginscan(rel, idxrel,
-						   &snap, NULL, skey_attoff, 0, SO_NONE);
+						   SnapshotAny, NULL, skey_attoff, 0, SO_NONE);
 
 retry:
 	found = false;
-
+	PushActiveSnapshot(GetLatestSnapshot());
+	/* Update the actual scan snapshot each retry */
+	scan->xs_snapshot = GetActiveSnapshot();
 	index_rescan(scan, skey, skey_attoff, NULL, 0);
 
 	/* Try to find the tuple */
@@ -231,19 +229,6 @@ retry:
 
 		ExecMaterializeSlot(outslot);
 
-		xwait = TransactionIdIsValid(snap.xmin) ?
-			snap.xmin : snap.xmax;
-
-		/*
-		 * If the tuple is locked, wait for locking transaction to finish and
-		 * retry.
-		 */
-		if (TransactionIdIsValid(xwait))
-		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
-		}
-
 		/* Found our tuple and it's not locked */
 		found = true;
 		break;
@@ -255,8 +240,6 @@ retry:
 		TM_FailureData tmfd;
 		TM_Result	res;
 
-		PushActiveSnapshot(GetLatestSnapshot());
-
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 							   outslot,
 							   GetCurrentCommandId(false),
@@ -265,13 +248,15 @@ retry:
 							   0 /* don't follow updates */ ,
 							   &tmfd);
 
-		PopActiveSnapshot();
-
 		if (should_refetch_tuple(res, &tmfd))
+		{
+			PopActiveSnapshot();
 			goto retry;
+		}
 	}
 
 	index_endscan(scan);
+	PopActiveSnapshot();
 
 	/* Don't release lock until commit. */
 	index_close(idxrel, NoLock);
@@ -372,9 +357,7 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 {
 	TupleTableSlot *scanslot;
 	TableScanDesc scan;
-	SnapshotData snap;
 	TypeCacheEntry **eq;
-	TransactionId xwait;
 	bool		found;
 	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
 
@@ -382,14 +365,15 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 
 	eq = palloc0_array(TypeCacheEntry *, outslot->tts_tupleDescriptor->natts);
 
-	/* Start a heap scan. */
-	InitDirtySnapshot(snap);
-	scan = table_beginscan(rel, &snap, 0, NULL,
-						   SO_NONE);
+	/* Start a heap scan. SnapshotAny will be replaced below. */
+	scan = table_beginscan(rel, SnapshotAny, 0, NULL, SO_NONE);
 	scanslot = table_slot_create(rel, NULL);
 
 retry:
 	found = false;
+	PushActiveSnapshot(GetLatestSnapshot());
+	/* Update the actual scan snapshot each retry */
+	scan->rs_snapshot = GetActiveSnapshot();
 
 	table_rescan(scan, NULL);
 
@@ -402,19 +386,6 @@ retry:
 		found = true;
 		ExecCopySlot(outslot, scanslot);
 
-		xwait = TransactionIdIsValid(snap.xmin) ?
-			snap.xmin : snap.xmax;
-
-		/*
-		 * If the tuple is locked, wait for locking transaction to finish and
-		 * retry.
-		 */
-		if (TransactionIdIsValid(xwait))
-		{
-			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-			goto retry;
-		}
-
 		/* Found our tuple and it's not locked */
 		break;
 	}
@@ -425,8 +396,6 @@ retry:
 		TM_FailureData tmfd;
 		TM_Result	res;
 
-		PushActiveSnapshot(GetLatestSnapshot());
-
 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
 							   outslot,
 							   GetCurrentCommandId(false),
@@ -435,13 +404,16 @@ retry:
 							   0 /* don't follow updates */ ,
 							   &tmfd);
 
-		PopActiveSnapshot();
 
 		if (should_refetch_tuple(res, &tmfd))
+		{
+			PopActiveSnapshot();
 			goto retry;
+		}
 	}
 
 	table_endscan(scan);
+	PopActiveSnapshot();
 	ExecDropSingleTupleTableSlot(scanslot);
 
 	return found;
-- 
2.43.0

