From 6140ffb31521bd2bc89201bcb7b550a9adf992ee Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Sun, 17 Jul 2022 21:28:35 +0900
Subject: [PATCH v6] Fix catalog lookup with the wrong snapshot during logical
 decoding.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION
records to know if the transaction has modified the catalog, and that
information is not serialized to snapshot. Therefore, if the logical
decoding decodes only the commit record of the transaction that
actually has modified a catalog, we missed adding its XID to the
snapshot. We ended up looking at catalogs with the wrong snapshot.

To fix this problem, this change the reorder buffer so that it
remembers the initial running transaction written in the
xl_running_xacts record that we decoded first, and mark the
transaction as containing catalog changes if it’s in the list of the
initial running transactions and its commit record has
XACT_XINFO_HAS_INVALS.

This has false positive; we could end up adding the transaction that
didn't change catalog to the snapshot since we cannot distinguish
whether the transaction has catalog changes only by checking the
COMMIT record. It doesn’t have the information on which (sub)
transaction has catalog changes, and XACT_XINFO_HAS_INVALS doesn't
necessarily indicate that the transaction has catalog change. But it
doesn't become a problem since we use historic snapshot only for
reading system catalogs.

On the master branch, we took a more future-proof approach of writing
catalog modifying transactions to the serialized snapshot. But we
cannot backpatch it because of change in SnapBuild.

Back-patch to all supported released.
---
 contrib/test_decoding/Makefile                |   2 +-
 .../expected/catalog_change_snapshot.out      |  41 +++++++
 .../specs/catalog_change_snapshot.spec        |  39 ++++++
 src/backend/replication/logical/decode.c      |  17 +++
 .../replication/logical/reorderbuffer.c       | 116 ++++++++++++++++++
 src/include/replication/reorderbuffer.h       |  36 ++++++
 6 files changed, 250 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/catalog_change_snapshot.out
 create mode 100644 contrib/test_decoding/specs/catalog_change_snapshot.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 2db2b2774b..73bc0fe1fe 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -51,7 +51,7 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install
 	    $(REGRESSCHECKS)
 
 ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \
-	oldest_xmin snapshot_transfer subxact_without_top
+	oldest_xmin snapshot_transfer subxact_without_top catalog_change_snapshot
 
 isolationcheck: | submake-isolation submake-test_decoding temp-install
 	$(pg_isolation_regress_check) \
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
new file mode 100644
index 0000000000..15f9540b3f
--- /dev/null
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -0,0 +1,41 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_truncate: TRUNCATE tbl1;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT                                                       
+(3 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
new file mode 100644
index 0000000000..662760fbcf
--- /dev/null
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -0,0 +1,39 @@
+# Test decoding only the commit record of the transaction that have
+# modified catalogs.
+setup
+{
+    DROP TABLE IF EXISTS tbl1;
+    CREATE TABLE tbl1 (val1 integer, val2 integer);
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_savepoint" { SAVEPOINT sp1; }
+step "s0_truncate" { TRUNCATE tbl1; }
+step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_checkpoint" { CHECKPOINT; }
+step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
+# only its COMMIT record, because it starts from the RUNNING_XACT record emitted
+# during the first checkpoint execution.  This transaction must be marked as
+# containing catalog changes while decoding the COMMIT record and the decoding
+# of the INSERT record must read the pg_class with the correct historic snapshot.
+#
+# Note that in a case where bgwriter wrote the XACT_RUNNING record between "s0_commit"
+# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACT
+# record written by bgwriter.  One might think we can either stop the bgwriter or
+# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 6f8920f52c..fa0e9b1f38 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -316,6 +316,9 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			{
 				xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
 
+				/* Process the initial running transactions, if any */
+				ReorderBufferProcessInitialXacts(ctx->reorder, running);
+
 				SnapBuildProcessRunningXacts(builder, buf->origptr, running);
 
 				/*
@@ -552,6 +555,20 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 		commit_time = parsed->origin_timestamp;
 	}
 
+	/*
+	 * If the COMMIT record has invalidation messages, it could have catalog
+	 * changes. We check if it's in the list of the initial running transactions
+	 * and then mark it as containing catalog change.
+	 *
+	 * This must be done before SnapBuildCommitTxn() so that we can include
+	 * catalog change transactions to the historic snapshot.
+	 */
+	if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
+		ReorderBufferInitialXactsSetCatalogChanges(ctx->reorder, xid,
+												   parsed->nsubxacts,
+												   parsed->subxacts,
+												   buf->origptr);
+
 	/*
 	 * Process invalidation messages, even if we're not interested in the
 	 * transaction's contents, since the various caches need to always be
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 061555652c..661436622f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -272,6 +272,9 @@ ReorderBufferAllocate(void)
 	buffer->outbuf = NULL;
 	buffer->outbufsize = 0;
 
+	buffer->initial_running_xacts = NULL;
+	buffer->n_initial_running_xacts = 0;
+
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
 	dlist_init(&buffer->toplevel_by_lsn);
@@ -3509,3 +3512,116 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Process the transactions in xl_running_xacts record, and remember the
+ * transactions first and later remove those that aren't needed anymore.
+ *
+ * We can ideally remove the transactions from the initial running xacts array
+ * once it is finished (committed/aborted) but that could be costly as we need
+ * to maintain the xids order in the array.
+ */
+void
+ReorderBufferProcessInitialXacts(ReorderBuffer *rb, xl_running_xacts *running)
+{
+	LogicalDecodingContext *ctx = rb->private_data;
+	SnapBuild  *builder = ctx->snapshot_builder;
+	TransactionId *workspace;
+	int			surviving_xids = 0;
+
+	/* Build the initial running transactions list for the first call */
+	if (unlikely(SnapBuildCurrentState(builder) == SNAPBUILD_START))
+	{
+		int			nxacts = running->subxcnt + running->xcnt;
+		Size		sz = sizeof(TransactionId) * nxacts;
+
+		Assert(rb->n_initial_running_xacts == 0);
+
+		rb->n_initial_running_xacts = nxacts;
+		rb->initial_running_xacts = MemoryContextAlloc(rb->context, sz);
+		memcpy(rb->initial_running_xacts, running->xids, sz);
+		qsort(rb->initial_running_xacts, nxacts, sizeof(TransactionId),
+			  xidComparator);
+
+		return;
+	}
+
+	/* Quick exit if there is no initial running transactions */
+	if (likely(rb->n_initial_running_xacts == 0))
+		return;
+
+	/* bound check if there is at least one transaction to remove */
+	if (!NormalTransactionIdPrecedes(rb->initial_running_xacts[0],
+									 running->oldestRunningXid))
+		return;
+
+	/*
+	 * Remove transactions that would have been processed and we don't need to
+	 * keep track off anymore.
+	 *
+	 * The purged array must also be sorted in xidComparator order.
+	 */
+	workspace = MemoryContextAlloc(rb->context,
+								   rb->n_initial_running_xacts * sizeof(TransactionId));
+	for (int i = 0; i < rb->n_initial_running_xacts; i++)
+	{
+		if (NormalTransactionIdPrecedes(rb->initial_running_xacts[i],
+										running->oldestRunningXid))
+			;					/* remove */
+		else
+			workspace[surviving_xids++] = rb->initial_running_xacts[i];
+	}
+
+	if (surviving_xids > 0)
+		memcpy(rb->initial_running_xacts, workspace,
+			   sizeof(TransactionId) * surviving_xids);
+	else
+	{
+		pfree(rb->initial_running_xacts);
+		rb->initial_running_xacts = NULL;
+	}
+
+	elog(DEBUG3, "purged catalog modifying transactions from %u to %u, oldest running xid %u",
+		 (uint32) rb->n_initial_running_xacts,
+		 (uint32) surviving_xids,
+		 running->oldestRunningXid);
+
+	rb->n_initial_running_xacts = surviving_xids;
+	pfree(workspace);
+}
+
+/*
+ * If the given xid is in the list of the initial running xacts, we mark both it
+ * and its subtransactions as containing catalog changes if not yet.
+ */
+void
+ReorderBufferInitialXactsSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
+										   int subxcnt, TransactionId *subxacts,
+										   XLogRecPtr lsn)
+{
+	/*
+	 * Skip if there is no initial running xacts information or the
+	 * transaction is already marked as containing catalog changes.
+	 */
+	if (likely(rb->n_initial_running_xacts == 0 ||
+			   ReorderBufferXidHasCatalogChanges(rb, xid)))
+		return;
+
+	/*
+	 * If this committed transaction is the one that was running at the time
+	 * when decoding the first RUNNING_XACTS record and have done catalog
+	 * changes, we can mark both the top transaction and its subtransactions
+	 * as containing catalog changes.
+	 */
+	if (bsearch(&xid, rb->initial_running_xacts, rb->n_initial_running_xacts,
+				sizeof(TransactionId), xidComparator) != NULL)
+	{
+		ReorderBufferXidSetCatalogChanges(rb, xid, lsn);
+
+		for (int i = 0; i < subxcnt; i++)
+		{
+			ReorderBufferAssignChild(rb, xid, subxacts[i], lsn);
+			ReorderBufferXidSetCatalogChanges(rb, subxacts[i], lsn);
+		}
+	}
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index d4555676d6..956d4e3329 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -12,6 +12,7 @@
 #include "access/htup_details.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
+#include "storage/standby.h"
 #include "utils/hsearch.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -376,6 +377,35 @@ struct ReorderBuffer
 
 	XLogRecPtr	current_restart_decoding_lsn;
 
+	/*
+	 * Array of transactions and subtransactions that were running when
+	 * the xl_running_xacts record that we decoded first was written.
+	 * The array is sorted in xidComparator order. Xids are removed from
+	 * the array when decoding xl_running_xacts record, and then the array
+	 * eventually becomes an empty.
+	 *
+	 * We rely on HEAP2_NEW_CID records and XACT_INVALIDATIONS to know
+	 * if the transaction has changed the catalog, and that information
+	 * is not serialized to SnapBuilder. Therefore, if the logical
+	 * decoding decodes the commit record of the transaction that actually
+	 * has done catalog changes without these records, we miss to add
+	 * the xid to the snapshot, and end up looking at catalogs with the
+	 * wrong snapshot. To avoid this problem, if the COMMIT record of
+	 * the xid listed in initial_running_xacts has XACT_XINFO_HAS_INVALS
+	 * flag, we mark both the top transaction and its substransactions
+	 * as containing catalog changes.
+	 *
+	 * We could end up adding the transaction that didn't change catalog
+	 * to the snapshot since we cannot distinguish whether the transaction
+	 * has catalog changes only by checking the COMMIT record. It doesn't
+	 * have the information on which (sub) transaction has catalog changes,
+	 * and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the
+	 * transaction has catalog change. But it doesn't become a problem since
+	 * we use historic snapshot only for reading system catalogs.
+	 */
+	TransactionId *initial_running_xacts;
+	int n_initial_running_xacts;
+
 	/* buffer for disk<->memory conversions */
 	char	   *outbuf;
 	Size		outbufsize;
@@ -427,4 +457,10 @@ void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
 void		StartupReorderBuffer(void);
 
+void		ReorderBufferProcessInitialXacts(ReorderBuffer *rb,
+											 xl_running_xacts *running);
+void		ReorderBufferInitialXactsSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
+													   int subxcnt,
+													   TransactionId *subxacts,
+													   XLogRecPtr lsn);
 #endif
-- 
2.24.3 (Apple Git-128)

