From 269184738e6b5b5e888bb4dd44d7f3ebec6151cf Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Fri, 24 Jan 2025 00:11:29 -0500
Subject: [PATCH v12 1/3] Track transactions with internal snapshot changes

Track transactions which have snapshot changes with a new flag RBTXN_HAS_SNAPSHOT_CHANGES
---
 src/backend/replication/logical/reorderbuffer.c | 11 +++++++++++
 src/include/replication/reorderbuffer.h         |  7 +++++++
 2 files changed, 18 insertions(+)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 79b60df..121a1c2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -823,6 +823,14 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 
 		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
 	}
+	else if (change->action == REORDER_BUFFER_CHANGE_INVALIDATION ||
+			 change->action == REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT ||
+			 change->action == REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID)
+	{
+		ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
+
+		toptxn->txn_flags |= RBTXN_HAS_SNAPSHOT_CHANGES;
+	}
 
 	change->lsn = lsn;
 	change->txn = txn;
@@ -1747,6 +1755,9 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 		txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
 	}
 
+	/* All snapshot changes up to this point have been processed. */
+	txn->txn_flags &= ~RBTXN_HAS_SNAPSHOT_CHANGES;
+
 	/* also reset the number of entries in the transaction */
 	txn->nentries_mem = 0;
 	txn->nentries = 0;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index a669658..0329a69 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -173,6 +173,7 @@ typedef struct ReorderBufferChange
 #define RBTXN_PREPARE             	0x0040
 #define RBTXN_SKIPPED_PREPARE	  	0x0080
 #define RBTXN_HAS_STREAMABLE_CHANGE	0x0100
+#define RBTXN_HAS_SNAPSHOT_CHANGES	0x0200
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -210,6 +211,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
 )
 
+/* Does this transaction have snapshot changes? */
+#define rbtxn_has_snapshot_changes(txn) \
+( \
+	((txn)->txn_flags & RBTXN_HAS_SNAPSHOT_CHANGES) != 0 \
+)
+
 /*
  * Has this transaction been streamed to downstream?
  *
-- 
1.8.3.1

