From 7fddf151b27e185cf27ae0f6d1a965a0b928c22a Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 11 Jul 2022 21:49:06 +0900
Subject: [PATCH v6] Fix catalog lookup with the wrong snapshot during logical
 decoding.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION
records to know if the transaction has modified the catalog, and that
information is not serialized to snapshot. Therefore, if the logical
decoding decodes only the commit record of the transaction that
actually has modified a catalog, we missed adding its XID to the
snapshot. We ended up looking at catalogs with the wrong snapshot.

To fix this problem, this changes the reorder buffer so that it
remembers the initial running transactions written in the
xl_running_xacts record that we decoded first, and marks a
transaction as containing catalog changes if it's in the list of the
initial running transactions and its commit record has
XACT_XINFO_HAS_INVALS.

This can produce false positives: we could end up adding a transaction
that didn't change the catalog to the snapshot, since we cannot tell
whether a transaction has catalog changes only by checking the COMMIT
record. The COMMIT record doesn't have the information on which (sub)
transaction has catalog changes, and XACT_XINFO_HAS_INVALS doesn't
necessarily indicate that the transaction has catalog changes. But
that is not a problem since we use the historic snapshot only for
reading system catalogs.

On the master branch, we took a more future-proof approach of writing
catalog modifying transactions to the serialized snapshot. But we
cannot backpatch it because of change in SnapBuild.

Back-patch to all supported releases.
---
 contrib/test_decoding/Makefile                |   2 +-
 .../expected/catalog_change_snapshot.out      |  44 +++++++
 .../specs/catalog_change_snapshot.spec        |  39 ++++++
 src/backend/replication/logical/decode.c      |  17 +++
 .../replication/logical/reorderbuffer.c       | 116 ++++++++++++++++++
 src/include/replication/reorderbuffer.h       |  36 ++++++
 6 files changed, 253 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/catalog_change_snapshot.out
 create mode 100644 contrib/test_decoding/specs/catalog_change_snapshot.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 9a31e0b879..4553252d75 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	spill slot truncate stream stats twophase twophase_stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
-	twophase_snapshot
+	twophase_snapshot catalog_change_snapshot
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
new file mode 100644
index 0000000000..dc4f9b7018
--- /dev/null
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -0,0 +1,44 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_truncate: TRUNCATE tbl1;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                   
+---------------------------------------
+BEGIN                                  
+table public.tbl1: TRUNCATE: (no-flags)
+COMMIT                                 
+(3 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT                                                       
+(3 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
new file mode 100644
index 0000000000..662760fbcf
--- /dev/null
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -0,0 +1,39 @@
+# Test decoding only the commit record of a transaction that has
+# modified catalogs.
+setup
+{
+    DROP TABLE IF EXISTS tbl1;
+    CREATE TABLE tbl1 (val1 integer, val2 integer);
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_savepoint" { SAVEPOINT sp1; }
+step "s0_truncate" { TRUNCATE tbl1; }
+step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_checkpoint" { CHECKPOINT; }
+step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
+# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
+# during the first checkpoint execution.  This transaction must be marked as
+# containing catalog changes while decoding the COMMIT record and the decoding
+# of the INSERT record must read pg_class with the correct historic snapshot.
+#
+# Note that in a case where bgwriter wrote a RUNNING_XACTS record between "s0_commit"
+# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS
+# record written by bgwriter.  One might think we can either stop the bgwriter or
+# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 92dfafc632..8d72c5af1f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -407,6 +407,9 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			{
 				xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
 
+				/* Process the initial running transactions, if any */
+				ReorderBufferProcessInitialXacts(ctx->reorder, running);
+
 				SnapBuildProcessRunningXacts(builder, buf->origptr, running);
 
 				/*
@@ -691,6 +694,20 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 		commit_time = parsed->origin_timestamp;
 	}
 
+	/*
+	 * If the COMMIT record has invalidation messages, the transaction could
+	 * have catalog changes. If it is in the list of the initial running
+	 * transactions, mark it as containing catalog changes.
+	 *
+	 * This must be done before SnapBuildCommitTxn() so that catalog-changing
+	 * transactions are included in the historic snapshot.
+	 */
+	if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
+		ReorderBufferInitialXactsSetCatalogChanges(ctx->reorder, xid,
+												   parsed->nsubxacts,
+												   parsed->subxacts,
+												   buf->origptr);
+
 	SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
 					   parsed->nsubxacts, parsed->subxacts);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index e59d1396b5..bef9fb9bb2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -346,6 +346,9 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
+	buffer->initial_running_xacts = NULL;
+	buffer->n_initial_running_xacts = 0;
+
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
@@ -5154,3 +5157,116 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Remember the (sub)transactions of the xl_running_xacts record decoded while
+ * the snapshot builder is in SNAPBUILD_START; on later records, purge the
+ * remembered xids that precede the record's oldestRunningXid.
+ *
+ * Ideally an xid would be removed as soon as its transaction finishes, but
+ * that could be costly as the array must stay in xidComparator order.
+ */
+void
+ReorderBufferProcessInitialXacts(ReorderBuffer *rb, xl_running_xacts *running)
+{
+	LogicalDecodingContext *ctx = rb->private_data;
+	SnapBuild  *builder = ctx->snapshot_builder;
+	TransactionId *workspace;
+	int			surviving_xids = 0;
+
+	/* Build the initial running transactions list while in the start state */
+	if (unlikely(SnapBuildCurrentState(builder) == SNAPBUILD_START))
+	{
+		int			nxacts = running->subxcnt + running->xcnt;
+		Size		sz = sizeof(TransactionId) * nxacts;
+
+		/* Replace, rather than leak, a list built from an earlier record */
+		if (rb->initial_running_xacts != NULL)
+			pfree(rb->initial_running_xacts);
+
+		rb->n_initial_running_xacts = nxacts;
+		rb->initial_running_xacts = MemoryContextAlloc(rb->context, sz);
+		memcpy(rb->initial_running_xacts, running->xids, sz);
+		qsort(rb->initial_running_xacts, nxacts, sizeof(TransactionId),
+			  xidComparator);
+
+		return;
+	}
+
+	/* Quick exit if there are no initial running transactions */
+	if (likely(rb->n_initial_running_xacts == 0))
+		return;
+
+	/* bound check if there is at least one transaction to remove */
+	if (!NormalTransactionIdPrecedes(rb->initial_running_xacts[0],
+									 running->oldestRunningXid))
+		return;
+
+	/*
+	 * Remove transactions that would have been processed and we don't need to
+	 * keep track of anymore.
+	 *
+	 * The purged array must also be sorted in xidComparator order.
+	 */
+	workspace = MemoryContextAlloc(rb->context,
+								   rb->n_initial_running_xacts * sizeof(TransactionId));
+	for (int i = 0; i < rb->n_initial_running_xacts; i++)
+	{
+		if (!NormalTransactionIdPrecedes(rb->initial_running_xacts[i],
+										 running->oldestRunningXid))
+			workspace[surviving_xids++] = rb->initial_running_xacts[i];
+	}
+
+	if (surviving_xids > 0)
+		memcpy(rb->initial_running_xacts, workspace,
+			   sizeof(TransactionId) * surviving_xids);
+	else
+	{
+		pfree(rb->initial_running_xacts);
+		rb->initial_running_xacts = NULL;
+	}
+
+	elog(DEBUG3, "purged initial running transactions from %u to %u, oldest running xid %u",
+		 (uint32) rb->n_initial_running_xacts,
+		 (uint32) surviving_xids,
+		 running->oldestRunningXid);
+
+	rb->n_initial_running_xacts = surviving_xids;
+	pfree(workspace);
+}
+
+/*
+ * If the given xid is in the list of the initial running xacts, mark it and
+ * its subtransactions as containing catalog changes, if not already marked.
+ */
+void
+ReorderBufferInitialXactsSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
+										   int subxcnt, TransactionId *subxacts,
+										   XLogRecPtr lsn)
+{
+	/*
+	 * Skip if there is no initial running xacts information or the
+	 * transaction is already marked as containing catalog changes.
+	 */
+	if (likely(rb->n_initial_running_xacts == 0 ||
+			   ReorderBufferXidHasCatalogChanges(rb, xid)))
+		return;
+
+	/*
+	 * If this committed transaction was running when the first RUNNING_XACTS
+	 * record was decoded, records showing its catalog changes may not have
+	 * been decoded, so mark both the top transaction and its subtransactions
+	 * as containing catalog changes.
+	 */
+	if (bsearch(&xid, rb->initial_running_xacts, rb->n_initial_running_xacts,
+				sizeof(TransactionId), xidComparator) != NULL)
+	{
+		ReorderBufferXidSetCatalogChanges(rb, xid, lsn);
+
+		for (int i = 0; i < subxcnt; i++)
+		{
+			ReorderBufferAssignChild(rb, xid, subxacts[i], lsn);
+			ReorderBufferXidSetCatalogChanges(rb, subxacts[i], lsn);
+		}
+	}
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index ba257d81b5..fe0f52d4e1 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -12,6 +12,7 @@
 #include "access/htup_details.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
+#include "storage/standby.h"
 #include "utils/hsearch.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -589,6 +590,35 @@ struct ReorderBuffer
 	/* memory accounting */
 	Size		size;
 
+	/*
+	 * Array of transactions and subtransactions that were running when
+	 * the xl_running_xacts record that we decoded first was written.
+	 * The array is sorted in xidComparator order. Xids are removed from
+	 * the array when decoding xl_running_xacts records, and the array
+	 * eventually becomes empty.
+	 *
+	 * We rely on HEAP2_NEW_CID records and XACT_INVALIDATIONS to know
+	 * if the transaction has changed the catalog, and that information
+	 * is not serialized to the snapshot. Therefore, if the logical
+	 * decoding decodes the commit record of the transaction that actually
+	 * has done catalog changes without these records, we fail to add
+	 * the xid to the snapshot, and end up looking at catalogs with the
+	 * wrong snapshot. To avoid this problem, if the COMMIT record of
+	 * the xid listed in initial_running_xacts has XACT_XINFO_HAS_INVALS
+	 * flag, we mark both the top transaction and its subtransactions
+	 * as containing catalog changes.
+	 *
+	 * We could end up adding a transaction that didn't change the catalog
+	 * to the snapshot since we cannot distinguish whether the transaction
+	 * has catalog changes only by checking the COMMIT record. It doesn't
+	 * have the information on which (sub) transaction has catalog changes,
+	 * and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the
+	 * transaction has catalog changes. But that is not a problem since
+	 * we use the historic snapshot only for reading system catalogs.
+	 */
+	TransactionId *initial_running_xacts;
+	int n_initial_running_xacts;
+
 	/*
 	 * Statistics about transactions spilled to disk.
 	 *
@@ -678,4 +708,10 @@ void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
 void		StartupReorderBuffer(void);
 
+void		ReorderBufferProcessInitialXacts(ReorderBuffer *rb,
+											 xl_running_xacts *running);
+void		ReorderBufferInitialXactsSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
+													   int subxcnt,
+													   TransactionId *subxacts,
+													   XLogRecPtr lsn);
 #endif
-- 
2.24.3 (Apple Git-128)

