From e79dbf562133b4cf2f34a175aa494609f251bb13 Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal.oss@gmail.com>
Date: Fri, 23 Aug 2024 14:02:20 +0530
Subject: [PATCH v19_REL_13 1/2] Distribute invalidatons if change in catalog
 tables

Distribute invalidations to inprogress transactions if the current
committed transaction change any catalog table.
---
 contrib/test_decoding/Makefile                |  2 +-
 .../expected/invalidation_distrubution.out    | 20 ++++++
 .../specs/invalidation_distrubution.spec      | 32 ++++++++++
 .../replication/logical/reorderbuffer.c       | 64 ++++++++++++++++---
 src/backend/replication/logical/snapbuild.c   | 63 ++++++++++++++----
 src/include/replication/reorderbuffer.h       |  4 ++
 6 files changed, 164 insertions(+), 21 deletions(-)
 create mode 100644 contrib/test_decoding/expected/invalidation_distrubution.out
 create mode 100644 contrib/test_decoding/specs/invalidation_distrubution.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 735b7e7653c..f122dc3a82d 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	spill slot truncate
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top catalog_change_snapshot \
-	skip_snapshot_restore
+	skip_snapshot_restore invalidation_distrubution
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out b/contrib/test_decoding/expected/invalidation_distrubution.out
new file mode 100644
index 00000000000..eb70eda9042
--- /dev/null
+++ b/contrib/test_decoding/expected/invalidation_distrubution.out
@@ -0,0 +1,20 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_commit: COMMIT;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec b/contrib/test_decoding/specs/invalidation_distrubution.spec
new file mode 100644
index 00000000000..ca051fc1e85
--- /dev/null
+++ b/contrib/test_decoding/specs/invalidation_distrubution.spec
@@ -0,0 +1,32 @@
+# Test that catalog cache invalidation messages are distributed to ongoing
+# transactions, ensuring they can access the updated catalog content after
+# processing these messages.
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput');
+    CREATE TABLE tbl1(val1 integer, val2 integer);
+    CREATE PUBLICATION pub;
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    DROP PUBLICATION pub;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
+step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
+
+# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
+permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 56c25e3a6da..fa9413fa2a0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2264,20 +2264,45 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 							  SharedInvalidationMessage *msgs)
 {
 	ReorderBufferTXN *txn;
+	MemoryContext oldcontext;
 
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
-	if (txn->ninvalidations != 0)
-		elog(ERROR, "only ever add one set of invalidations");
+	oldcontext = MemoryContextSwitchTo(rb->context);
+
+	/*
+	 * Collect all the invalidations under the top transaction, if available,
+	 * so that we can execute them all together.
+	 */
+	if (txn->toplevel_xid)
+	{
+		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, true, NULL, lsn,
+									true);
+	}
 
 	Assert(nmsgs > 0);
 
-	txn->ninvalidations = nmsgs;
-	txn->invalidations = (SharedInvalidationMessage *)
-		MemoryContextAlloc(rb->context,
-						   sizeof(SharedInvalidationMessage) * nmsgs);
-	memcpy(txn->invalidations, msgs,
-		   sizeof(SharedInvalidationMessage) * nmsgs);
+	/* Accumulate invalidations. */
+	if (txn->ninvalidations == 0)
+	{
+		txn->ninvalidations = nmsgs;
+		txn->invalidations = (SharedInvalidationMessage *)
+			palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+		memcpy(txn->invalidations, msgs,
+			   sizeof(SharedInvalidationMessage) * nmsgs);
+	}
+	else
+	{
+		txn->invalidations = (SharedInvalidationMessage *)
+			repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
+					 (txn->ninvalidations + nmsgs));
+
+		memcpy(txn->invalidations + txn->ninvalidations, msgs,
+			   nmsgs * sizeof(SharedInvalidationMessage));
+		txn->ninvalidations += nmsgs;
+	}
+
+	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
@@ -3895,3 +3920,26 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Count invalidation messages of specified transaction.
+ *
+ * Returns number of messages, and msgs is set to the pointer of the linked
+ * list for the messages.
+ */
+uint32
+ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
+							  SharedInvalidationMessage **msgs)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	if (txn == NULL)
+		return 0;
+
+	*msgs = txn->invalidations;
+
+	return txn->ninvalidations;
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 7546de96763..3bda41c5251 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -292,7 +292,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
 
 static void SnapBuildSnapIncRefcount(Snapshot snap);
 
-static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 /* xlog reading helper functions for SnapBuildProcessRunningXacts */
 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
@@ -861,15 +861,15 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }
 
 /*
- * Add a new Snapshot to all transactions we're decoding that currently are
- * in-progress so they can see new catalog contents made by the transaction
- * that just committed. This is necessary because those in-progress
- * transactions will use the new catalog's contents from here on (at the very
- * least everything they do needs to be compatible with newer catalog
- * contents).
+ * Add a new Snapshot and invalidation messages to all transactions we're
+ * decoding that currently are in-progress so they can see new catalog contents
+ * made by the transaction that just committed. This is necessary because those
+ * in-progress transactions will use the new catalog's contents from here on
+ * (at the very least everything they do needs to be compatible with newer
+ * catalog contents).
  */
 static void
-SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
 	dlist_iter	txn_i;
 	ReorderBufferTXN *txn;
@@ -877,7 +877,8 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 	/*
 	 * Iterate through all toplevel transactions. This can include
 	 * subtransactions which we just don't yet know to be that, but that's
-	 * fine, they will just get an unnecessary snapshot queued.
+	 * fine, they will just get an unnecessary snapshot and invalidations
+	 * queued.
 	 */
 	dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
 	{
@@ -890,6 +891,14 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 		 * transaction which in turn implies we don't yet need a snapshot at
 		 * all. We'll add a snapshot when the first change gets queued.
 		 *
+		 * Similarly, we don't need to add invalidations to a transaction whose
+		 * base snapshot is not yet set. Once a base snapshot is built, it will
+		 * include the xids of committed transactions that have modified the
+		 * catalog, thus reflecting the new catalog contents. The existing
+		 * catalog cache will have already been invalidated after processing
+		 * the invalidations in the transaction that modified catalogs,
+		 * ensuring that a fresh cache is constructed during decoding.
+		 *
 		 * NB: This works correctly even for subtransactions because
 		 * ReorderBufferAssignChild() takes care to transfer the base snapshot
 		 * to the top-level transaction, and while iterating the changequeue
@@ -898,7 +907,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
 			continue;
 
-		elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
+		elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
 			 txn->xid, (uint32) (lsn >> 32), (uint32) lsn);
 
 		/*
@@ -908,6 +917,33 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 		SnapBuildSnapIncRefcount(builder->snapshot);
 		ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
 								 builder->snapshot);
+
+		/*
+		 * Add invalidation messages to the reorder buffer of in-progress
+		 * transactions except the current committed transaction, for which we
+		 * will execute invalidations at the end.
+		 *
+		 * It is required, otherwise, we will end up using the stale catcache
+		 * contents built by the current transaction even after its decoding,
+		 * which should have been invalidated due to concurrent catalog
+		 * changing transaction.
+		 */
+		if (txn->xid != xid)
+		{
+			uint32 ninvalidations;
+			SharedInvalidationMessage *msgs = NULL;
+
+			ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
+														   xid, &msgs);
+
+			if (ninvalidations > 0)
+			{
+				Assert(msgs != NULL);
+
+				ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
+											  ninvalidations, msgs);
+			}
+		}
 	}
 }
 
@@ -1186,8 +1222,11 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		/* refcount of the snapshot builder for the new snapshot */
 		SnapBuildSnapIncRefcount(builder->snapshot);
 
-		/* add a new catalog snapshot to all currently running transactions */
-		SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
+		/*
+		 * Add a new catalog snapshot and invalidations messages to all
+		 * currently running transactions.
+		 */
+		SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
 	}
 }
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 5347597e92b..545cee891ed 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -463,6 +463,10 @@ TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 
 void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
+uint32		ReorderBufferGetInvalidations(ReorderBuffer *rb,
+										  TransactionId xid,
+										  SharedInvalidationMessage **msgs);
+
 void		StartupReorderBuffer(void);
 
 #endif
-- 
2.43.5

