On Fri, Jul 29, 2022 at 12:15 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> >
> > Yeah, your description makes sense to me. I've also considered how to
> > hit this path but I guess it is never hit. Thinking of it in another
> > way, first of all, at least 2 catalog modifying transactions have to
> > be running while writing a xl_running_xacts. The serialized snapshot
> > that is written when we decode the first xl_running_xact has two
> > transactions. Then, one of them is committed before the second
> > xl_running_xacts. The second serialized snapshot has only one
> > transaction. Then, the transaction is also committed after that. Now,
> > in order to execute the path, we need to start decoding from the first
> > serialized snapshot. However, if we start from there, we cannot decode
> > the full contents of the transaction that was committed later.
> >
>
> I think then we should change this code in the master branch patch
> with an additional comment on the lines of: "Either all the xacts got
> purged or none. It is only possible to partially remove the xids from
> this array if one or more of the xids are still running but not all.
> That can happen if we start decoding from a point (LSN where the
> snapshot state became consistent) where all the xacts in this were
> running and then at least one of those got committed and a few are
> still running. We will never start from such a point because we won't
> move the slot's restart_lsn past the point where the oldest running
> transaction's restart_decoding_lsn is."
>

Unfortunately, this theory doesn't turn out to be true. While
investigating the latest buildfarm failure [1], I see that it is
possible that only part of the xacts in the restored catalog modifying
xacts list needs to be purged. See the attached where I have
demonstrated it via a reproducible test. It seems the point we were
missing was that to start from a point where two or more catalog
modifying were serialized, it requires another open transaction before
both get committed, and then we need the checkpoint or other way to
force running_xacts record in-between the commit of initial two
catalog modifying xacts. There could possibly be other ways as well
but the theory above wasn't correct.

[1] - 
https://buildfarm.postgresql.org/cgi-bin/show_log.pl?nm=curculio&dt=2022-08-25%2004%3A15%3A34


--
With Regards,
Amit Kapila.
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
index dc4f9b7..d2a4bdf 100644
--- a/contrib/test_decoding/expected/catalog_change_snapshot.out
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
 starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
 step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
@@ -42,3 +42,57 @@ COMMIT
 stop    
 (1 row)
 
+
+starting permutation: s0_init s0_begin s0_truncate s2_begin s2_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s2_commit s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_truncate: TRUNCATE tbl1;
+step s2_begin: BEGIN;
+step s2_truncate: TRUNCATE tbl2;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                   
+---------------------------------------
+BEGIN                                  
+table public.tbl1: TRUNCATE: (no-flags)
+COMMIT                                 
+(3 rows)
+
+step s2_commit: COMMIT;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                   
+---------------------------------------
+BEGIN                                  
+table public.tbl2: TRUNCATE: (no-flags)
+COMMIT                                 
+(3 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT                                                       
+(3 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
index 2971ddc..ff8f684 100644
--- a/contrib/test_decoding/specs/catalog_change_snapshot.spec
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -3,12 +3,15 @@
 setup
 {
     DROP TABLE IF EXISTS tbl1;
+    DROP TABLE IF EXISTS tbl2;
     CREATE TABLE tbl1 (val1 integer, val2 integer);
+    CREATE TABLE tbl2 (val1 integer, val2 integer);
 }
 
 teardown
 {
     DROP TABLE tbl1;
+    DROP TABLE tbl2;
     SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
 }
 
@@ -26,6 +29,12 @@ setup { SET synchronous_commit=on; }
 step "s1_checkpoint" { CHECKPOINT; }
 step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
 
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_begin" { BEGIN; }
+step "s2_truncate" { TRUNCATE tbl2; }
+step "s2_commit" { COMMIT; }
+
 # For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
 # only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
 # during the first checkpoint execution.  This transaction must be marked as
@@ -37,3 +46,14 @@ step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_
 # record written by bgwriter.  One might think we can either stop the bgwriter or
 # increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
 permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# Test that we can purge the old catalog modifying transactions after restoring
+# them from the serialized snapshot. The first checkpoint will serialize the list
+# of two catalog modifying xacts. The purpose of the second checkpoint is to allow
+# partial pruning of the list of catalog modifying xact. The third checkpoint
+# followed by get_changes establishes a restart_point at the first checkpoint LSN.
+# The last get_changes will start decoding from the first checkpoint which
+# restores the list of catalog modifying xacts and then while decoding the second
+# checkpoint record it prunes one of the xacts in that list and when decoding the
+# next checkpoint, it will completely prune that list.
+permutation "s0_init" "s0_begin" "s0_truncate" "s2_begin" "s2_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s2_commit" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 1ff2c12..cbf16c0 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -969,28 +969,40 @@ SnapBuildPurgeOlderTxn(SnapBuild *builder)
 	pfree(workspace);
 
 	/*
-	 * Either all the xacts got purged or none. It is only possible to
-	 * partially remove the xids from this array if one or more of the xids
-	 * are still running but not all. That can happen if we start decoding
-	 * from a point (LSN where the snapshot state became consistent) where all
-	 * the xacts in this were running and then at least one of those got
-	 * committed and a few are still running. We will never start from such a
-	 * point because we won't move the slot's restart_lsn past the point where
-	 * the oldest running transaction's restart_decoding_lsn is.
+	 * Purge xids in ->catchange as well. The purged array must also be
+	 * sorted in xidComparator order.
 	 */
-	if (builder->catchange.xcnt == 0 ||
-		TransactionIdFollowsOrEquals(builder->catchange.xip[0],
-									 builder->xmin))
-		return;
+	if (builder->catchange.xcnt > 0)
+	{
+		/*
+		 * Since catchange.xip is sorted, we find the lower bound of xids that
+		 * are still interesting.
+		 */
+		for (off = 0; off < builder->catchange.xcnt; off++)
+		{
+			if (NormalTransactionIdPrecedes(builder->catchange.xip[off],
+											builder->xmin))
+				break;
+		}
 
-	Assert(TransactionIdFollows(builder->xmin,
-								builder->catchange.xip[builder->catchange.xcnt - 1]));
-	pfree(builder->catchange.xip);
-	builder->catchange.xip = NULL;
-	builder->catchange.xcnt = 0;
+		surviving_xids = builder->catchange.xcnt - off;
 
-	elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u",
-		 builder->xmin);
+		if (surviving_xids > 0)
+		{
+			memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
+					surviving_xids * sizeof(TransactionId));
+		}
+		else
+		{
+			pfree(builder->catchange.xip);
+			builder->catchange.xip = NULL;
+		}
+
+		elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
+			 (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
+			 builder->xmin, builder->xmax);
+		builder->catchange.xcnt = surviving_xids;
+	}
 }
 
 /*

Reply via email to