From fec9567d30f333fb1b4b94bca4b7289146beadbc Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Tue, 11 Feb 2025 03:25:34 -0500
Subject: [PATCH v13 1/3] Track transactions with internal snapshot changes

Track transactions that make changes to the current snapshot with a new flag
RBTXN_HAS_SNAPSHOT_CHANGES. This will allow logical decoding to accumulate changes
which modify the snapshot before creating a historic snaphot for a transaction.
---
 src/backend/replication/logical/reorderbuffer.c | 11 +++++++++++
 src/include/replication/reorderbuffer.h         |  7 +++++++
 2 files changed, 18 insertions(+)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 10a3766..3d22aa9 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -823,6 +823,14 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 
 		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
 	}
+	else if (change->action == REORDER_BUFFER_CHANGE_INVALIDATION ||
+			 change->action == REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT ||
+			 change->action == REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID)
+	{
+		ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
+
+		toptxn->txn_flags |= RBTXN_HAS_SNAPSHOT_CHANGES;
+	}
 
 	change->lsn = lsn;
 	change->txn = txn;
@@ -1747,6 +1755,9 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 		txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
 	}
 
+	/* All snapshot changes up to this point have been processed. */
+	txn->txn_flags &= ~RBTXN_HAS_SNAPSHOT_CHANGES;
+
 	/* also reset the number of entries in the transaction */
 	txn->nentries_mem = 0;
 	txn->nentries = 0;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index a669658..b0f26c7 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -173,6 +173,7 @@ typedef struct ReorderBufferChange
 #define RBTXN_PREPARE             	0x0040
 #define RBTXN_SKIPPED_PREPARE	  	0x0080
 #define RBTXN_HAS_STREAMABLE_CHANGE	0x0100
+#define RBTXN_HAS_SNAPSHOT_CHANGES	0x0200
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -210,6 +211,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
 )
 
+/* Does this transaction make changes to the current snapshot? */
+#define rbtxn_has_snapshot_changes(txn) \
+( \
+	((txn)->txn_flags & RBTXN_HAS_SNAPSHOT_CHANGES) != 0 \
+)
+
 /*
  * Has this transaction been streamed to downstream?
  *
-- 
1.8.3.1

