From 201936ed1238951e8a13b67da2507e1ea3769537 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Tue, 5 Jan 2021 03:30:11 -0500
Subject: [PATCH v36] Support 2PC consistent snapshot isolation tests.

Added isolation test-case to test that if a consistent snapshot is created
between a PREPARE and a COMMIT PREPARED, then the whole transaction is decoded
on COMMIT PREPARED.
---
 contrib/test_decoding/Makefile                     |  3 +-
 .../test_decoding/expected/twophase_snapshot.out   | 41 +++++++++++++++++++
 contrib/test_decoding/specs/twophase_snapshot.spec | 47 ++++++++++++++++++++++
 3 files changed, 90 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/twophase_snapshot.out
 create mode 100644 contrib/test_decoding/specs/twophase_snapshot.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 76d4a69..c5e28ce 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -7,7 +7,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	decoding_into_rel binary prepared replorigin time messages \
 	spill slot truncate stream stats twophase twophase_stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
-	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream
+	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
+	twophase_snapshot
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/twophase_snapshot.out b/contrib/test_decoding/expected/twophase_snapshot.out
new file mode 100644
index 0000000..3b7f23b
--- /dev/null
+++ b/contrib/test_decoding/expected/twophase_snapshot.out
@@ -0,0 +1,41 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s2b s2txid s1init s3b s3txid s2c s2b s2insert s2p s3c s1insert s1start s2cp s1start
+step s2b: BEGIN;
+step s2txid: SELECT pg_current_xact_id() IS NULL;
+?column?       
+
+f              
+step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...>
+step s3b: BEGIN;
+step s3txid: SELECT pg_current_xact_id() IS NULL;
+?column?       
+
+f              
+step s2c: COMMIT;
+step s2b: BEGIN;
+step s2insert: INSERT INTO do_write DEFAULT VALUES;
+step s2p: PREPARE TRANSACTION 'test1';
+step s3c: COMMIT;
+step s1init: <... completed>
+?column?       
+
+init           
+step s1insert: INSERT INTO do_write DEFAULT VALUES;
+step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'two-phase-commit', '1');
+data           
+
+BEGIN          
+table public.do_write: INSERT: id[integer]:2
+COMMIT         
+step s2cp: COMMIT PREPARED 'test1';
+step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'two-phase-commit', '1');
+data           
+
+BEGIN          
+table public.do_write: INSERT: id[integer]:1
+PREPARE TRANSACTION 'test1'
+COMMIT PREPARED 'test1'
+?column?       
+
+stop           
diff --git a/contrib/test_decoding/specs/twophase_snapshot.spec b/contrib/test_decoding/specs/twophase_snapshot.spec
new file mode 100644
index 0000000..1127d89
--- /dev/null
+++ b/contrib/test_decoding/specs/twophase_snapshot.spec
@@ -0,0 +1,47 @@
+# Test decoding of two-phase transactions during the build of a consistent snapshot.
+setup
+{
+    DROP TABLE IF EXISTS do_write;
+    CREATE TABLE do_write(id serial primary key);
+}
+
+teardown
+{
+    DROP TABLE do_write;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');}
+step "s1start" {SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'two-phase-commit', '1');}
+step "s1insert" { INSERT INTO do_write DEFAULT VALUES; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2b" { BEGIN; }
+step "s2txid" { SELECT pg_current_xact_id() IS NULL; }
+step "s2c" { COMMIT; }
+step "s2insert" { INSERT INTO do_write DEFAULT VALUES; }
+step "s2p" { PREPARE TRANSACTION 'test1'; }
+step "s2cp" { COMMIT PREPARED 'test1'; }
+
+
+session "s3"
+setup { SET synchronous_commit=on; }
+
+step "s3b" { BEGIN; }
+step "s3txid" { SELECT pg_current_xact_id() IS NULL; }
+step "s3c" { COMMIT; }
+
+# 's1init' step will initialize the replication slot and cause logical decoding to wait
+# in initial starting point till the in-progress transaction in s2 is committed.
+# 's2c' step will cause logical decoding to go to initial consistent point and wait for
+# in-progress transaction in s3 to commit.
+# 's3c' step will cause logical decoding to find a consistent point while the transaction 
+# in s2 is prepared and not yet committed.
+# After the 's2cp' step, ensure that the whole transaction is decoded fresh at 's1start'.
+permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2c" "s2b" "s2insert" "s2p" "s3c" "s1insert" "s1start" "s2cp" "s1start"
-- 
1.8.3.1

