Hello,

I've discovered a couple of bugs in the logical decoding code, both of which
lead to incorrect decoding results in somewhat rare cases. First, the xmin of
slots is advanced too early. This affects the results only when the locking
rules allow DDL to run concurrently with another transaction reading the
schema. In fact, I was not aware of any such DDL until I raised this question at
https://www.postgresql.org/message-id/flat/87tvu0p0jm.fsf%40ars-thinkpad#87tvu0p0jm.fsf@ars-thinkpad
and Andres pointed out ALTER of composite types. There are probably others,
but I am not sure; it would be interesting to know them.

Second, new snapshots are never queued to known subxacts. This means that
decoding results can be wrong if the toplevel transaction doesn't write
anything while a subxact does.

Please see the attached patch for a detailed description of the issues,
tests that reproduce them, and the fixes.

--
Arseny Sher
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

From 0d03ef172a29ce64a6c5c26f484f0f3186895125 Mon Sep 17 00:00:00 2001
From: Arseny Sher <sher-...@yandex.ru>
Date: Sat, 7 Apr 2018 08:22:30 +0300
Subject: [PATCH] Fix slot's xmin advancement and subxact's lost snapshots in
 decoding.

There are two somewhat related bugs here. First, the xmin of logical slots
was advanced too early. During xl_running_xacts processing, the slot's xmin
was set to the oldest running xid in the record, which is wrong: snapshots
that will be used for xacts not yet replayed at that moment may consider even
older xacts as running. The problem wasn't noticed earlier because DDL that
can delete a catalog tuple (set its xmax) while another, not yet committed
xact is looking at it is pretty rare, if not unique: e.g. all forms of ALTER
TABLE that change the schema acquire an ACCESS EXCLUSIVE lock, which
conflicts with any insert. The included test case uses ALTER of a composite
type, which has no such interlocking.

To deal with this, we must be able to quickly retrieve the oldest xmin (the
oldest running xid among all assigned snapshots) from ReorderBuffer. The
proposed fix adds yet another list of ReorderBufferTXNs to it, where xacts
are sorted by the LSN of their base snapshot, because:
 * Simply using the current ReorderBufferGetOldestTXN (to get the base
   snapshot's xmin) is wrong: ordering by the LSN of the first record is
   *not* the same as ordering by the LSN of the record at which the base
   snapshot was assigned, since many record types, e.g. xl_xact_assignment,
   don't need a snapshot. We could remember the snapbuilder's xmin while
   processing the first record, but that's too conservative and would keep
   more tuples from being vacuumed than needed.
 * On the other hand, we can't fully replace the existing ordering by the LSN
   of the first change with ordering by the LSN of the base snapshot: if we
   did, WAL records that didn't force assignment of a base snapshot might be
   recycled before we replay the xact, because ReorderBufferGetOldestTXN
   determines which LSN must still be kept.

The second issue concerns subxacts. Currently
SnapBuildDistributeNewCatalogSnapshot never gives a snapshot to a known
subxact. This means that if the toplevel xact has no base snapshot (it never
wrote anything), an assignment record was seen (so the subxact is known), and
the subxact is writing, then no new snapshots are queued at all. Probably the
simplest fix would be to use the new by_base_snapshot_lsn list and distribute
the snapshot to all xacts that have a base snapshot, whether they are
subxacts or not. However, that would queue unnecessary snapshots to all
already known subxacts. To avoid this, this patch passes the base snapshot of
a known subxact to the toplevel xact immediately, as soon as we both know
their kinship and the snapshot has been assigned (we had to do this anyway in
ReorderBufferCommitChild); new snapshots are distributed only to top-level
xacts (or unknown subxacts), as before.

Also, a minor memory leak is fixed: the refcount of the toplevel's old base
snapshot was not decremented when a snapshot was passed to it from a child.
---
 contrib/test_decoding/Makefile                     |   3 +-
 contrib/test_decoding/expected/oldest_xmin.out     |  27 +++
 .../test_decoding/expected/subxact_snap_pass.out   |  49 ++++++
 contrib/test_decoding/specs/oldest_xmin.spec       |  37 +++++
 contrib/test_decoding/specs/subxact_snap_pass.spec |  42 +++++
 src/backend/replication/logical/reorderbuffer.c    | 183 ++++++++++++++-------
 src/backend/replication/logical/snapbuild.c        |  15 +-
 src/include/replication/reorderbuffer.h            |  15 +-
 8 files changed, 304 insertions(+), 67 deletions(-)
 create mode 100644 contrib/test_decoding/expected/oldest_xmin.out
 create mode 100644 contrib/test_decoding/expected/subxact_snap_pass.out
 create mode 100644 contrib/test_decoding/specs/oldest_xmin.spec
 create mode 100644 contrib/test_decoding/specs/subxact_snap_pass.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 6c18189d9d..c696288d67 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -50,7 +50,8 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install
 	$(pg_regress_installcheck) \
 	    $(REGRESSCHECKS)
 
-ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml
+ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \
+	oldest_xmin subxact_snap_pass
 
 isolationcheck: | submake-isolation submake-test_decoding temp-install
 	$(pg_isolation_regress_check) \
diff --git a/contrib/test_decoding/expected/oldest_xmin.out b/contrib/test_decoding/expected/oldest_xmin.out
new file mode 100644
index 0000000000..d09342c4be
--- /dev/null
+++ b/contrib/test_decoding/expected/oldest_xmin.out
@@ -0,0 +1,27 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit s0_checkpoint s0_get_changes s1_commit s0_vacuum s0_get_changes
+step s0_begin: BEGIN;
+step s0_getxid: SELECT txid_current() IS NULL;
+?column?       
+
+f              
+step s1_begin: BEGIN;
+step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3));
+step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos;
+step s0_commit: COMMIT;
+step s0_checkpoint: CHECKPOINT;
+step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data           
+
+step s1_commit: COMMIT;
+step s0_vacuum: VACUUM FULL;
+step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data           
+
+BEGIN          
+table public.harvest: INSERT: fruits[basket]:'(1,2,3)'
+COMMIT         
+?column?       
+
+stop           
diff --git a/contrib/test_decoding/expected/subxact_snap_pass.out b/contrib/test_decoding/expected/subxact_snap_pass.out
new file mode 100644
index 0000000000..87bed03f76
--- /dev/null
+++ b/contrib/test_decoding/expected/subxact_snap_pass.out
@@ -0,0 +1,49 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub0 s0_commit s0_get_changes
+step s0_begin: BEGIN;
+step s0_begin_sub0: SAVEPOINT s0;
+step s0_log_assignment: SELECT txid_current() IS NULL;
+?column?       
+
+f              
+step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0);
+step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int;
+step s0_insert: INSERT INTO harvest VALUES (1, 2, 3);
+step s0_end_sub0: RELEASE SAVEPOINT s0;
+step s0_commit: COMMIT;
+step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data           
+
+BEGIN          
+table public.dummy: INSERT: i[integer]:0
+table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3
+COMMIT         
+?column?       
+
+stop           
+
+starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_begin_sub1 s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub1 s0_end_sub0 s0_commit s0_get_changes
+step s0_begin: BEGIN;
+step s0_begin_sub0: SAVEPOINT s0;
+step s0_log_assignment: SELECT txid_current() IS NULL;
+?column?       
+
+f              
+step s0_begin_sub1: SAVEPOINT s1;
+step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0);
+step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int;
+step s0_insert: INSERT INTO harvest VALUES (1, 2, 3);
+step s0_end_sub1: RELEASE SAVEPOINT s1;
+step s0_end_sub0: RELEASE SAVEPOINT s0;
+step s0_commit: COMMIT;
+step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data           
+
+BEGIN          
+table public.dummy: INSERT: i[integer]:0
+table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3
+COMMIT         
+?column?       
+
+stop           
diff --git a/contrib/test_decoding/specs/oldest_xmin.spec b/contrib/test_decoding/specs/oldest_xmin.spec
new file mode 100644
index 0000000000..4f8af70aa2
--- /dev/null
+++ b/contrib/test_decoding/specs/oldest_xmin.spec
@@ -0,0 +1,37 @@
+# Test advancement of the slot's oldest xmin
+
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact
+    DROP TYPE IF EXISTS basket;
+    CREATE TYPE basket AS (apples integer, pears integer, mangos integer);
+    DROP TABLE IF EXISTS harvest;
+    CREATE TABLE harvest(fruits basket);
+}
+
+teardown
+{
+    DROP TABLE IF EXISTS harvest;
+    DROP TYPE IF EXISTS basket;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+step "s0_begin" { BEGIN; }
+step "s0_getxid" { SELECT txid_current() IS NULL; }
+step "s0_alter" { ALTER TYPE basket DROP ATTRIBUTE mangos; }
+step "s0_commit" { COMMIT; }
+step "s0_checkpoint" { CHECKPOINT; }
+step "s0_vacuum" { VACUUM FULL; }
+step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+session "s1"
+step "s1_begin" { BEGIN; }
+step "s1_insert" { INSERT INTO harvest VALUES ((1, 2, 3)); }
+step "s1_commit" { COMMIT; }
+
+# Checkpoint with following get_changes forces to advance xmin. ALTER of a
+# composite type is a rare form of DDL which allows T1 to see the tuple which
+# will be removed (xmax set) before T1 commits. That is, interlocking doesn't
+# forbid modifying catalog after someone read it (and didn't commit yet).
+permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s1_commit" "s0_vacuum" "s0_get_changes"
diff --git a/contrib/test_decoding/specs/subxact_snap_pass.spec b/contrib/test_decoding/specs/subxact_snap_pass.spec
new file mode 100644
index 0000000000..d2d88176ac
--- /dev/null
+++ b/contrib/test_decoding/specs/subxact_snap_pass.spec
@@ -0,0 +1,42 @@
+# Test passing snapshot from subxact to top-level and receival of later snaps.
+
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact
+    DROP TABLE IF EXISTS dummy;
+    CREATE TABLE dummy(i int);
+    DROP TABLE IF EXISTS harvest;
+    CREATE TABLE harvest(apples int, pears int);
+}
+
+teardown
+{
+    DROP TABLE IF EXISTS harvest;
+    DROP TABLE IF EXISTS dummy;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+step "s0_begin" { BEGIN; }
+step "s0_begin_sub0" { SAVEPOINT s0; }
+step "s0_log_assignment" { SELECT txid_current() IS NULL; }
+step "s0_begin_sub1" { SAVEPOINT s1; }
+step "s0_sub_get_base_snap" { INSERT INTO dummy VALUES (0); }
+step "s0_insert" { INSERT INTO harvest VALUES (1, 2, 3); }
+step "s0_end_sub0" { RELEASE SAVEPOINT s0; }
+step "s0_end_sub1" { RELEASE SAVEPOINT s1; }
+step "s0_insert2" { INSERT INTO harvest VALUES (1, 2, 3, 4); }
+step "s0_commit" { COMMIT; }
+step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+session "s1"
+step "s1_produce_new_snap" { ALTER TABLE harvest ADD COLUMN mangos int; }
+
+# start top-level without base snap, get base snap in subxact, then create new
+# snap and make sure it is queued.
+permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub0" "s0_commit" "s0_get_changes"
+
+# In previous test, we firstly associated subxact with xact and only then got
+# base snap; now nest one more subxact to get snap first and only then (at
+# commit) associate it with toplevel.
+permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_begin_sub1" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub1" "s0_end_sub0" "s0_commit" "s0_get_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b4016ed52b..1f99697dbb 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -165,6 +165,8 @@ static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
 					  TransactionId xid, bool create, bool *is_new,
 					  XLogRecPtr lsn, bool create_as_top);
+static void ReorderBufferPassBaseSnapshot(ReorderBufferTXN *txn,
+										  ReorderBufferTXN *subtxn);
 
 static void AssertTXNLsnOrder(ReorderBuffer *rb);
 
@@ -272,6 +274,8 @@ ReorderBufferAllocate(void)
 
 	dlist_init(&buffer->toplevel_by_lsn);
 
+	dlist_init(&buffer->by_base_snapshot_lsn);
+
 	/*
 	 * Ensure there's no stale data from prior uses of this slot, in case some
 	 * prior exit avoided calling ReorderBufferFree. Failure to do this can
@@ -470,7 +474,6 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
 	bool		found;
 
 	Assert(TransactionIdIsValid(xid));
-	Assert(!create || lsn != InvalidXLogRecPtr);
 
 	/*
 	 * Check the one-entry lookup cache first
@@ -514,6 +517,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
 	{
 		/* initialize the new entry, if creation was requested */
 		Assert(ent != NULL);
+		Assert(lsn != InvalidXLogRecPtr);
 
 		ent->txn = ReorderBufferGetTXN(rb);
 		ent->txn->xid = xid;
@@ -659,6 +663,27 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb)
 	return txn;
 }
 
+/*
+ * Returns oldest possibly running xid from POV of snapshots used in xacts
+ * ReorderBuffer currently keeps, or InvalidTransactionId if there are none.
+ * Since snapshots are assigned monotonically, it is enough to check xmin of
+ * base snapshot with minimal base_snapshot_lsn.
+ */
+TransactionId
+ReorderBufferGetOldestXmin(ReorderBuffer *rb)
+{
+	ReorderBufferTXN *txn;
+
+	if (dlist_is_empty(&rb->by_base_snapshot_lsn))
+		return InvalidTransactionId;
+
+	txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
+							 &rb->by_base_snapshot_lsn);
+
+	Assert(txn->base_snapshot != NULL);
+	return txn->base_snapshot->xmin;
+}
+
 void
 ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
 {
@@ -677,32 +702,71 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
 	txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
 	subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
 
-	if (new_sub)
+	if (new_top && !new_sub)
+		elog(ERROR, "subxact logged without previous toplevel record");
+
+	if (!new_sub)
 	{
-		/*
-		 * we assign subtransactions to top level transaction even if we don't
-		 * have data for it yet, assignment records frequently reference xids
-		 * that have not yet produced any records. Knowing those aren't top
-		 * level xids allows us to make processing cheaper in some places.
-		 */
-		dlist_push_tail(&txn->subtxns, &subtxn->node);
-		txn->nsubtxns++;
+		if (subtxn->is_known_as_subxact)
+			/* already associated, nothing to do */
+			return;
+		else
+		{
+			/*
+			 * We have seen this subxact previously, but created it as a top.
+			 * Now remove it from lsn order list of top-level transactions.
+			 */
+			dlist_delete(&subtxn->node);
+		}
 	}
-	else if (!subtxn->is_known_as_subxact)
-	{
-		subtxn->is_known_as_subxact = true;
-		Assert(subtxn->nsubtxns == 0);
 
-		/* remove from lsn order list of top-level transactions */
-		dlist_delete(&subtxn->node);
+	subtxn->is_known_as_subxact = true;
+	subtxn->toplevel_xid = xid;
+	Assert(subtxn->nsubtxns == 0);
 
-		/* add to toplevel transaction */
-		dlist_push_tail(&txn->subtxns, &subtxn->node);
-		txn->nsubtxns++;
-	}
-	else if (new_top)
+	/* add to subtransaction list */
+	dlist_push_tail(&txn->subtxns, &subtxn->node);
+	txn->nsubtxns++;
+
+	/*
+	 * If subxact already got base snapshot, pass it to toplevel.
+	 * This allows to queue further base snapshots only to toplevel.
+	 */
+	ReorderBufferPassBaseSnapshot(txn, subtxn);
+}
+
+/*
+ * Pass base snapshot of subtxn to the toplevel txn if it doesn't have one, or
+ * subtxn's is earlier. That can happen if there are no changes in the
+ * toplevel transaction but in one of the child transactions (or first change
+ * in subxact has earlier lsn than first change in xact and we learned about
+ * xacts relationship only now). This is required for correct decoding as we
+ * replay all subxacts in one shot with toplevel. We do that as soon as we
+ * become aware of the kinship to avoid queueing extra snapshots to known
+ * subxacts -- only toplevel will receive further snaps.
+ */
+static void
+ReorderBufferPassBaseSnapshot(ReorderBufferTXN *txn, ReorderBufferTXN *subtxn)
+{
+	if (subtxn->base_snapshot != NULL &&
+		(txn->base_snapshot == NULL ||
+		 txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
 	{
-		elog(ERROR, "existing subxact assigned to unknown toplevel xact");
+		/* toplevel has base snapshot, but it's too fresh; purge it */
+		if (txn->base_snapshot != NULL)
+		{
+			SnapBuildSnapDecRefcount(txn->base_snapshot);
+			dlist_delete(&txn->base_snapshot_node);
+		}
+
+		txn->base_snapshot = subtxn->base_snapshot;
+		txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
+		subtxn->base_snapshot = NULL;
+		subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
+		/* We must preserve the correct place in by_base_snapshot_lsn list */
+		dlist_insert_before(&subtxn->base_snapshot_node, &txn->base_snapshot_node);
+		/* Now remove subtxn from it */
+		dlist_delete(&subtxn->base_snapshot_node);
 	}
 }
 
@@ -715,7 +779,6 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
 						 TransactionId subxid, XLogRecPtr commit_lsn,
 						 XLogRecPtr end_lsn)
 {
-	ReorderBufferTXN *txn;
 	ReorderBufferTXN *subtxn;
 
 	subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
@@ -727,42 +790,14 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
 	if (!subtxn)
 		return;
 
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
-
-	if (txn == NULL)
-		elog(ERROR, "subxact logged without previous toplevel record");
-
-	/*
-	 * Pass our base snapshot to the parent transaction if it doesn't have
-	 * one, or ours is older. That can happen if there are no changes in the
-	 * toplevel transaction but in one of the child transactions. This allows
-	 * the parent to simply use its base snapshot initially.
-	 */
-	if (subtxn->base_snapshot != NULL &&
-		(txn->base_snapshot == NULL ||
-		 txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
-	{
-		txn->base_snapshot = subtxn->base_snapshot;
-		txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
-		subtxn->base_snapshot = NULL;
-		subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
-	}
-
 	subtxn->final_lsn = commit_lsn;
 	subtxn->end_lsn = end_lsn;
 
-	if (!subtxn->is_known_as_subxact)
-	{
-		subtxn->is_known_as_subxact = true;
-		Assert(subtxn->nsubtxns == 0);
-
-		/* remove from lsn order list of top-level transactions */
-		dlist_delete(&subtxn->node);
-
-		/* add to subtransaction list */
-		dlist_push_tail(&txn->subtxns, &subtxn->node);
-		txn->nsubtxns++;
-	}
+	/*
+	 * We already checked subtxn existence and no ReorderBufferTXNs will be
+	 * created in this call, so lsn doesn't matter at all.
+	 */
+	ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
 }
 
 
@@ -1091,6 +1126,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		SnapBuildSnapDecRefcount(txn->base_snapshot);
 		txn->base_snapshot = NULL;
 		txn->base_snapshot_lsn = InvalidXLogRecPtr;
+		dlist_delete(&txn->base_snapshot_node);
 	}
 
 	/*
@@ -1840,6 +1876,9 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
  * Setup the base snapshot of a transaction. The base snapshot is the snapshot
  * that is used to decode all changes until either this transaction modifies
  * the catalog or another catalog modifying transaction commits.
+ * If we know that xid is subxact, we just make sure that toplevel has
+ * the base snapshot. If toplevel already has some base snapshot, it
+ * definitely has also passed snap as its base or queued in its change queue.
  *
  * Needs to be called before any changes are added with
  * ReorderBufferQueueChange().
@@ -1855,8 +1894,18 @@ ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
 	Assert(txn->base_snapshot == NULL);
 	Assert(snap != NULL);
 
+	if (txn->is_known_as_subxact)
+	{
+		/* Will operate on toplevel */
+		txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
+									NULL, InvalidXLogRecPtr, false);
+		Assert(txn != NULL);
+		Assert(txn->base_snapshot == NULL);
+	}
+
 	txn->base_snapshot = snap;
 	txn->base_snapshot_lsn = lsn;
+	dlist_push_tail(&rb->by_base_snapshot_lsn, &txn->base_snapshot_node);
 }
 
 /*
@@ -1976,6 +2025,9 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
 
 /*
  * Have we already added the first snapshot?
+ * If xid is known subxact, we check toplevel, as known subxacts never keep
+ * base snapshot themselves. This allows to queue new snapshots only to the
+ * toplevel instead of duplicating them in change queues of subxacts.
  */
 bool
 ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
@@ -1989,12 +2041,19 @@ ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
 	if (txn == NULL)
 		return false;
 
-	/*
-	 * TODO: It would be a nice improvement if we would check the toplevel
-	 * transaction in subtransactions, but we'd need to keep track of a bit
-	 * more state.
-	 */
-	return txn->base_snapshot != NULL;
+	if (txn->is_known_as_subxact)
+	{
+		ReorderBufferTXN *toplevel_txn;
+
+		Assert(txn->base_snapshot == NULL);
+		toplevel_txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
+											 NULL, InvalidXLogRecPtr, false);
+		Assert(toplevel_txn != NULL);
+		return toplevel_txn->base_snapshot != NULL;
+
+	}
+	else
+		return txn->base_snapshot != NULL;
 }
 
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 4123cdebcf..7f344bdced 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -830,7 +830,7 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 		 * all. We'll add a snapshot when the first change gets queued.
 		 *
 		 * NB: This works correctly even for subtransactions because
-		 * ReorderBufferCommitChild() takes care to pass the parent the base
+		 * ReorderBufferAssignChild() takes care to pass the parent the base
 		 * snapshot, and while iterating the changequeue we'll get the change
 		 * from the subtxn.
 		 */
@@ -1094,6 +1094,7 @@ void
 SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
 {
 	ReorderBufferTXN *txn;
+	TransactionId oldest_xmin;
 
 	/*
 	 * If we're not consistent yet, inspect the record to see whether it
@@ -1132,9 +1133,17 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 
 	/*
 	 * Increase shared memory limits, so vacuum can work on tuples we
-	 * prevented from being pruned till now.
+	 * prevented from being pruned till now. We ask reorderbuffer which
+	 * minimal xid might still be running in snapshots of xacts currently
+	 * being reordered. If rb doesn't have any xacts with snapshots, we need
+	 * to care only about xids which will be considered as running by
+	 * snapshots we will produce later, and min of them can't be less than
+	 * oldest running xid in the record we are reading.
 	 */
-	LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid);
+	oldest_xmin = ReorderBufferGetOldestXmin(builder->reorder);
+	if (oldest_xmin == InvalidTransactionId)
+		oldest_xmin = running->oldestRunningXid;
+	LogicalIncreaseXminForSlot(lsn, oldest_xmin);
 
 	/*
 	 * Also tell the slot where we can restart decoding from. We don't want to
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index aa430c843c..47178adf68 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -148,9 +148,10 @@ typedef struct ReorderBufferTXN
 	bool		has_catalog_changes;
 
 	/*
-	 * Do we know this is a subxact?
+	 * Do we know this is a subxact? Xid of top-level, if yes.
 	 */
 	bool		is_known_as_subxact;
+	TransactionId	toplevel_xid;
 
 	/*
 	 * LSN of the first data carrying, WAL record with knowledge about this
@@ -200,6 +201,7 @@ typedef struct ReorderBufferTXN
 	 */
 	Snapshot	base_snapshot;
 	XLogRecPtr	base_snapshot_lsn;
+	dlist_node	base_snapshot_node; /* position in by_base_snapshot_lsn list */
 
 	/*
 	 * How many ReorderBufferChange's do we have in this txn.
@@ -317,6 +319,16 @@ struct ReorderBuffer
 	dlist_head	toplevel_by_lsn;
 
 	/*
+	 * Transactions (or subxacts) that have base snapshot, ordered by LSN of
+	 * the first record which forced us to set the base snapshot. Used for
+	 * advancing slot's xmin: xmin of the first snapshot defines which xid we
+	 * still need to consider as running. This is not the same as
+	 * toplevel_by_lsn: we set snapshot only on data-carrying record, e.g.
+	 * heap insert/update/delete, not just the first.
+	 */
+	dlist_head	by_base_snapshot_lsn;
+
+	/*
 	 * one-entry sized cache for by_txn. Very frequently the same txn gets
 	 * looked up over and over again.
 	 */
@@ -400,6 +412,7 @@ bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
 
 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
+TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 
 void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
-- 
2.11.0
