From 540c3c289caa9d9b40f6555dccdbc081bf4859da Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Thu, 5 Nov 2020 22:03:33 -0500
Subject: [PATCH v17] Support 2PC test cases for test_decoding.

Add sql and tap tests to test_decoding for 2PC.
---
 contrib/test_decoding/Makefile                     |   4 +-
 contrib/test_decoding/expected/two_phase.out       | 228 +++++++++++++++++++++
 .../test_decoding/expected/two_phase_stream.out    | 177 ++++++++++++++++
 contrib/test_decoding/sql/two_phase.sql            | 119 +++++++++++
 contrib/test_decoding/sql/two_phase_stream.sql     |  63 ++++++
 contrib/test_decoding/t/001_twophase.pl            | 121 +++++++++++
 6 files changed, 711 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/two_phase.out
 create mode 100644 contrib/test_decoding/expected/two_phase_stream.out
 create mode 100644 contrib/test_decoding/sql/two_phase.sql
 create mode 100644 contrib/test_decoding/sql/two_phase_stream.sql
 create mode 100644 contrib/test_decoding/t/001_twophase.pl

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 9a4c76f..49523fe 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -4,11 +4,13 @@ MODULES = test_decoding
 PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
-	decoding_into_rel binary prepared replorigin time messages \
+	decoding_into_rel binary prepared replorigin time two_phase two_phase_stream messages \
 	spill slot truncate stream stats
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream
 
+TAP_TESTS = 1
+
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 
diff --git a/contrib/test_decoding/expected/two_phase.out b/contrib/test_decoding/expected/two_phase.out
new file mode 100644
index 0000000..e5e34b4
--- /dev/null
+++ b/contrib/test_decoding/expected/two_phase.out
@@ -0,0 +1,228 @@
+-- Test two-phased transactions. When two-phase-commit is enabled, transactions are
+-- decoded at PREPARE time rather than at COMMIT PREPARED time.
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+-- Test 1:
+-- Test that commands in a two phase xact are only decoded at PREPARE.
+-- Decoding after COMMIT PREPARED should only have the COMMIT PREPARED command and not the
+-- rest of the commands in the transaction.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (1);
+INSERT INTO test_prepared1 VALUES (2);
+-- should show nothing because the xact has not been prepared yet.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+PREPARE TRANSACTION 'test_prepared#1';
+-- should show both the above inserts and the PREPARE TRANSACTION.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ table public.test_prepared1: INSERT: id[integer]:2
+ PREPARE TRANSACTION 'test_prepared#1'
+(4 rows)
+
+COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#1'
+(1 row)
+
+-- Test 2:
+-- Test that rollback of a prepared xact is decoded.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (3);
+PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(3 rows)
+
+ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                data                 
+-------------------------------------
+ ROLLBACK PREPARED 'test_prepared#2'
+(1 row)
+
+-- Test 3:
+-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
+BEGIN;
+ALTER TABLE test_prepared1 ADD COLUMN data text;
+INSERT INTO test_prepared1 VALUES (4, 'frakbar');
+PREPARE TRANSACTION 'test_prepared#3';
+-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+    relation     | locktype |        mode         
+-----------------+----------+---------------------
+ test_prepared_1 | relation | RowExclusiveLock
+ test_prepared_1 | relation | AccessExclusiveLock
+(2 rows)
+
+-- The insert should show the newly altered column but not the DDL.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                                  data                                   
+-------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3'
+(3 rows)
+
+-- Test 4:
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+INSERT INTO test_prepared2 VALUES (5);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:5
+ COMMIT
+(3 rows)
+
+COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#3'
+(1 row)
+
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (6);
+INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                                data                                
+--------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:null
+ COMMIT
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
+ COMMIT
+(6 rows)
+
+-- Test 5:
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (8, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (9, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+SELECT 'test_prepared1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+    relation    | locktype |        mode         
+----------------+----------+---------------------
+ test_prepared1 | relation | RowExclusiveLock
+ test_prepared1 | relation | ShareLock
+ test_prepared1 | relation | AccessExclusiveLock
+(3 rows)
+
+-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The call should return
+-- within a second.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                                   data                                    
+---------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
+ PREPARE TRANSACTION 'test_prepared_lock'
+(4 rows)
+
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                 data                 
+--------------------------------------
+ COMMIT PREPARED 'test_prepared_lock'
+(1 row)
+
+-- Test 6:
+-- Test savepoints and sub-xacts. Creating savepoints will create sub-xacts implicitly.
+BEGIN;
+CREATE TABLE test_prepared_savepoint (a int);
+INSERT INTO test_prepared_savepoint VALUES (1);
+SAVEPOINT test_savepoint;
+INSERT INTO test_prepared_savepoint VALUES (2);
+ROLLBACK TO SAVEPOINT test_savepoint;
+PREPARE TRANSACTION 'test_prepared_savepoint';
+-- should show only 1, not 2
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                            data                            
+------------------------------------------------------------
+ BEGIN
+ table public.test_prepared_savepoint: INSERT: a[integer]:1
+ PREPARE TRANSACTION 'test_prepared_savepoint'
+(3 rows)
+
+COMMIT PREPARED 'test_prepared_savepoint';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                   data                    
+-------------------------------------------
+ COMMIT PREPARED 'test_prepared_savepoint'
+(1 row)
+
+-- Test 7:
+-- test that a GID containing "_nodecode" gets decoded at commit prepared time
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                                data                                 
+---------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:20 data[text]:null
+ COMMIT
+(3 rows)
+
+-- Test 8:
+-- cleanup and make sure results are also empty
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/expected/two_phase_stream.out b/contrib/test_decoding/expected/two_phase_stream.out
new file mode 100644
index 0000000..957c198
--- /dev/null
+++ b/contrib/test_decoding/expected/two_phase_stream.out
@@ -0,0 +1,177 @@
+-- Test streaming of two-phase commits
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE TABLE stream_test(data text);
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+ ?column? 
+----------
+ msg5
+(1 row)
+
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1';
+-- should show the inserts after a PREPARE
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+                           data                           
+----------------------------------------------------------
+ streaming message: transactional: 1 prefix: test, sz: 50
+ opening a streamed block for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ closing a streamed block for transaction
+ preparing streamed transaction 'test1'
+(24 rows)
+
+COMMIT PREPARED 'test1';
+--should show the COMMIT PREPARED
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+          data           
+-------------------------
+ COMMIT PREPARED 'test1'
+(1 row)
+
+-- streaming test with sub-transaction and PREPARE/ROLLBACK PREPARED
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+ ?column? 
+----------
+ msg5
+(1 row)
+
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test2';
+-- should show the inserts after a PREPARE
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+                           data                           
+----------------------------------------------------------
+ streaming message: transactional: 1 prefix: test, sz: 50
+ opening a streamed block for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ closing a streamed block for transaction
+ preparing streamed transaction 'test2'
+(24 rows)
+
+ROLLBACK PREPARED 'test2';
+-- should show the ROLLBACK PREPARED
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+           data            
+---------------------------
+ ROLLBACK PREPARED 'test2'
+(1 row)
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with filtered gid
+-- gids with '_nodecode' should not be handled as a two-phase commit.
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+ ?column? 
+----------
+ msg5
+(1 row)
+
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1_nodecode';
+-- should NOT show inserts after a PREPARE
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+                           data                           
+----------------------------------------------------------
+ streaming message: transactional: 1 prefix: test, sz: 50
+(1 row)
+
+COMMIT PREPARED 'test1_nodecode';
+-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+                            data                             
+-------------------------------------------------------------
+ BEGIN
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
+ table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
+ COMMIT
+(22 rows)
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/two_phase.sql b/contrib/test_decoding/sql/two_phase.sql
new file mode 100644
index 0000000..4ed5266
--- /dev/null
+++ b/contrib/test_decoding/sql/two_phase.sql
@@ -0,0 +1,119 @@
+-- Test two-phased transactions. When two-phase-commit is enabled, transactions are
+-- decoded at PREPARE time rather than at COMMIT PREPARED time.
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+
+-- Test 1:
+-- Test that commands in a two phase xact are only decoded at PREPARE.
+-- Decoding after COMMIT PREPARED should only have the COMMIT PREPARED command and not the
+-- rest of the commands in the transaction.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (1);
+INSERT INTO test_prepared1 VALUES (2);
+-- should show nothing because the xact has not been prepared yet.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+PREPARE TRANSACTION 'test_prepared#1';
+-- should show both the above inserts and the PREPARE TRANSACTION.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test 2:
+-- Test that rollback of a prepared xact is decoded.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (3);
+PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test 3:
+-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
+BEGIN;
+ALTER TABLE test_prepared1 ADD COLUMN data text;
+INSERT INTO test_prepared1 VALUES (4, 'frakbar');
+PREPARE TRANSACTION 'test_prepared#3';
+-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+-- The insert should show the newly altered column but not the DDL.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test 4:
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+INSERT INTO test_prepared2 VALUES (5);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (6);
+INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test 5:
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (8, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (9, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+SELECT 'test_prepared1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The call should return
+-- within a second.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test 6:
+-- Test savepoints and sub-xacts. Creating savepoints will create sub-xacts implicitly.
+BEGIN;
+CREATE TABLE test_prepared_savepoint (a int);
+INSERT INTO test_prepared_savepoint VALUES (1);
+SAVEPOINT test_savepoint;
+INSERT INTO test_prepared_savepoint VALUES (2);
+ROLLBACK TO SAVEPOINT test_savepoint;
+PREPARE TRANSACTION 'test_prepared_savepoint';
+-- should show only 1, not 2
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_savepoint';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test 7:
+-- test that a GID containing "_nodecode" gets decoded at commit prepared time
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test 8:
+-- cleanup and make sure results are also empty
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/two_phase_stream.sql b/contrib/test_decoding/sql/two_phase_stream.sql
new file mode 100644
index 0000000..01510e4
--- /dev/null
+++ b/contrib/test_decoding/sql/two_phase_stream.sql
@@ -0,0 +1,63 @@
+-- Test streaming of two-phase commits
+
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE stream_test(data text);
+
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1';
+-- should show the inserts after a PREPARE
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+COMMIT PREPARED 'test1';
+--should show the COMMIT PREPARED
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+-- streaming test with sub-transaction and PREPARE/ROLLBACK PREPARED
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test2';
+-- should show the inserts after a PREPARE
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+ROLLBACK PREPARED 'test2';
+-- should show the ROLLBACK PREPARED
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with filtered gid
+-- gids with '_nodecode' should not be handled as a two-phase commit.
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1_nodecode';
+-- should NOT show inserts after a PREPARE
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+COMMIT PREPARED 'test1_nodecode';
+-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/t/001_twophase.pl b/contrib/test_decoding/t/001_twophase.pl
new file mode 100644
index 0000000..1555582
--- /dev/null
+++ b/contrib/test_decoding/t/001_twophase.pl
@@ -0,0 +1,121 @@
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+use Time::HiRes qw(usleep);
+use Scalar::Util qw(looks_like_number);
+
+# Initialize node
+my $node_logical = get_new_node('logical');
+$node_logical->init(allows_streaming => 'logical');
+$node_logical->append_conf(
+        'postgresql.conf', qq(
+        max_prepared_transactions = 10
+));
+$node_logical->start;
+
+# Create some pre-existing content on logical
+$node_logical->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+$node_logical->safe_psql('postgres',
+	"INSERT INTO tab SELECT generate_series(1,10)");
+$node_logical->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');");
+
+#Test 1:
+# This test is specifically for testing concurrent abort while logical decode
+# is ongoing. We will pass in the xid of the 2PC to the plugin as an option.
+# On the receipt of a valid "check-xid-aborted", the change API in the test decoding
+# plugin will wait for it to be aborted.
+#
+# We will fire off a ROLLBACK from another session when this decode
+# is waiting.
+#
+# The status of "check-xid-aborted" will change from in-progress to not-committed
+# (hence aborted) and we will stop decoding because the subsequent
+# system catalog scan will error out.
+
+$node_logical->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab VALUES (11);
+    INSERT INTO tab VALUES (12);
+    ALTER TABLE tab ADD COLUMN b INT;
+    INSERT INTO tab VALUES (13,14);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+# get XID of the above two-phase transaction
+my $xid2pc = $node_logical->safe_psql('postgres', "SELECT transaction FROM pg_prepared_xacts WHERE gid = 'test_prepared_tab'");
+is(looks_like_number($xid2pc), qq(1), 'Got a valid two-phase XID');
+
+# start decoding the above by passing the "check-xid-aborted"
+my $logical_connstr = $node_logical->connstr . ' dbname=postgres';
+
+# decode now, it should include an ABORT entry because of the ROLLBACK below
+system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'check-xid-aborted', '$xid2pc');\" \&");
+
+# check that decode starts waiting for this $xid2pc
+poll_output_until("waiting for $xid2pc to abort")
+    or die "no wait happened for the abort";
+
+# rollback the prepared transaction
+$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+# check for occurrence of the log about stopping this decoding
+poll_output_until("stopping decoding of test_prepared_tab")
+    or die "no decoding stop for the rollback";
+
+# consume any remaining changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');");
+
+# Test 2:
+# Check that commit prepared is decoded properly on immediate restart
+$node_logical->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab VALUES (11);
+    INSERT INTO tab VALUES (12);
+    ALTER TABLE tab ADD COLUMN b INT;
+    INSERT INTO tab VALUES (13, 11);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+# consume changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');");
+$node_logical->stop('immediate');
+$node_logical->start;
+
+# commit post the restart
+$node_logical->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');");
+
+# check inserts are visible
+my $result = $node_logical->safe_psql('postgres', "SELECT count(*) FROM tab where a IN (11,12) OR b IN (11);");
+is($result, qq(3), 'Rows inserted via 2PC are visible on restart');
+
+$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot');");
+$node_logical->stop('fast');
+
+sub poll_output_until
+{
+    my ($expected) = @_;
+
+    $expected = 'xxxxxx' unless defined($expected); # default junk value
+
+    my $max_attempts = 180 * 10;
+    my $attempts     = 0;
+
+    my $output_file = '';
+    while ($attempts < $max_attempts)
+    {
+        $output_file = slurp_file($node_logical->logfile());
+
+        if ($output_file =~ $expected)
+        {
+            return 1;
+        }
+
+        # Wait 0.1 second before retrying.
+        usleep(100_000);
+        $attempts++;
+    }
+
+    # The output result didn't change in 180 seconds. Give up
+    return 0;
+}
-- 
1.8.3.1

