From 996e9dbc78f61729cfb1295363620cbc95e974cc Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Tue, 22 Sep 2020 06:17:43 -0400
Subject: [PATCH v5] Support decoding of two-phase transactions

Until now two-phase transactions were decoded at COMMIT, just like
regular transaction. During replay, two-phase transactions were
translated into regular transactions on the subscriber, and the GID
was not forwarded to it.

This patch allows PREPARE-time decoding two-phase transactions (if
the output plugin supports this capability), in which case the
transactions are replayed at PREPARE and then committed later when
COMMIT PREPARED arrives.

Includes documentation changes.
---
 contrib/test_decoding/expected/prepared.out     | 187 ++++++++++++--
 contrib/test_decoding/sql/prepared.sql          |  79 +++++-
 contrib/test_decoding/test_decoding.c           | 166 +++++++++++++
 doc/src/sgml/logicaldecoding.sgml               | 110 ++++++++-
 src/backend/replication/logical/decode.c        | 129 +++++++++-
 src/backend/replication/logical/logical.c       | 175 ++++++++++++++
 src/backend/replication/logical/reorderbuffer.c | 309 +++++++++++++++++++++---
 src/include/replication/logical.h               |   5 +
 src/include/replication/output_plugin.h         |  37 +++
 src/include/replication/reorderbuffer.h         |  75 +++++-
 10 files changed, 1185 insertions(+), 87 deletions(-)

diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out
index 46e915d..fd0e8a4 100644
--- a/contrib/test_decoding/expected/prepared.out
+++ b/contrib/test_decoding/expected/prepared.out
@@ -6,19 +6,50 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
  init
 (1 row)
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ PREPARE TRANSACTION 'test_prepared#1'
+(3 rows)
+
 COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#1'
+(1 row)
+
 INSERT INTO test_prepared1 VALUES (2);
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(6 rows)
+
 ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                data                 
+-------------------------------------
+ ROLLBACK PREPARED 'test_prepared#2'
+(1 row)
+
 INSERT INTO test_prepared1 VALUES (4);
 -- test prepared xact containing ddl
 BEGIN;
@@ -26,45 +57,149 @@ INSERT INTO test_prepared1 VALUES (5);
 ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
-INSERT INTO test_prepared2 VALUES (7);
-COMMIT PREPARED 'test_prepared#3';
--- make sure stuff still works
-INSERT INTO test_prepared1 VALUES (8);
-INSERT INTO test_prepared2 VALUES (9);
--- cleanup
-DROP TABLE test_prepared1;
-DROP TABLE test_prepared2;
--- show results
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+    relation     | locktype |        mode         
+-----------------+----------+---------------------
+ test_prepared_1 | relation | RowExclusiveLock
+ test_prepared_1 | relation | AccessExclusiveLock
+(2 rows)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
                                   data                                   
 -------------------------------------------------------------------------
  BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- COMMIT
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:2
- COMMIT
- BEGIN
  table public.test_prepared1: INSERT: id[integer]:4
  COMMIT
  BEGIN
- table public.test_prepared2: INSERT: id[integer]:7
- COMMIT
- BEGIN
  table public.test_prepared1: INSERT: id[integer]:5
  table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3'
+(7 rows)
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
+INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
  COMMIT
+(3 rows)
+
+COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#3'
+(1 row)
+
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (8);
+INSERT INTO test_prepared2 VALUES (9);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                                data                                
+--------------------------------------------------------------------
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
  COMMIT
  BEGIN
  table public.test_prepared2: INSERT: id[integer]:9
  COMMIT
-(22 rows)
+(6 rows)
+
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+BEGIN;
+insert into test_prepared2 values (12);
+PREPARE TRANSACTION 'test_prepared_lock2';
+COMMIT PREPARED 'test_prepared_lock2';
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+ relation | locktype | mode 
+----------+----------+------
+(0 rows)
+
+-- Shouldn't timeout on 2pc decoding.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                                    data                                    
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ PREPARE TRANSACTION 'test_prepared_lock'
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:12
+ PREPARE TRANSACTION 'test_prepared_lock2'
+ COMMIT PREPARED 'test_prepared_lock2'
+(8 rows)
+
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- will work normally after we commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                 data                 
+--------------------------------------
+ COMMIT PREPARED 'test_prepared_lock'
+(1 row)
+
+-- test savepoints
+BEGIN;
+SAVEPOINT test_savepoint;
+CREATE TABLE test_prepared_savepoint (a int);
+PREPARE TRANSACTION 'test_prepared_savepoint';
+COMMIT PREPARED 'test_prepared_savepoint';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                   data                    
+-------------------------------------------
+ COMMIT PREPARED 'test_prepared_savepoint'
+(1 row)
+
+-- test that a GID containing "nodecode" gets decoded at commit prepared time
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+                                data                                 
+---------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:20 data[text]:null
+ COMMIT
+(3 rows)
+
+-- cleanup
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
 
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql
index e726397..162fe43 100644
--- a/contrib/test_decoding/sql/prepared.sql
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -2,21 +2,25 @@
 SET synchronous_commit = on;
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
 
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
 COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
 INSERT INTO test_prepared1 VALUES (2);
 
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
 ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
 
 INSERT INTO test_prepared1 VALUES (4);
 
@@ -27,24 +31,83 @@ ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
 
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
 
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
 INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
 
 COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (8);
 INSERT INTO test_prepared2 VALUES (9);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+BEGIN;
+insert into test_prepared2 values (12);
+PREPARE TRANSACTION 'test_prepared_lock2';
+COMMIT PREPARED 'test_prepared_lock2';
+
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+
+-- Shouldn't timeout on 2pc decoding.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+RESET statement_timeout;
+
+COMMIT PREPARED 'test_prepared_lock';
+
+-- will work normally after we commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test savepoints
+BEGIN;
+SAVEPOINT test_savepoint;
+CREATE TABLE test_prepared_savepoint (a int);
+PREPARE TRANSACTION 'test_prepared_savepoint';
+COMMIT PREPARED 'test_prepared_savepoint';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test that a GID containing "nodecode" gets decoded at commit prepared time
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- cleanup
 DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
 
--- show results
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
 
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index e60ab34..1bb17a6 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -11,12 +11,16 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include "miscadmin.h"
 
+#include "access/transam.h"
 #include "catalog/pg_type.h"
 
 #include "replication/logical.h"
 #include "replication/origin.h"
 
+#include "storage/procarray.h"
+
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -36,6 +40,7 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	TransactionId	check_xid; /* track abort of this txid */
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -88,6 +93,19 @@ static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
 									  ReorderBufferTXN *txn,
 									  int nrelations, Relation relations[],
 									  ReorderBufferChange *change);
+static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 ReorderBufferTXN *txn,
+									 TransactionId xid, const char *gid);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+								  ReorderBufferTXN *txn,
+								  XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+										  ReorderBufferTXN *txn,
+										  XLogRecPtr commit_lsn);
+static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
+										 ReorderBufferTXN *txn,
+										 XLogRecPtr abort_lsn);
+
 
 void
 _PG_init(void)
@@ -116,6 +134,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_change_cb = pg_decode_stream_change;
 	cb->stream_message_cb = pg_decode_stream_message;
 	cb->stream_truncate_cb = pg_decode_stream_truncate;
+	cb->filter_prepare_cb = pg_decode_filter_prepare;
+	cb->prepare_cb = pg_decode_prepare_txn;
+	cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+	cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
+
 }
 
 
@@ -127,6 +150,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	ListCell   *option;
 	TestDecodingData *data;
 	bool		enable_streaming = false;
+	bool 		enable_2pc = false;
 
 	data = palloc0(sizeof(TestDecodingData));
 	data->context = AllocSetContextCreate(ctx->context,
@@ -136,6 +160,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->check_xid = InvalidTransactionId;
 
 	ctx->output_plugin_private = data;
 
@@ -227,6 +252,42 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "two-phase-commit") == 0)
+		{
+			if (elem->arg == NULL)
+				continue;
+			else if (!parse_bool(strVal(elem->arg), &enable_2pc))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+		}
+		else if (strcmp(elem->defname, "check-xid") == 0)
+		{
+			if (elem->arg)
+			{
+				errno = 0;
+				data->check_xid = (TransactionId)
+					strtoul(strVal(elem->arg), NULL, 0);
+
+				if (errno == EINVAL || errno == ERANGE)
+					ereport(FATAL,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("check-xid is not a valid number: \"%s\"",
+								strVal(elem->arg))));
+			}
+			else
+				ereport(FATAL,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("check-xid needs an input value")));
+
+			if (data->check_xid <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("Specify positive value for parameter \"%s\","
+								" you specified \"%s\"",
+								elem->defname, strVal(elem->arg))));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -238,6 +299,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	}
 
 	ctx->streaming &= enable_streaming;
+	ctx->enable_twophase &= enable_2pc;
 }
 
 /* cleanup this plugin's resources */
@@ -297,6 +359,94 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+
+/*
+ * Filter out two-phase transactions.
+ *
+ * Each plugin can implement its own filtering logic. Here
+ * we demonstrate a simple logic by checking the GID. If the
+ * GID contains the "_nodecode" substring, then we filter
+ * it out.
+ */
+static bool
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+						 TransactionId xid, const char *gid)
+{
+	if (strstr(gid, "_nodecode") != NULL)
+		return true;
+
+	return false;
+}
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					  XLogRecPtr prepare_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							  XLogRecPtr commit_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* ABORT PREPARED callback */
+static void
+pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							 XLogRecPtr abort_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
 static bool
 pg_decode_filter(LogicalDecodingContext *ctx,
 				 RepOriginId origin_id)
@@ -455,6 +605,22 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	data->xact_wrote_changes = true;
 
+	/* if check_xid is specified */
+	if (TransactionIdIsValid(data->check_xid))
+	{
+		elog(LOG, "waiting for %u to abort", data->check_xid);
+		while (TransactionIdIsInProgress(data->check_xid))
+		{
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(10000L);
+		}
+		if (!TransactionIdIsInProgress(data->check_xid) &&
+			   !TransactionIdDidCommit(data->check_xid))
+			elog(LOG, "%u aborted", data->check_xid);
+
+		Assert(TransactionIdDidAbort(data->check_xid));
+	}
+
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
 
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 813a037..bd4542e 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -387,6 +387,10 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeTruncateCB truncate_cb;
     LogicalDecodeCommitCB commit_cb;
     LogicalDecodeMessageCB message_cb;
+    LogicalDecodeFilterPrepareCB filter_prepare_cb;
+    LogicalDecodePrepareCB prepare_cb;
+    LogicalDecodeCommitPreparedCB commit_prepared_cb;
+    LogicalDecodeAbortPreparedCB abort_prepared_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
     LogicalDecodeStreamStartCB stream_start_cb;
@@ -477,7 +481,13 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
      never get
      decoded. Successful savepoints are
      folded into the transaction containing them in the order they were
-     executed within that transaction.
+     executed within that transaction. A transaction that is prepared for
+     a two-phase commit using <command>PREPARE TRANSACTION</command> will
+     also be decoded if the output plugin callbacks needed for decoding
+     them are provided. It is possible that the current transaction which
+     is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
+     command. In that case, the logical decoding of this transaction will
+     be aborted too.
     </para>
 
     <note>
@@ -578,6 +588,55 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-prepare">
+     <title>Transaction Prepare Callback</title>
+
+     <para>
+      The optional <function>prepare_cb</function> callback is called whenever
+      a transaction which is prepared for two-phase commit has been
+      decoded. The <function>change_cb</function> callbacks for all modified
+      rows will have been called before this, if there have been any modified
+      rows.
+<programlisting>
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+                                        ReorderBufferTXN *txn,
+                                        XLogRecPtr prepare_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-commit-prepared">
+     <title>Commit Prepared Transaction Callback</title>
+
+     <para>
+      The optional <function>commit_prepared_cb</function> callback is called whenever
+      a commit prepared transaction has been decoded. The <parameter>gid</parameter> field,
+      which is part of the <parameter>txn</parameter> parameter can be used in this
+      callback.
+<programlisting>
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+                                               ReorderBufferTXN *txn,
+                                               XLogRecPtr commit_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-abort-prepared">
+     <title>Rollback Prepared Transaction Callback</title>
+
+     <para>
+      The optional <function>abort_prepared_cb</function> callback is called whenever
+      a rollback prepared transaction has been decoded. The <parameter>gid</parameter> field,
+      which is part of the <parameter>txn</parameter> parameter can be used in this
+      callback.
+<programlisting>
+typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+                                              ReorderBufferTXN *txn,
+                                              XLogRecPtr abort_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-change">
      <title>Change Callback</title>
 
@@ -587,7 +646,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
       an <command>INSERT</command>, <command>UPDATE</command>,
       or <command>DELETE</command>. Even if the original command modified
       several rows at once the callback will be called individually for each
-      row.
+      row. The <function>change_cb</function> callback may access system or
+      user catalog tables to aid in the process of outputting the row
+      modification details. In case of decoding a prepared (but yet
+      uncommitted) transaction or decoding of an uncommitted transaction, this
+      change callback might also error out due to simultaneous rollback of
+      this very same transaction. In that case, the logical decoding of this
+      aborted transaction is stopped gracefully.
 <programlisting>
 typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
@@ -664,6 +729,39 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
      </para>
      </sect3>
 
+     <sect3 id="logicaldecoding-output-plugin-filter-prepare">
+     <title>Prepare Filter Callback</title>
+
+     <para>
+       The optional <function>filter_prepare_cb</function> callback
+       is called to determine whether data that is part of the current
+       two-phase commit transaction should be considered for decode
+       at this prepare stage or as a regular one-phase transaction at
+       <command>COMMIT PREPARED</command> time later. To signal that
+       decoding should be skipped, return <literal>true</literal>;
+       <literal>false</literal> otherwise. When the callback is not
+       defined, <literal>false</literal> is assumed (i.e. nothing is
+       filtered).
+<programlisting>
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              ReorderBufferTXN *txn,
+                                              TransactionId xid,
+                                              const char *gid);
+</programlisting>
+      The <parameter>ctx</parameter> parameter has the same contents
+      as for the other callbacks. The <parameter>txn</parameter> parameter
+      contains meta information about the transaction. The <parameter>xid</parameter>
+      contains the XID because <parameter>txn</parameter> can be NULL in some cases.
+      The <parameter>gid</parameter> is the identifier that later identifies this
+      transaction for <command>COMMIT PREPARED</command> or <command>ROLLBACK PREPARED</command>.
+     </para>
+     <para>
+      The callback has to provide the same static answer for a given combination of
+      <parameter>xid</parameter> and <parameter>gid</parameter> every time it is
+      called.
+     </para>
+     </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-message">
      <title>Generic Message Callback</title>
 
@@ -685,7 +783,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
       non-transactional and the XID was not assigned yet in the transaction
       which logged the message. The <parameter>lsn</parameter> has WAL
       location of the message. The <parameter>transactional</parameter> says
-      if the message was sent as transactional or not.
+      if the message was sent as transactional or not. Similar to the change
+      callback, in case of decoding a prepared (but yet uncommitted)
+      transaction or decoding of an uncommitted transaction, this message 
+      callback might also error out due to simultaneous rollback of
+      this very same transaction. In that case, the logical decoding of this
+      aborted transaction is stopped gracefully.
+
       The <parameter>prefix</parameter> is arbitrary null-terminated prefix
       which can be used for identifying interesting messages for the current
       plugin. And finally the <parameter>message</parameter> parameter holds
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index f21f61d..b37b62d 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -70,6 +70,9 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						 xl_xact_parsed_commit *parsed, TransactionId xid);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+						xl_xact_parsed_prepare * parsed);
+
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -312,17 +315,34 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			}
 			break;
 		case XLOG_XACT_PREPARE:
+			{
+				xl_xact_parsed_prepare parsed;
+				xl_xact_prepare *xlrec;
+				/* check that output plugin is capable of twophase decoding */
+				if (!ctx->enable_twophase)
+				{
+					ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
+					break;
+				}
 
-			/*
-			 * Currently decoding ignores PREPARE TRANSACTION and will just
-			 * decode the transaction when the COMMIT PREPARED is sent or
-			 * throw away the transaction's contents when a ROLLBACK PREPARED
-			 * is received. In the future we could add code to expose prepared
-			 * transactions in the changestream allowing for a kind of
-			 * distributed 2PC.
-			 */
-			ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
-			break;
+				/* ok, parse it */
+				xlrec = (xl_xact_prepare *)XLogRecGetData(r);
+				ParsePrepareRecord(XLogRecGetInfo(buf->record),
+									xlrec, &parsed);
+
+				/* does output plugin want this particular transaction? */
+				if (ctx->callbacks.filter_prepare_cb &&
+					ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
+												parsed.twophase_gid))
+				{
+					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
+											buf->origptr);
+					break;
+				}
+
+				DecodePrepare(ctx, buf, &parsed);
+				break;
+			}
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
 	}
@@ -647,9 +667,69 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 								 buf->origptr, buf->endptr);
 	}
 
+	/*
+	 * Decide if we're processing COMMIT PREPARED, or a regular COMMIT.
+	 * Regular commit simply triggers a replay of transaction changes from the
+	 * reorder buffer. For COMMIT PREPARED that however already happened at
+	 * PREPARE time, and so we only need to notify the subscriber that the GID
+	 * finally committed.
+	 *
+	 * For output plugins that do not support PREPARE-time decoding of
+	 * two-phase transactions, we never even see the PREPARE and all two-phase
+	 * transactions simply fall through to the second branch.
+	 */
+	if (TransactionIdIsValid(parsed->twophase_xid) &&
+		ReorderBufferTxnIsPrepared(ctx->reorder,
+								   parsed->twophase_xid, parsed->twophase_gid))
+	{
+		Assert(xid == parsed->twophase_xid);
+		/* we are processing COMMIT PREPARED */
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+									commit_time, origin_id, origin_lsn,
+									parsed->twophase_gid, true);
+	}
+	else
+	{
+		/* replay actions of all transaction + subtransactions in order */
+		ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+	}
+}
+
+/*
+ * Decode PREPARE record. Similar logic as in COMMIT
+ */
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			  xl_xact_parsed_prepare * parsed)
+{
+	XLogRecPtr	origin_lsn = parsed->origin_lsn;
+	TimestampTz commit_time = parsed->origin_timestamp;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+	int			i;
+	TransactionId xid = parsed->twophase_xid;
+
+	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+		ctx->fast_forward || FilterByOrigin(ctx, origin_id))
+		return;
+
+	/*
+	 * Tell the reorderbuffer about the surviving subtransactions. We need to
+	 * do this because the main transaction itself has not committed since we
+	 * are in the prepare phase right now. So we need to be sure the snapshot
+	 * is setup correctly for the main transaction in case all changes
+	 * happened in subtransanctions
+	 */
+	for (i = 0; i < parsed->nsubxacts; i++)
+	{
+		ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+								 buf->origptr, buf->endptr);
+	}
+
 	/* replay actions of all transaction + subtransactions in order */
-	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
-						commit_time, origin_id, origin_lsn);
+	ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr,
+						 commit_time, origin_id, origin_lsn, parsed->twophase_gid);
 }
 
 /*
@@ -661,6 +741,31 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid)
 {
 	int			i;
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	TimestampTz commit_time = 0;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		origin_lsn = parsed->origin_lsn;
+		commit_time = parsed->origin_timestamp;
+	}
+
+	/*
+	 * If it's ROLLBACK PREPARED then handle it via callbacks.
+	 */
+	if (TransactionIdIsValid(xid) &&
+		!SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) &&
+		parsed->dbId == ctx->slot->data.database &&
+		!FilterByOrigin(ctx, origin_id) &&
+		ReorderBufferTxnIsPrepared(ctx->reorder, xid, parsed->twophase_gid))
+	{
+
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+									commit_time, origin_id, origin_lsn,
+									parsed->twophase_gid, false);
+		return;
+	}
 
 	for (i = 0; i < parsed->nsubxacts; i++)
 	{
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 0f6af95..4e95337 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -58,6 +58,14 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
 static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
 static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							  XLogRecPtr commit_lsn);
+static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+									  TransactionId xid, const char *gid);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							   XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+									   XLogRecPtr commit_lsn);
+static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+									  XLogRecPtr abort_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							  Relation relation, ReorderBufferChange *change);
 static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -206,6 +214,10 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
+	ctx->reorder->filter_prepare = filter_prepare_cb_wrapper;
+	ctx->reorder->prepare = prepare_cb_wrapper;
+	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+	ctx->reorder->abort_prepared = abort_prepared_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
 
 	/*
@@ -225,6 +237,19 @@ StartupDecodingContext(List *output_plugin_options,
 		(ctx->callbacks.stream_message_cb != NULL) ||
 		(ctx->callbacks.stream_truncate_cb != NULL);
 
+ 	/*
+	 * To support two phase logical decoding, we require prepare/commit-prepare/abort-prepare
+	 * callbacks. The filter-prepare callback is optional. We however enable two phase logical
+	 * decoding when at least one of the methods is enabled so that we can easily identify
+	 * missing methods.
+	 *
+	 * We decide it here, but only check it later in the wrappers.
+	 */
+	ctx->enable_twophase = (ctx->callbacks.prepare_cb != NULL) ||
+		(ctx->callbacks.commit_prepared_cb != NULL) ||
+		(ctx->callbacks.abort_prepared_cb != NULL) ||
+		(ctx->callbacks.filter_prepare_cb != NULL);
+
 	/*
 	 * streaming callbacks
 	 *
@@ -782,6 +807,111 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				   XLogRecPtr prepare_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "prepare";
+	state.report_location = txn->final_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* If the plugin support 2 phase commits then prepare callback is mandatory */
+	if (ctx->enable_twophase && ctx->callbacks.prepare_cb == NULL)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("Output plugin did not register prepare_cb callback")));
+
+	/* do the actual work: call callback */
+	ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						   XLogRecPtr commit_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "commit_prepared";
+	state.report_location = txn->final_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* If the plugin support 2 phase commits then commit prepared callback is mandatory */
+	if (ctx->enable_twophase && ctx->callbacks.commit_prepared_cb == NULL)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("Output plugin did not register commit_prepared_cb callback")));
+
+	/* do the actual work: call callback */
+	ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						  XLogRecPtr abort_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "abort_prepared";
+	state.report_location = txn->final_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* If the plugin support 2 phase commits then abort prepared callback is mandatory */
+	if (ctx->enable_twophase && ctx->callbacks.abort_prepared_cb == NULL)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("Output plugin did not register abort_prepared_cb callback")));
+
+	/* do the actual work: call callback */
+	ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
 change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change)
 {
@@ -858,6 +988,51 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static bool
+filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						  TransactionId xid, const char *gid)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	/*
+	 * Skip if decoding of twophase at PREPARE time is not enabled. In that
+	 * case all twophase transactions are considered filtered out and will be
+	 * applied as regular transactions at COMMIT PREPARED.
+	 */
+	if (!ctx->enable_twophase)
+		return true;
+
+	/*
+	 * The filter_prepare callback is optional. When not supplied, all
+	 * prepared transactions should go through.
+	 */
+	if (!ctx->callbacks.filter_prepare_cb)
+		return false;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_prepare";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_prepare_cb(ctx, txn, xid, gid);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 bool
 filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 1975d62..d96be77 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									   char *change);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
-static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+									bool txn_prepared);
 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
 										TransactionId xid, XLogSegNo segno);
@@ -413,6 +414,11 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	}
 
 	/* free data that's contained */
+	if (txn->gid != NULL)
+	{
+		pfree(txn->gid);
+		txn->gid = NULL;
+	}
 
 	if (txn->tuplecid_hash != NULL)
 	{
@@ -1401,6 +1407,59 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
 }
 
 /*
+ * Cleanup the leftover contents of a transaction, usually after the transaction
+ * has been COMMIT PREPARED or ROLLBACK PREPARED. This does the rest of the cleanup
+ * that was not done when the transaction was PREPARED
+ */
+static void
+ReorderBufferCleanupPreparedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+	bool	found;
+
+	/*
+	 * Cleanup the base snapshot, if set.
+	 */
+	if (txn->base_snapshot != NULL)
+	{
+		SnapBuildSnapDecRefcount(txn->base_snapshot);
+		dlist_delete(&txn->base_snapshot_node);
+	}
+
+	/*
+	 * Cleanup the snapshot for the last streamed run.
+	 */
+	if (txn->snapshot_now != NULL)
+	{
+		Assert(rbtxn_is_streamed(txn));
+		ReorderBufferFreeSnap(rb, txn->snapshot_now);
+	}
+
+	/*
+	 * Remove TXN from its containing list.
+	 *
+	 * Note: if txn is known as subxact, we are deleting the TXN from its
+	 * parent's list of known subxacts; this leaves the parent's nsubxacts
+	 * count too high, but we don't care.  Otherwise, we are deleting the TXN
+	 * from the LSN-ordered list of toplevel TXNs.
+	 */
+	dlist_delete(&txn->node);
+
+	/* now remove reference from buffer */
+	hash_search(rb->by_txn,
+				(void *) &txn->xid,
+				HASH_REMOVE,
+				&found);
+	Assert(found);
+
+	/* remove entries spilled to disk */
+	if (rbtxn_is_serialized(txn))
+		ReorderBufferRestoreCleanup(rb, txn);
+
+	/* deallocate */
+	ReorderBufferReturnTXN(rb, txn);
+}
+
+/*
  * Cleanup the contents of a transaction, usually after the transaction
  * committed or aborted.
  */
@@ -1502,12 +1561,14 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 }
 
 /*
- * Discard changes from a transaction (and subtransactions), after streaming
- * them.  Keep the remaining info - transactions, tuplecids, invalidations and
- * snapshots.
+ * Discard changes from a transaction (and subtransactions), either after streaming or 
+ * after a PREPARE.
+ * The flag txn_prepared indicates if this is called after a PREPARE.
+ * If streaming, keep the remaining info - transactions, tuplecids, invalidations and
+ * snapshots.If after a PREPARE, keep only the invalidations and snapshots.
  */
 static void
-ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 {
 	dlist_mutable_iter iter;
 
@@ -1526,7 +1587,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		Assert(rbtxn_is_known_subxact(subtxn));
 		Assert(subtxn->nsubtxns == 0);
 
-		ReorderBufferTruncateTXN(rb, subtxn);
+		ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
 	}
 
 	/* cleanup changes in the toplevel txn */
@@ -1560,9 +1621,30 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	 * about the toplevel xact (we send the XID in all messages), but we never
 	 * stream XIDs of empty subxacts.
 	 */
-	if ((!txn->toptxn) || (txn->nentries_mem != 0))
+	if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
 		txn->txn_flags |= RBTXN_IS_STREAMED;
 
+	if (txn_prepared)
+	{
+		/*
+		 * If this is a prepared txn, cleanup the tuplecids we stored for decoding
+		 * catalog snapshot access.
+	 	 * They are always stored in the toplevel transaction.
+	 	 */
+		dlist_foreach_modify(iter, &txn->tuplecids)
+		{
+			ReorderBufferChange *change;
+
+			change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+			/* Check we're not mixing changes from different transactions. */
+			Assert(change->txn == txn);
+			Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
+
+			ReorderBufferReturnChange(rb, change, true);
+		}
+	}
+
 	/*
 	 * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
 	 * memory. We could also keep the hash table and update it with new ctid
@@ -1880,7 +1962,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					  ReorderBufferChange *specinsert)
 {
 	/* Discard the changes that we just streamed */
-	ReorderBufferTruncateTXN(rb, txn);
+	ReorderBufferTruncateTXN(rb, txn, false);
 
 	/* Free all resources allocated for toast reconstruction */
 	ReorderBufferToastReset(rb, txn);
@@ -1987,7 +2069,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			prev_lsn = change->lsn;
 
 			/* Set the current xid to detect concurrent aborts. */
-			if (streaming)
+			if (streaming || rbtxn_prepared(change->txn))
 			{
 				curtxn = change->txn;
 				SetupCheckXidLive(curtxn->xid);
@@ -2249,7 +2331,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					break;
 			}
 		}
-
 		/*
 		 * There's a speculative insertion remaining, just clean in up, it
 		 * can't have been successful, otherwise we'd gotten a confirmation
@@ -2278,7 +2359,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			}
 		}
 		else
-			rb->commit(rb, txn, commit_lsn);
+		{
+			/*
+			 * Call either PREPARE (for twophase transactions) or COMMIT
+			 * (for regular ones).
+			 */
+			if (rbtxn_prepared(txn))
+				rb->prepare(rb, txn, commit_lsn);
+			else
+				rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
@@ -2319,11 +2409,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 */
 		if (streaming)
 		{
-			ReorderBufferTruncateTXN(rb, txn);
+			ReorderBufferTruncateTXN(rb, txn, false);
 
 			/* Reset the CheckXidAlive */
 			CheckXidAlive = InvalidTransactionId;
 		}
+		else if (rbtxn_prepared(txn))
+		{
+			ReorderBufferTruncateTXN(rb, txn, true);
+			/* Reset the CheckXidAlive */
+			CheckXidAlive = InvalidTransactionId;
+		}
 		else
 			ReorderBufferCleanupTXN(rb, txn);
 	}
@@ -2352,17 +2448,18 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 		/*
 		 * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
-		 * abort of the (sub)transaction we are streaming. We need to do the
+		 * abort of the (sub)transaction we are streaming or preparing. We need to do the
 		 * cleanup and return gracefully on this error, see SetupCheckXidLive.
 		 */
 		if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
 		{
 			/*
-			 * This error can only occur when we are sending the data in
-			 * streaming mode and the streaming is not finished yet.
+			 * This error can only occur either when we are sending the data in
+			 * streaming mode and the streaming is not finished yet or when we are
+			 * sending the data out on a PREPARE during a twoi phase commit.
 			 */
-			Assert(streaming);
-			Assert(stream_started);
+			Assert(streaming || rbtxn_prepared(txn));
+			Assert(stream_started  || rbtxn_prepared(txn));
 
 			/* Cleanup the temporary error state. */
 			FlushErrorState();
@@ -2370,10 +2467,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			errdata = NULL;
 			curtxn->concurrent_abort = true;
 
-			/* Reset the TXN so that it is allowed to stream remaining data. */
-			ReorderBufferResetTXN(rb, txn, snapshot_now,
-								  command_id, prev_lsn,
-								  specinsert);
+			/* If streaming, reset the TXN so that it is allowed to stream remaining data. */
+			if (streaming)
+			{
+				ReorderBufferResetTXN(rb, txn, snapshot_now,
+									  command_id, prev_lsn,
+									  specinsert);
+			}
+			else
+			{
+				elog(LOG, "stopping decoding of %s (%u)",
+						txn->gid[0] != '\0'? txn->gid:"", txn->xid);
+				ReorderBufferTruncateTXN(rb, txn, true);
+			}
 		}
 		else
 		{
@@ -2395,23 +2501,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
  * This interface is called once a toplevel commit is read for both streamed
  * as well as non-streamed transactions.
  */
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
-					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-					TimestampTz commit_time,
-					RepOriginId origin_id, XLogRecPtr origin_lsn)
+static void
+ReorderBufferCommitInternal(ReorderBufferTXN *txn,
+                            ReorderBuffer *rb, TransactionId xid,
+					        XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					        TimestampTz commit_time,
+					        RepOriginId origin_id, XLogRecPtr origin_lsn)
 {
-	ReorderBufferTXN *txn;
 	Snapshot	snapshot_now;
 	CommandId	command_id = FirstCommandId;
 
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-								false);
-
-	/* unknown transaction, nothing to replay */
-	if (txn == NULL)
-		return;
-
 	txn->final_lsn = commit_lsn;
 	txn->end_lsn = end_lsn;
 	txn->commit_time = commit_time;
@@ -2453,6 +2552,140 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 }
 
 /*
+ * Ask output plugin whether we want to skip this PREPARE and send
+ * this transaction as a regular commit later.
+ */
+bool
+ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, const char *gid)
+{
+   ReorderBufferTXN *txn;
+
+   txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+   return rb->filter_prepare(rb, txn, xid, gid);
+}
+
+
+/*
+ * Commit a transaction.
+ *
+ * See comments for ReorderBufferCommitInternal()
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/* unknown transaction, nothing to replay */
+	if (txn == NULL)
+		return;
+
+	ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+								commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Prepare a twophase transaction. It calls ReorderBufferCommitInternal()
+ * since all prepared transactions need to be decoded at PREPARE time.
+ */
+void
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					 TimestampTz commit_time,
+					 RepOriginId origin_id, XLogRecPtr origin_lsn,
+					 char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/* unknown transaction, nothing to replay */
+	if (txn == NULL)
+		return;
+
+	txn->txn_flags |= RBTXN_PREPARE;
+	txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */
+	strcpy(txn->gid, gid);
+
+	ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+								commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Check whether this transaction was sent as prepared to subscribers.
+ * Called while handling commit|abort prepared.
+ */
+bool
+ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
+						   const char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+                               false);
+
+	/*
+	* Always call the prepare filter. It's the job of the prepare filter to
+	* give us the *same* response for a given xid across multiple calls
+	* (including ones on restart)
+	*/
+	return !(rb->filter_prepare(rb, txn, xid, gid));
+}
+
+/*
+ * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
+ */
+void
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+							XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+							TimestampTz commit_time,
+							RepOriginId origin_id, XLogRecPtr origin_lsn,
+							char *gid, bool is_commit)
+{
+	ReorderBufferTXN *txn;
+
+	/*
+	* The transaction may or may not exist (during restarts for example).
+	* Anyways, 2PC transactions do not contain any reorderbuffers. So allow
+	* it to be created below.
+	*/
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn,
+								true);
+
+	txn->final_lsn = commit_lsn;
+	txn->end_lsn = end_lsn;
+	txn->commit_time = commit_time;
+	txn->origin_id = origin_id;
+	txn->origin_lsn = origin_lsn;
+	/* this txn is obviously prepared */
+	txn->txn_flags |= RBTXN_PREPARE;
+	txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */
+	strcpy(txn->gid, gid);
+
+	if (is_commit)
+	{
+		txn->txn_flags |= RBTXN_COMMIT_PREPARED;
+		rb->commit_prepared(rb, txn, commit_lsn);
+	}
+	else
+	{
+		txn->txn_flags |= RBTXN_ROLLBACK_PREPARED;
+		rb->abort_prepared(rb, txn, commit_lsn);
+	}
+
+	/* cleanup: make sure there's no cache pollution */
+	ReorderBufferExecuteInvalidations(rb, txn);
+	ReorderBufferCleanupPreparedTXN(rb, txn);
+}
+
+/*
  * Abort a transaction that possibly has previous changes. Needs to be first
  * called for subtransactions and then for the toplevel xid.
  *
@@ -2495,7 +2728,13 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	/* cosmetic... */
 	txn->final_lsn = lsn;
 
-	/* remove potential on-disk data, and deallocate */
+    /*
+     * remove potential on-disk data, and deallocate.
+     *
+     * We remove it even for prepared transactions (GID is enough to
+     * commit/abort those later).
+     */
+
 	ReorderBufferCleanupTXN(rb, txn);
 }
 
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 45abc44..ee63e7b 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -84,6 +84,11 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		streaming;
 
+ 	/*
+	 * Does the output plugin support two phase decoding, and is it enabled?
+	 */
+	bool		enable_twophase;
+
 	/*
 	 * State for writing output.
 	 */
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index b78c796..96e269b 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -77,6 +77,39 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn,
 									   XLogRecPtr commit_lsn);
 
+ /*
+  * Called before decoding of PREPARE record to decide whether this
+  * transaction should be decoded with separate calls to prepare and
+  * commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED and
+  * sent as usual transaction.
+  */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  ReorderBufferTXN *txn,
+											  TransactionId xid,
+											  const char *gid);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+										ReorderBufferTXN *txn,
+										XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+											   ReorderBufferTXN *txn,
+											   XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+											  ReorderBufferTXN *txn,
+											  XLogRecPtr abort_lsn);
+
 /*
  * Called for the generic logical decoding messages.
  */
@@ -171,6 +204,10 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeTruncateCB truncate_cb;
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeFilterPrepareCB filter_prepare_cb;
+	LogicalDecodePrepareCB prepare_cb;
+	LogicalDecodeCommitPreparedCB commit_prepared_cb;
+	LogicalDecodeAbortPreparedCB abort_prepared_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 	/* streaming of changes */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1ae17d5..4d4e35d 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/twophase.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -162,9 +163,13 @@ typedef struct ReorderBufferChange
 #define RBTXN_HAS_CATALOG_CHANGES 0x0001
 #define RBTXN_IS_SUBXACT          0x0002
 #define RBTXN_IS_SERIALIZED       0x0004
-#define RBTXN_IS_STREAMED         0x0008
-#define RBTXN_HAS_TOAST_INSERT    0x0010
-#define RBTXN_HAS_SPEC_INSERT     0x0020
+#define RBTXN_PREPARE             0x0008
+#define RBTXN_COMMIT_PREPARED     0x0010
+#define RBTXN_ROLLBACK_PREPARED   0x0020
+#define RBTXN_COMMIT              0x0040
+#define RBTXN_IS_STREAMED         0x0080
+#define RBTXN_HAS_TOAST_INSERT    0x0100
+#define RBTXN_HAS_SPEC_INSERT     0x0200
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -218,6 +223,15 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
 )
 
+/* is this txn prepared? */
+#define rbtxn_prepared(txn)            (txn->txn_flags & RBTXN_PREPARE)
+/* was this prepared txn committed in the meanwhile? */
+#define rbtxn_commit_prepared(txn)     (txn->txn_flags & RBTXN_COMMIT_PREPARED)
+/* was this prepared txn aborted in the meanwhile? */
+#define rbtxn_rollback_prepared(txn)   (txn->txn_flags & RBTXN_ROLLBACK_PREPARED)
+/* was this txn committed in the meanwhile? */
+#define rbtxn_commit(txn)              (txn->txn_flags & RBTXN_COMMIT)
+
 typedef struct ReorderBufferTXN
 {
 	/* See above */
@@ -229,6 +243,9 @@ typedef struct ReorderBufferTXN
 	/* Xid of top-level transaction, if known */
 	TransactionId toplevel_xid;
 
+	/* In case of 2PC we need to pass GID to output plugin */
+	char         *gid;
+
 	/*
 	 * LSN of the first data carrying, WAL record with knowledge about this
 	 * xid. This is allowed to *not* be first record adorned with this xid, if
@@ -390,6 +407,39 @@ typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb,
 									   ReorderBufferTXN *txn,
 									   XLogRecPtr commit_lsn);
 
+/* abort callback signature */
+typedef void (*ReorderBufferAbortCB) (
+                                     ReorderBuffer *rb,
+                                     ReorderBufferTXN *txn,
+                                     XLogRecPtr abort_lsn);
+
+typedef bool (*ReorderBufferFilterPrepareCB) (
+                                             ReorderBuffer *rb,
+                                             ReorderBufferTXN *txn,
+                                             TransactionId xid,
+                                             const char *gid);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (
+                                       ReorderBuffer *rb,
+                                       ReorderBufferTXN *txn,
+                                       XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (
+                                              ReorderBuffer *rb,
+                                              ReorderBufferTXN *txn,
+                                              XLogRecPtr commit_lsn);
+
+/* abort prepared callback signature */
+typedef void (*ReorderBufferAbortPreparedCB) (
+                                             ReorderBuffer *rb,
+                                             ReorderBufferTXN *txn,
+                                             XLogRecPtr abort_lsn);
+
+
+
+
 /* message callback signature */
 typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										ReorderBufferTXN *txn,
@@ -482,6 +532,11 @@ struct ReorderBuffer
 	ReorderBufferApplyChangeCB apply_change;
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
+	ReorderBufferAbortCB abort;
+	ReorderBufferFilterPrepareCB filter_prepare;
+	ReorderBufferPrepareCB prepare;
+	ReorderBufferCommitPreparedCB commit_prepared;
+	ReorderBufferAbortPreparedCB abort_prepared;
 	ReorderBufferMessageCB message;
 
 	/*
@@ -548,6 +603,11 @@ void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho
 void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void		ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+                           XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+                           TimestampTz commit_time,
+                           RepOriginId origin_id, XLogRecPtr origin_lsn,
+                           char *gid, bool is_commit);
 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
 void		ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
 									 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
@@ -571,6 +631,15 @@ void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
 
+bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid,
+							 const char *gid);
+bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
+						   const char *gid);
+void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					 TimestampTz commit_time,
+					 RepOriginId origin_id, XLogRecPtr origin_lsn,
+					 char *gid);
 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
 TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 
-- 
1.8.3.1

