From dba6debb7237e65088a14510f07d876940a67f87 Mon Sep 17 00:00:00 2001
From: Nikhil Sontakke <nikhils@2ndQuadrant.com>
Date: Wed, 13 Jun 2018 16:31:15 +0530
Subject: [PATCH 4/4] Teach test_decoding plugin to work with 2PC

Implement all callbacks required for decoding 2PC in this test_decoding
plugin. Includes relevant test cases as well.

Additionally, includes a new option "check-xid". If this option points
to a valid xid, then the pg_decode_change() API will wait for it to
be aborted externally. This allows us to test concurrent rollback of
a prepared transaction while it's being actually decoded simultaneously.
---
 contrib/test_decoding/Makefile              |   2 +
 contrib/test_decoding/expected/prepared.out | 185 ++++++++++++++++++++++++----
 contrib/test_decoding/sql/prepared.sql      |  77 ++++++++++--
 contrib/test_decoding/t/001_twophase.pl     | 119 ++++++++++++++++++
 contrib/test_decoding/test_decoding.c       | 179 +++++++++++++++++++++++++++
 5 files changed, 530 insertions(+), 32 deletions(-)
 create mode 100644 contrib/test_decoding/t/001_twophase.pl

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 4afb1d963e..6bac8a3fe5 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -12,6 +12,8 @@ ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 
+TAP_TESTS = 1
+
 # Disabled because these tests require "wal_level=logical", which
 # typical installcheck users do not have (e.g. buildfarm clients).
 NO_INSTALLCHECK = 1
diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out
index 46e915d4ff..934c8f1509 100644
--- a/contrib/test_decoding/expected/prepared.out
+++ b/contrib/test_decoding/expected/prepared.out
@@ -6,19 +6,50 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
  init
 (1 row)
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ PREPARE TRANSACTION 'test_prepared#1'
+(3 rows)
+
 COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#1'
+(1 row)
+
 INSERT INTO test_prepared1 VALUES (2);
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(6 rows)
+
 ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                data                 
+-------------------------------------
+ ROLLBACK PREPARED 'test_prepared#2'
+(1 row)
+
 INSERT INTO test_prepared1 VALUES (4);
 -- test prepared xact containing ddl
 BEGIN;
@@ -26,45 +57,149 @@ INSERT INTO test_prepared1 VALUES (5);
 ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
-INSERT INTO test_prepared2 VALUES (7);
-COMMIT PREPARED 'test_prepared#3';
--- make sure stuff still works
-INSERT INTO test_prepared1 VALUES (8);
-INSERT INTO test_prepared2 VALUES (9);
--- cleanup
-DROP TABLE test_prepared1;
-DROP TABLE test_prepared2;
--- show results
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+    relation     | locktype |        mode         
+-----------------+----------+---------------------
+ test_prepared_1 | relation | RowExclusiveLock
+ test_prepared_1 | relation | AccessExclusiveLock
+(2 rows)
+
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                   data                                   
 -------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- COMMIT
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:2
- COMMIT
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:4
  COMMIT
  BEGIN
- table public.test_prepared2: INSERT: id[integer]:7
- COMMIT
- BEGIN
  table public.test_prepared1: INSERT: id[integer]:5
  table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3'
+(7 rows)
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
+INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
  COMMIT
+(3 rows)
+
+COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#3'
+(1 row)
+
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (8);
+INSERT INTO test_prepared2 VALUES (9);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                data                                
+--------------------------------------------------------------------
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
  COMMIT
  BEGIN
  table public.test_prepared2: INSERT: id[integer]:9
  COMMIT
-(22 rows)
+(6 rows)
+
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+BEGIN;
+insert into test_prepared2 values (12);
+PREPARE TRANSACTION 'test_prepared_lock2';
+COMMIT PREPARED 'test_prepared_lock2';
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+ relation | locktype | mode 
+----------+----------+------
+(0 rows)
+
+-- Shouldn't timeout on 2pc decoding.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                    data                                    
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ PREPARE TRANSACTION 'test_prepared_lock'
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:12
+ PREPARE TRANSACTION 'test_prepared_lock2'
+ COMMIT PREPARED 'test_prepared_lock2'
+(8 rows)
+
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- will work normally after we commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                 data                 
+--------------------------------------
+ COMMIT PREPARED 'test_prepared_lock'
+(1 row)
+
+-- test savepoints 
+BEGIN;
+SAVEPOINT test_savepoint;
+CREATE TABLE test_prepared_savepoint (a int);
+PREPARE TRANSACTION 'test_prepared_savepoint';
+COMMIT PREPARED 'test_prepared_savepoint';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                   data                    
+-------------------------------------------
+ COMMIT PREPARED 'test_prepared_savepoint'
+(1 row)
+
+-- test that a GID containing "nodecode" gets decoded at commit prepared time
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                data                                 
+---------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:20 data[text]:null
+ COMMIT
+(3 rows)
+
+-- cleanup
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
 
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql
index e72639767e..60725419fe 100644
--- a/contrib/test_decoding/sql/prepared.sql
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -2,21 +2,25 @@
 SET synchronous_commit = on;
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
 
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 INSERT INTO test_prepared1 VALUES (2);
 
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 INSERT INTO test_prepared1 VALUES (4);
 
@@ -27,24 +31,83 @@ ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
 
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
 INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (8);
 INSERT INTO test_prepared2 VALUES (9);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+BEGIN;
+insert into test_prepared2 values (12);
+PREPARE TRANSACTION 'test_prepared_lock2';
+COMMIT PREPARED 'test_prepared_lock2';
+
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+
+-- Shouldn't timeout on 2pc decoding.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+RESET statement_timeout;
+
+COMMIT PREPARED 'test_prepared_lock';
+
+-- will work normally after we commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test savepoints 
+BEGIN;
+SAVEPOINT test_savepoint;
+CREATE TABLE test_prepared_savepoint (a int);
+PREPARE TRANSACTION 'test_prepared_savepoint';
+COMMIT PREPARED 'test_prepared_savepoint';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test that a GID containing "nodecode" gets decoded at commit prepared time
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- cleanup
 DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
 
--- show results
+-- show results. There should be nothing to show
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/t/001_twophase.pl b/contrib/test_decoding/t/001_twophase.pl
new file mode 100644
index 0000000000..50f269bef7
--- /dev/null
+++ b/contrib/test_decoding/t/001_twophase.pl
@@ -0,0 +1,119 @@
+# logical replication of 2PC test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+use Time::HiRes qw(usleep);
+use Scalar::Util qw(looks_like_number);
+
+# Initialize node
+my $node_logical = get_new_node('logical');
+$node_logical->init(allows_streaming => 'logical');
+$node_logical->append_conf(
+        'postgresql.conf', qq(
+        max_prepared_transactions = 10
+));
+$node_logical->start;
+
+# Create some pre-existing content on logical
+$node_logical->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
+$node_logical->safe_psql('postgres',
+	"INSERT INTO tab SELECT generate_series(1,10)");
+$node_logical->safe_psql('postgres',
+	"SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');");
+
+# This test is specifically for testing concurrent abort while logical decode
+# is ongoing. We will pass in the xid of the 2PC to the plugin as an option.
+# On the receipt of a valid "check-xid", the change API in the test decoding
+# plugin will wait for it to be aborted.
+#
+# We will fire off a ROLLBACK from another session when this decode
+# is waiting.
+#
+# The status of "check-xid" will change from in-progress to not-committed
+# (hence aborted) and we will stop decoding because the subsequent
+# system catalog scan will error out.
+
+$node_logical->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab VALUES (11);
+    INSERT INTO tab VALUES (12);
+    ALTER TABLE tab ADD COLUMN b INT;
+    INSERT INTO tab VALUES (13,14);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+# get XID of the above two-phase transaction 
+my $xid2pc = $node_logical->safe_psql('postgres', "SELECT transaction FROM pg_prepared_xacts WHERE gid = 'test_prepared_tab'");
+is(looks_like_number($xid2pc), qq(1), 'Got a valid two-phase XID');
+
+# start decoding the above by passing the "check-xid"
+my $logical_connstr = $node_logical->connstr . ' dbname=postgres';
+
+# decode now, it should include an ABORT entry because of the ROLLBACK below
+system_log("psql -d \"$logical_connstr\" -c \"SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'check-xid', '$xid2pc');\" \&");
+
+# check that decode starts waiting for this $xid2pc
+poll_output_until("waiting for $xid2pc to abort")
+    or die "no wait happened for the abort";
+
+# rollback the prepared transaction
+$node_logical->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab';");
+
+# check for occurrence of the log about stopping this decoding
+poll_output_until("stopping decoding of xid $xid2pc")
+    or die "no decoding stop for the rollback";
+
+# consume any remaining changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');");
+
+# Check that commit prepared is decoded properly on immediate restart
+$node_logical->safe_psql('postgres', "
+    BEGIN;
+    INSERT INTO tab VALUES (11);
+    INSERT INTO tab VALUES (12);
+    ALTER TABLE tab ADD COLUMN b INT;
+    INSERT INTO tab VALUES (13, 11);
+    PREPARE TRANSACTION 'test_prepared_tab';");
+# consume changes
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');");
+$node_logical->stop('immediate');
+$node_logical->start;
+
+# commit post the restart
+$node_logical->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+$node_logical->safe_psql('postgres', "SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');");
+
+# check inserts are visible
+my $result = $node_logical->safe_psql('postgres', "SELECT count(*) FROM tab where a IN (11,12) OR b IN (11);");
+is($result, qq(3), 'Rows inserted via 2PC are visible on restart');
+
+$node_logical->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot');");
+$node_logical->stop('fast');
+
+sub poll_output_until
+{
+    my ($expected) = @_;
+
+    $expected = 'xxxxxx' unless defined($expected); # default junk value
+
+    my $max_attempts = 180 * 10;
+    my $attempts     = 0;
+
+    my $output_file = '';
+    while ($attempts < $max_attempts)
+    {
+        $output_file = slurp_file($node_logical->logfile());
+
+        if ($output_file =~ $expected)
+        {
+            return 1;
+        }
+
+        # Wait 0.1 second before retrying.
+        usleep(100_000);
+        $attempts++;
+    }
+
+    # The output result didn't change in 180 seconds. Give up
+    return 0;
+}
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index e3f394f512..9687eb293b 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -11,12 +11,16 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include "miscadmin.h"
 
+#include "access/transam.h"
 #include "catalog/pg_type.h"
 
 #include "replication/logical.h"
 #include "replication/origin.h"
 
+#include "storage/procarray.h"
+
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -36,6 +40,7 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	TransactionId	check_xid; /* track abort of this txid */
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -49,6 +54,8 @@ static void pg_output_begin(LogicalDecodingContext *ctx,
 				bool last_write);
 static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
 					 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pg_decode_abort_txn(LogicalDecodingContext *ctx,
+					ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
 static void pg_decode_change(LogicalDecodingContext *ctx,
 				 ReorderBufferTXN *txn, Relation rel,
 				 ReorderBufferChange *change);
@@ -62,6 +69,18 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 				  bool transactional, const char *prefix,
 				  Size sz, const char *message);
+static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+						 ReorderBufferTXN *txn,
+						 TransactionId xid, const char *gid);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn,
+					  XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+							  ReorderBufferTXN *txn,
+							  XLogRecPtr commit_lsn);
+static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
+							 ReorderBufferTXN *txn,
+							 XLogRecPtr abort_lsn);
 
 void
 _PG_init(void)
@@ -80,9 +99,14 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pg_decode_change;
 	cb->truncate_cb = pg_decode_truncate;
 	cb->commit_cb = pg_decode_commit_txn;
+	cb->abort_cb = pg_decode_abort_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+	cb->filter_prepare_cb = pg_decode_filter_prepare;
+	cb->prepare_cb = pg_decode_prepare_txn;
+	cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+	cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
 }
 
 
@@ -102,11 +126,14 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->check_xid = InvalidTransactionId;
 
 	ctx->output_plugin_private = data;
 
 	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
 	opt->receive_rewrites = false;
+	/* this plugin supports decoding of 2pc */
+	opt->enable_twophase = true;
 
 	foreach(option, ctx->output_plugin_options)
 	{
@@ -183,6 +210,32 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "check-xid") == 0)
+		{
+			if (elem->arg)
+			{
+				errno = 0;
+				data->check_xid = (TransactionId)
+					strtoul(strVal(elem->arg), NULL, 0);
+
+				if (errno == EINVAL || errno == ERANGE)
+					ereport(FATAL,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("check-xid is not a valid number: \"%s\"",
+								strVal(elem->arg))));
+			}
+			else
+				ereport(FATAL,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("check-xid needs an input value")));
+
+			if (data->check_xid <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("Specify positive value for parameter \"%s\","
+								" you specified \"%s\"",
+								elem->defname, strVal(elem->arg))));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -251,6 +304,116 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+/* ABORT callback */
+static void
+pg_decode_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr abort_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	if (data->include_xids)
+		appendStringInfo(ctx->out, "ABORT %u", txn->xid);
+	else
+		appendStringInfoString(ctx->out, "ABORT");
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Filter out two-phase transactions.
+ *
+ * Each plugin can implement its own filtering logic. Here
+ * we demonstrate a simple logic by checking the GID. If the
+ * GID contains the "_nodecode" substring, then we filter
+ * it out.
+ */
+static bool
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+						 TransactionId xid, const char *gid)
+{
+	if (strstr(gid, "_nodecode") != NULL)
+		return true;
+
+	return false;
+}
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					  XLogRecPtr prepare_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							  XLogRecPtr commit_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* ABORT PREPARED callback */
+static void
+pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							 XLogRecPtr abort_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
 static bool
 pg_decode_filter(LogicalDecodingContext *ctx,
 				 RepOriginId origin_id)
@@ -409,6 +572,22 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	data->xact_wrote_changes = true;
 
+	/* if check_xid is specified */
+	if (TransactionIdIsValid(data->check_xid))
+	{
+		elog(LOG, "waiting for %u to abort", data->check_xid);
+		while (TransactionIdIsInProgress(data->check_xid))
+		{
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(10000L);
+		}
+		if (!TransactionIdIsInProgress(data->check_xid) &&
+			   !TransactionIdDidCommit(data->check_xid))
+			elog(LOG, "%u aborted", data->check_xid);
+
+		Assert(TransactionIdDidAbort(data->check_xid));
+	}
+
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
 
-- 
2.15.2 (Apple Git-101.1)

