On Fri, Jul 29, 2022 at 12:15 PM Amit Kapila <[email protected]> wrote:
>
> >
> > Yeah, your description makes sense to me. I've also considered how to
> > hit this path but I guess it is never hit. Thinking of it in another
> > way, first of all, at least 2 catalog modifying transactions have to
> > be running while writing a xl_running_xacts. The serialized snapshot
> > that is written when we decode the first xl_running_xact has two
> > transactions. Then, one of them is committed before the second
> > xl_running_xacts. The second serialized snapshot has only one
> > transaction. Then, the transaction is also committed after that. Now,
> > in order to execute the path, we need to start decoding from the first
> > serialized snapshot. However, if we start from there, we cannot decode
> > the full contents of the transaction that was committed later.
> >
>
> I think then we should change this code in the master branch patch
> with an additional comment on the lines of: "Either all the xacts got
> purged or none. It is only possible to partially remove the xids from
> this array if one or more of the xids are still running but not all.
> That can happen if we start decoding from a point (LSN where the
> snapshot state became consistent) where all the xacts in this were
> running and then at least one of those got committed and a few are
> still running. We will never start from such a point because we won't
> move the slot's restart_lsn past the point where the oldest running
> transaction's restart_decoding_lsn is."
>
Unfortunately, this theory doesn't turn out to be true. While
investigating the latest buildfarm failure [1], I see that it is
possible that only part of the xacts in the restored catalog modifying
xacts list needs to be purged. See the attached patch, where I have
demonstrated it via a reproducible test. It seems the point we were
missing was that starting from a point where two or more catalog
modifying xacts were serialized requires another open transaction before
both get committed, and then we need a checkpoint or some other way to
force a running_xacts record in between the commits of the initial two
catalog modifying xacts. There could possibly be other ways as well,
but the theory above wasn't correct.
[1] -
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=curculio&dt=2022-08-25%2004%3A15%3A34
--
With Regards,
Amit Kapila.
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
index dc4f9b7..d2a4bdf 100644
--- a/contrib/test_decoding/expected/catalog_change_snapshot.out
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
@@ -42,3 +42,57 @@ COMMIT
stop
(1 row)
+
+starting permutation: s0_init s0_begin s0_truncate s2_begin s2_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s2_commit s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_truncate: TRUNCATE tbl1;
+step s2_begin: BEGIN;
+step s2_truncate: TRUNCATE tbl2;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+---------------------------------------
+BEGIN
+table public.tbl1: TRUNCATE: (no-flags)
+COMMIT
+(3 rows)
+
+step s2_commit: COMMIT;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+---------------------------------------
+BEGIN
+table public.tbl2: TRUNCATE: (no-flags)
+COMMIT
+(3 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+-------------------------------------------------------------
+BEGIN
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT
+(3 rows)
+
+?column?
+--------
+stop
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
index 2971ddc..ff8f684 100644
--- a/contrib/test_decoding/specs/catalog_change_snapshot.spec
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -3,12 +3,15 @@
setup
{
DROP TABLE IF EXISTS tbl1;
+ DROP TABLE IF EXISTS tbl2;
CREATE TABLE tbl1 (val1 integer, val2 integer);
+ CREATE TABLE tbl2 (val1 integer, val2 integer);
}
teardown
{
DROP TABLE tbl1;
+ DROP TABLE tbl2;
SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
}
@@ -26,6 +29,12 @@ setup { SET synchronous_commit=on; }
step "s1_checkpoint" { CHECKPOINT; }
step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_begin" { BEGIN; }
+step "s2_truncate" { TRUNCATE tbl2; }
+step "s2_commit" { COMMIT; }
+
# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
# during the first checkpoint execution. This transaction must be marked as
@@ -37,3 +46,14 @@ step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_
# record written by bgwriter. One might think we can either stop the bgwriter or
# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# Test that we can purge the old catalog modifying transactions after restoring
+# them from the serialized snapshot. The first checkpoint will serialize the list
+# of two catalog modifying xacts. The purpose of the second checkpoint is to allow
+# partial pruning of the list of catalog modifying xacts. The third checkpoint
+# followed by get_changes establishes a restart_point at the first checkpoint LSN.
+# The last get_changes will start decoding from the first checkpoint which
+# restores the list of catalog modifying xacts and then while decoding the second
+# checkpoint record it prunes one of the xacts in that list and when decoding the
+# next checkpoint, it will completely prune that list.
+permutation "s0_init" "s0_begin" "s0_truncate" "s2_begin" "s2_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s2_commit" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 1ff2c12..cbf16c0 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -969,28 +969,40 @@ SnapBuildPurgeOlderTxn(SnapBuild *builder)
pfree(workspace);
/*
- * Either all the xacts got purged or none. It is only possible to
- * partially remove the xids from this array if one or more of the xids
- * are still running but not all. That can happen if we start decoding
- * from a point (LSN where the snapshot state became consistent) where all
- * the xacts in this were running and then at least one of those got
- * committed and a few are still running. We will never start from such a
- * point because we won't move the slot's restart_lsn past the point where
- * the oldest running transaction's restart_decoding_lsn is.
+ * Purge xids in ->catchange as well. The purged array must also be
+ * sorted in xidComparator order.
*/
- if (builder->catchange.xcnt == 0 ||
- TransactionIdFollowsOrEquals(builder->catchange.xip[0],
- builder->xmin))
- return;
+ if (builder->catchange.xcnt > 0)
+ {
+ /*
+ * catchange.xip is sorted, so skip the leading xids that precede xmin;
+ * the first xid that follows or equals xmin starts the survivors.
+ */
+ for (off = 0; off < builder->catchange.xcnt; off++)
+ {
+ if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
+ builder->xmin))
+ break;
+ }
- Assert(TransactionIdFollows(builder->xmin,
- builder->catchange.xip[builder->catchange.xcnt - 1]));
- pfree(builder->catchange.xip);
- builder->catchange.xip = NULL;
- builder->catchange.xcnt = 0;
+ surviving_xids = builder->catchange.xcnt - off;
- elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u",
- builder->xmin);
+ if (surviving_xids > 0)
+ {
+ memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
+ surviving_xids * sizeof(TransactionId));
+ }
+ else
+ {
+ pfree(builder->catchange.xip);
+ builder->catchange.xip = NULL;
+ }
+
+ elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
+ (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
+ builder->xmin, builder->xmax);
+ builder->catchange.xcnt = surviving_xids;
+ }
}
/*