From 80fc576bda483798919653991bef6dc198625d90 Mon Sep 17 00:00:00 2001
From: Nikhil Sontakke <nikhils@2ndQuadrant.com>
Date: Wed, 13 Jun 2018 16:31:15 +0530
Subject: [PATCH 4/5] Teach test_decoding plugin to work with 2PC

Includes a new option "enable_twophase". Depending on this options
value, PREPARE TRANSACTION will either be decoded or treated as
a single phase commit later.
---
 contrib/test_decoding/expected/prepared.out | 257 +++++++++++++++++++++++++---
 contrib/test_decoding/sql/prepared.sql      |  84 ++++++++-
 contrib/test_decoding/test_decoding.c       | 137 +++++++++++++++
 3 files changed, 451 insertions(+), 27 deletions(-)

diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out
index 46e915d4ff..5df7b7ff20 100644
--- a/contrib/test_decoding/expected/prepared.out
+++ b/contrib/test_decoding/expected/prepared.out
@@ -6,19 +6,82 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
  init
 (1 row)
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ PREPARE TRANSACTION 'test_prepared#1'
+(3 rows)
+
 COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ COMMIT
+(3 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#1'
+(1 row)
+
 INSERT INTO test_prepared1 VALUES (2);
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+(3 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(6 rows)
+
 ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+                data                 
+-------------------------------------
+ ROLLBACK PREPARED 'test_prepared#2'
+(1 row)
+
 INSERT INTO test_prepared1 VALUES (4);
 -- test prepared xact containing ddl
 BEGIN;
@@ -26,45 +89,193 @@ INSERT INTO test_prepared1 VALUES (5);
 ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
-INSERT INTO test_prepared2 VALUES (7);
-COMMIT PREPARED 'test_prepared#3';
--- make sure stuff still works
-INSERT INTO test_prepared1 VALUES (8);
-INSERT INTO test_prepared2 VALUES (9);
--- cleanup
-DROP TABLE test_prepared1;
-DROP TABLE test_prepared2;
--- show results
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+    relation     | locktype |        mode         
+-----------------+----------+---------------------
+ test_prepared_1 | relation | RowExclusiveLock
+ test_prepared_1 | relation | AccessExclusiveLock
+(2 rows)
+
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4
+ COMMIT
+(3 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
                                   data                                   
 -------------------------------------------------------------------------
  BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
+ table public.test_prepared1: INSERT: id[integer]:4
  COMMIT
  BEGIN
- table public.test_prepared1: INSERT: id[integer]:2
- COMMIT
+ table public.test_prepared1: INSERT: id[integer]:5
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3'
+(7 rows)
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
+INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
  BEGIN
- table public.test_prepared1: INSERT: id[integer]:4
+ table public.test_prepared2: INSERT: id[integer]:7
  COMMIT
+(3 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+                        data                        
+----------------------------------------------------
  BEGIN
  table public.test_prepared2: INSERT: id[integer]:7
  COMMIT
+(3 rows)
+
+COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                  data                                   
+-------------------------------------------------------------------------
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:5
  table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
  COMMIT
+(4 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#3'
+(1 row)
+
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (8);
+INSERT INTO test_prepared2 VALUES (9);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                data                                
+--------------------------------------------------------------------
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
  COMMIT
  BEGIN
  table public.test_prepared2: INSERT: id[integer]:9
  COMMIT
-(22 rows)
+(6 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+                                data                                
+--------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
+ COMMIT
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:9
+ COMMIT
+(6 rows)
+
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+BEGIN;
+insert into test_prepared2 values (12);
+PREPARE TRANSACTION 'test_prepared_lock2';
+COMMIT PREPARED 'test_prepared_lock2';
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+ relation | locktype | mode 
+----------+----------+------
+(0 rows)
+
+-- Shouldn't see anything with 2pc decoding off
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                         
+-----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:12
+ COMMIT
+(3 rows)
+
+-- Shouldn't timeout on 2pc decoding.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+                                    data                                    
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ PREPARE TRANSACTION 'test_prepared_lock'
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:12
+ PREPARE TRANSACTION 'test_prepared_lock2'
+ COMMIT PREPARED 'test_prepared_lock2'
+(8 rows)
+
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- Both will work normally after we commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                    data                                    
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ COMMIT
+(4 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+                 data                 
+--------------------------------------
+ COMMIT PREPARED 'test_prepared_lock'
+(1 row)
+
+-- test savepoints 
+BEGIN;
+SAVEPOINT test_savepoint;
+CREATE TABLE test_prepared_savepoint (a int);
+PREPARE TRANSACTION 'test_prepared_savepoint';
+COMMIT PREPARED 'test_prepared_savepoint';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+                   data                    
+-------------------------------------------
+ COMMIT PREPARED 'test_prepared_savepoint'
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+ data 
+------
+(0 rows)
+
+-- cleanup
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+ data 
+------
+(0 rows)
 
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
@@ -72,3 +283,9 @@ SELECT pg_drop_replication_slot('regression_slot');
  
 (1 row)
 
+SELECT pg_drop_replication_slot('regression_slot_2pc');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql
index e72639767e..e8eb8ad8d6 100644
--- a/contrib/test_decoding/sql/prepared.sql
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -1,22 +1,31 @@
 -- predictability
 SET synchronous_commit = on;
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding');
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
 
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
 COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
 INSERT INTO test_prepared1 VALUES (2);
 
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
 ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
 
 INSERT INTO test_prepared1 VALUES (4);
 
@@ -27,24 +36,85 @@ ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
 
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
 
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
 INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
 
 COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (8);
 INSERT INTO test_prepared2 VALUES (9);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+BEGIN;
+insert into test_prepared2 values (12);
+PREPARE TRANSACTION 'test_prepared_lock2';
+COMMIT PREPARED 'test_prepared_lock2';
+
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+
+-- Shouldn't see anything with 2pc decoding off
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Shouldn't timeout on 2pc decoding.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+RESET statement_timeout;
+
+COMMIT PREPARED 'test_prepared_lock';
+
+-- Both will work normally after we commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+
+-- test savepoints 
+BEGIN;
+SAVEPOINT test_savepoint;
+CREATE TABLE test_prepared_savepoint (a int);
+PREPARE TRANSACTION 'test_prepared_savepoint';
+COMMIT PREPARED 'test_prepared_savepoint';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
 
 -- cleanup
 DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
 
--- show results
+-- show results. There should be nothing to show
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_2pc', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'enable-twophase', '1');
+
 
 SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_drop_replication_slot('regression_slot_2pc');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 1c439b57b0..140010a8b1 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -36,6 +36,7 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	bool		enable_twophase;
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -49,6 +50,8 @@ static void pg_output_begin(LogicalDecodingContext *ctx,
 				bool last_write);
 static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
 					 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pg_decode_abort_txn(LogicalDecodingContext *ctx,
+					ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
 static void pg_decode_change(LogicalDecodingContext *ctx,
 				 ReorderBufferTXN *txn, Relation rel,
 				 ReorderBufferChange *change);
@@ -62,6 +65,18 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 				  bool transactional, const char *prefix,
 				  Size sz, const char *message);
+static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+						 ReorderBufferTXN *txn,
+						 TransactionId xid, const char *gid);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+					  ReorderBufferTXN *txn,
+					  XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+							  ReorderBufferTXN *txn,
+							  XLogRecPtr commit_lsn);
+static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
+							 ReorderBufferTXN *txn,
+							 XLogRecPtr abort_lsn);
 
 void
 _PG_init(void)
@@ -80,9 +95,14 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pg_decode_change;
 	cb->truncate_cb = pg_decode_truncate;
 	cb->commit_cb = pg_decode_commit_txn;
+	cb->abort_cb = pg_decode_abort_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+	cb->filter_prepare_cb = pg_decode_filter_prepare;
+	cb->prepare_cb = pg_decode_prepare_txn;
+	cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+	cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
 }
 
 
@@ -102,6 +122,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->enable_twophase = false;
 
 	ctx->output_plugin_private = data;
 
@@ -183,6 +204,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "enable-twophase") == 0)
+		{
+			if (elem->arg == NULL)
+				data->enable_twophase = true;
+			else if (!parse_bool(strVal(elem->arg), &data->enable_twophase))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -251,6 +282,112 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+/* ABORT callback */
+static void
+pg_decode_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr abort_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	if (data->include_xids)
+		appendStringInfo(ctx->out, "ABORT %u", txn->xid);
+	else
+		appendStringInfoString(ctx->out, "ABORT");
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* Filter out two-phase transactions, if decoding not enabled. */
+static bool
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+						 TransactionId xid, const char *gid)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* treat all transactions as one-phase */
+	if (!data->enable_twophase)
+		return true;
+
+	return false;
+}
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					  XLogRecPtr prepare_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							  XLogRecPtr commit_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* ABORT PREPARED callback */
+static void
+pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							 XLogRecPtr abort_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
 static bool
 pg_decode_filter(LogicalDecodingContext *ctx,
 				 RepOriginId origin_id)
-- 
2.15.2 (Apple Git-101.1)

