From 6d3c0bc289a6f0fc6dd8fb9a980ef4cfd42e346b Mon Sep 17 00:00:00 2001
From: "kuroda.hayato%40jp.fujitsu.com" <kuroda.hayato@jp.fujitsu.com>
Date: Tue, 6 Sep 2022 05:08:33 +0000
Subject: [PATCH v3] Fix assertion failure during logical decoding.

In SnapBuildCommitTxn(), we assumed that the top transaction needs timetravel
too if one of sub transactions need that.

Normally this assumption was correct, but in some cases the necessity of
timetravel was inconsistent between top and sub transactions. This was caused
because the relation between transactions might not be recorded to the
reorder buffer when CHECKPOINT was occurred.

To fix this problem, this forces the timetravel for the top transaction if one
of sub transactions needs that.

Thanks to off-list discussion with Takamichi Osumi.
---
 .../expected/catalog_change_snapshot.out      | 45 +++++++++++++++++++
 .../specs/catalog_change_snapshot.spec        |  9 ++++
 src/backend/replication/logical/snapbuild.c   |  3 ++
 3 files changed, 57 insertions(+)

diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
index d2a4bdfcc1..e36f417e4a 100644
--- a/contrib/test_decoding/expected/catalog_change_snapshot.out
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -96,3 +96,48 @@ COMMIT
 stop    
 (1 row)
 
+
+starting permutation: s0_init s0_begin s0_savepoint s0_insert s1_checkpoint s1_get_changes s0_truncate s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_truncate: TRUNCATE tbl1;
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+table public.tbl1: TRUNCATE: (no-flags)                      
+COMMIT                                                       
+(4 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT                                                       
+(3 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
index ff8f68489b..7ff28ed7e8 100644
--- a/contrib/test_decoding/specs/catalog_change_snapshot.spec
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -57,3 +57,12 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s
 # checkpoint record it prunes one of the xacts in that list and when decoding the
 # next checkpoint, it will completely prune that list.
 permutation "s0_init" "s0_begin" "s0_truncate" "s2_begin" "s2_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s2_commit" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# Test that we can handle the case where only subtransaction is marked as
+# containing catalog changes. The last decoding starts from NEW_CID generated
+# by "s0_truncate" and marks only the subtransaction as containing catalog
+# changes but we don't create the association between top-level transaction and
+# subtransaction yet. When decoding the commit record of the top-level
+# transaction, we must force the top-level transaction to do timetravel since
+# one of its subtransactions is marked as containing catalog changes.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_truncate" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 54499c06fe..f892d19bfb 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1099,6 +1099,9 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 	else if (sub_needs_timetravel)
 	{
 		/* track toplevel txn as well, subxact alone isn't meaningful */
+		elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
+			 xid);
+		needs_timetravel = true;
 		SnapBuildAddCommittedTxn(builder, xid);
 	}
 	else if (needs_timetravel)
-- 
2.27.0

