From 8792a530e48196247c52acd4e9c0dc74471ceae5 Mon Sep 17 00:00:00 2001
From: "kuroda.hayato%40jp.fujitsu.com" <kuroda.hayato@jp.fujitsu.com>
Date: Tue, 6 Sep 2022 05:08:33 +0000
Subject: [PATCH] Fix assertion failure during logical decoding.

In SnapBuildCommitTxn(), we assumed that the top transaction needs timetravel
too if one of sub transactions need that.

Normally this assumption was correct, but in some cases the necessity of
timetravel was inconsistent between top and sub transactions. This was caused
because the relation between transactions might not be recorded to the
reorder buffer when CHECKPOINT was occurred.

To fix this problem, this forces the timetravel for the top transaction if one
of sub transactions needs that.

Thanks to off-list discussion with Takamichi Osumi.
---
 .../expected/catalog_change_snapshot.out      | 45 +++++++++++++++++++
 .../specs/catalog_change_snapshot.spec        | 10 +++++
 src/backend/replication/logical/snapbuild.c   |  4 +-
 3 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
index d2a4bdfcc1..e36f417e4a 100644
--- a/contrib/test_decoding/expected/catalog_change_snapshot.out
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -96,3 +96,48 @@ COMMIT
 stop    
 (1 row)
 
+
+starting permutation: s0_init s0_begin s0_savepoint s0_insert s1_checkpoint s1_get_changes s0_truncate s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_truncate: TRUNCATE tbl1;
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+table public.tbl1: TRUNCATE: (no-flags)                      
+COMMIT                                                       
+(4 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT                                                       
+(3 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
index ff8f68489b..c0eb50698f 100644
--- a/contrib/test_decoding/specs/catalog_change_snapshot.spec
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -57,3 +57,13 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s
 # checkpoint record it prunes one of the xacts in that list and when decoding the
 # next checkpoint, it will completely prune that list.
 permutation "s0_init" "s0_begin" "s0_truncate" "s2_begin" "s2_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s2_commit" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# Test that we can force the top transaction to do timetravel when one of sub
+# transactions needs that. This is necessary when we restart decoding from RUNNING_XACT
+# without the wal to associate subtransaction to its top transaction. The last decoding
+# starts from the first checkpoint and NEW_CID of "s0_truncate" doesn't mark the top
+# transaction as catalog modifying transaction. In this scenario, the enforcement sets
+# needs_timetravel to true even if the top transaction is regarded as that it does not
+# have catalog changes and thus the decoding works without a contradition that one
+# subtransaction needed timetravel while its top transaction didn't.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_truncate" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 1d8ebb4c0d..9c18d4c4b4 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1098,7 +1098,9 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 	}
 	else if (sub_needs_timetravel)
 	{
-		/* track toplevel txn as well, subxact alone isn't meaningful */
+		elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransaction",
+			 xid);
+		needs_timetravel = true;
 		SnapBuildAddCommittedTxn(builder, xid);
 	}
 	else if (needs_timetravel)
-- 
2.27.0

