diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out
index 46e915d4ff..56c6e7287f 100644
--- a/contrib/test_decoding/expected/prepared.out
+++ b/contrib/test_decoding/expected/prepared.out
@@ -6,19 +6,84 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
  init
 (1 row)
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+-- Reused queries
+\set get_no2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');'
+\set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');'
+\set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');'
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+:get_with2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ PREPARE TRANSACTION 'test_prepared#1'
+(3 rows)
+
+:get_no2pc
+ data 
+------
+(0 rows)
+
 COMMIT PREPARED 'test_prepared#1';
+:get_with2pc
+ data 
+------
+(0 rows)
+
+:get_no2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ COMMIT
+(3 rows)
+
 INSERT INTO test_prepared1 VALUES (2);
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+:get_no2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+(3 rows)
+
+:get_with2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(6 rows)
+
 ROLLBACK PREPARED 'test_prepared#2';
+:get_no2pc
+ data 
+------
+(0 rows)
+
+:get_with2pc
+ data 
+------
+(0 rows)
+
 INSERT INTO test_prepared1 VALUES (4);
 -- test prepared xact containing ddl
 BEGIN;
@@ -26,49 +91,169 @@ INSERT INTO test_prepared1 VALUES (5);
 ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
-INSERT INTO test_prepared2 VALUES (7);
-COMMIT PREPARED 'test_prepared#3';
--- make sure stuff still works
-INSERT INTO test_prepared1 VALUES (8);
-INSERT INTO test_prepared2 VALUES (9);
--- cleanup
-DROP TABLE test_prepared1;
-DROP TABLE test_prepared2;
--- show results
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-                                  data                                   
--------------------------------------------------------------------------
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+    relation     | locktype |        mode         
+-----------------+----------+---------------------
+ test_prepared_1 | relation | RowExclusiveLock
+ test_prepared_1 | relation | AccessExclusiveLock
+(2 rows)
+
+:get_no2pc
+                        data                        
+----------------------------------------------------
  BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
+ table public.test_prepared1: INSERT: id[integer]:4
  COMMIT
+(3 rows)
+
+:get_with2pc
+                        data                        
+----------------------------------------------------
  BEGIN
- table public.test_prepared1: INSERT: id[integer]:2
+ table public.test_prepared1: INSERT: id[integer]:4
  COMMIT
+(3 rows)
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists. Our 2pc filter callback will skip decoding of xacts
+-- with catalog changes at PREPARE time, so we don't decode it now.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
+INSERT INTO test_prepared2 VALUES (7);
+:get_with2pc
+                        data                        
+----------------------------------------------------
  BEGIN
- table public.test_prepared1: INSERT: id[integer]:4
+ table public.test_prepared2: INSERT: id[integer]:7
  COMMIT
+(3 rows)
+
+:get_no2pc
+                        data                        
+----------------------------------------------------
  BEGIN
  table public.test_prepared2: INSERT: id[integer]:7
  COMMIT
+(3 rows)
+
+COMMIT PREPARED 'test_prepared#3';
+:get_no2pc
+                                  data                                   
+-------------------------------------------------------------------------
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:5
  table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
  COMMIT
+(4 rows)
+
+:get_with2pc
+                                  data                                   
+-------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:5
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3';
+ COMMIT PREPARED 'test_prepared#3';
+(5 rows)
+
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (8);
+INSERT INTO test_prepared2 VALUES (9);
+:get_with2pc
+                                data                                
+--------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
+ COMMIT
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:9
+ COMMIT
+(6 rows)
+
+:get_no2pc
+                                data                                
+--------------------------------------------------------------------
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
  COMMIT
  BEGIN
  table public.test_prepared2: INSERT: id[integer]:9
  COMMIT
-(22 rows)
+(6 rows)
+
+-- If we do something that takes a strong lock on a catalog relation we need to
+-- read in order to decode a transaction we deadlock; we can't finish decoding
+-- until the lock is released, but we're waiting for decoding to finish so we
+-- can make a commit/abort decision.
+---
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+ relation | locktype | mode 
+----------+----------+------
+(0 rows)
 
+-- Shouldn't see anything with 2pc decoding off
+:get_no2pc
+ data 
+------
+(0 rows)
+
+-- If we try to decode it now we'll deadlock
+SET statement_timeout = '10s';
+:get_with2pc_nofilter
+-- FIXME we expect a timeout here, but it actually works...
+ERROR: statement timed out
+
+RESET statement_timeout;
+-- we can decode past it by skipping xacts with catalog changes
+-- and let it be decoded after COMMIT PREPARED, though.
+:get_with2pc
+ data 
+------
+(0 rows)
+
+COMMIT PREPARED 'test_prepared_lock';
+-- Both will work normally after we commit
+:get_no2pc
+                                    data                                    
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ COMMIT
+(4 rows)
+
+:get_with2pc
+ data 
+------
+(0 rows)
+
+-- cleanup
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 --------------------------
  
 (1 row)
 
+SELECT pg_drop_replication_slot('regression_slot_2pc');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql
index e72639767e..a94503c8b8 100644
--- a/contrib/test_decoding/sql/prepared.sql
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -1,22 +1,36 @@
 -- predictability
 SET synchronous_commit = on;
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding');
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+
+-- Reused queries
+\set get_no2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');'
+\set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');'
+\set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');'
 
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+:get_with2pc
+:get_no2pc
 COMMIT PREPARED 'test_prepared#1';
+:get_with2pc
+:get_no2pc
 INSERT INTO test_prepared1 VALUES (2);
 
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+:get_no2pc
+:get_with2pc
 ROLLBACK PREPARED 'test_prepared#2';
+:get_no2pc
+:get_with2pc
 
 INSERT INTO test_prepared1 VALUES (4);
 
@@ -27,24 +41,74 @@ ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
 
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+
+:get_no2pc
+:get_with2pc
 
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists. Our 2pc filter callback will skip decoding of xacts
+-- with catalog changes at PREPARE time, so we don't decode it now.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
 INSERT INTO test_prepared2 VALUES (7);
+:get_with2pc
+:get_no2pc
 
 COMMIT PREPARED 'test_prepared#3';
+:get_no2pc
+:get_with2pc
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (8);
 INSERT INTO test_prepared2 VALUES (9);
+:get_with2pc
+:get_no2pc
+
+-- If we do something that takes a strong lock on a catalog relation we need to
+-- read in order to decode a transaction we deadlock; we can't finish decoding
+-- until the lock is released, but we're waiting for decoding to finish so we
+-- can make a commit/abort decision.
+---
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+
+-- Shouldn't see anything with 2pc decoding off
+:get_no2pc
+
+-- If we try to decode it now we'll deadlock
+SET statement_timeout = '10s';
+:get_with2pc_nofilter
+RESET statement_timeout;
+
+-- we can decode past it by skipping xacts with catalog changes
+-- and let it be decoded after COMMIT PREPARED, though.
+:get_with2pc
+
+COMMIT PREPARED 'test_prepared_lock';
+
+-- Both will work normally after we commit
+:get_no2pc
+:get_with2pc
 
 -- cleanup
 DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
 
--- show results
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-
 SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_drop_replication_slot('regression_slot_2pc');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 135b3b7638..fb0deacfb3 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -24,6 +24,8 @@
 #include "replication/message.h"
 #include "replication/origin.h"
 
+#include "storage/procarray.h"
+
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -46,6 +48,8 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	bool		twophase_decoding;
+	bool		twophase_decode_with_catalog_changes;
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -68,6 +72,19 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 				  bool transactional, const char *prefix,
 				  Size sz, const char *message);
+static bool pg_filter_prepare(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  char *gid);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn);
+static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn);
+
 
 void
 _PG_init(void)
@@ -85,9 +102,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->begin_cb = pg_decode_begin_txn;
 	cb->change_cb = pg_decode_change;
 	cb->commit_cb = pg_decode_commit_txn;
+
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+
+	cb->filter_prepare_cb = pg_filter_prepare;
+	cb->prepare_cb = pg_decode_prepare_txn;
+	cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+	cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
 }
 
 
@@ -107,6 +130,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->twophase_decoding = false;
+	data->twophase_decode_with_catalog_changes = false;
 
 	ctx->output_plugin_private = data;
 
@@ -176,6 +201,27 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "twophase-decoding") == 0)
+		{
+
+			if (elem->arg == NULL)
+				data->twophase_decoding = true;
+			else if (!parse_bool(strVal(elem->arg), &data->twophase_decoding))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
+		else if (strcmp(elem->defname, "twophase-decode-with-catalog-changes") == 0)
+		{
+			if (elem->arg == NULL)
+				data->twophase_decode_with_catalog_changes = true;
+			else if (!parse_bool(strVal(elem->arg), &data->twophase_decode_with_catalog_changes))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -232,10 +278,163 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfoString(ctx->out, "COMMIT");
+
 	if (data->include_xids)
-		appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
-	else
-		appendStringInfoString(ctx->out, "COMMIT");
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+
+/* Filter out unnecessary two-phase transactions */
+static bool
+pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					char *gid)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* treat all transaction as one-phase */
+	if (!data->twophase_decoding)
+		return true;
+
+	/*
+	 * Two-phase transactions that accessed catalog require special
+	 * treatment.
+	 *
+	 * Right now we don't have a safe way to decode catalog changes made in
+	 * prepared transaction that was already aborted by the time of
+	 * decoding.
+	 *
+	 * That kind of problem arises only when we are trying to
+	 * retrospectively decode aborted transactions with catalog changes -
+	 * including if a transaction aborts while we're decoding it. If one
+	 * wants to code distributed commit based on prepare decoding then
+	 * commits/aborts will happend strictly after decoding will be
+	 * completed, so it is possible to skip any checks/locks here.
+	 *
+	 * We'll also get stuck trying to acquire locks on catalog relations
+	 * we need for decoding if the prepared xact holds a strong lock on
+	 * one of them and we also need to decode row changes.
+	 */
+	if (txn->has_catalog_changes)
+	{
+		LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+		if (TransactionIdIsInProgress(txn->xid))
+		{
+			/*
+			 * For the sake of simplicity, by default we just
+			 * ignore in-progess prepared transactions with catalog
+			 * changes in this extension. If they abort during
+			 * decoding then tuples we need to decode them may be
+			 * overwritten while we're still decoding, causing
+			 * wrong catalog lookups.
+			 *
+			 * It is possible to move that LWLockRelease() to
+			 * pg_decode_prepare_txn() and allow decoding of
+			 * running prepared tx, but such lock will prevent any
+			 * 2pc transaction commit during decoding time.  That
+			 * can be a long time in case of lots of
+			 * changes/inserts in that tx or if the downstream is
+			 * slow/unresonsive.
+			 *
+			 * (Continuing to decode without the lock is unsafe, XXX)
+			 */
+			LWLockRelease(TwoPhaseStateLock);
+			return !data->twophase_decode_with_catalog_changes;
+		}
+		else if (TransactionIdDidAbort(txn->xid))
+		{
+			/*
+			 * Here we know that it is already aborted and there is
+			 * not much sense in doing something with this
+			 * transaction.  Consequently ABORT PREPARED will be
+			 * suppressed.
+			 */
+			LWLockRelease(TwoPhaseStateLock);
+			return true;
+		}
+
+		LWLockRelease(TwoPhaseStateLock);
+	}
+
+	return false;
+}
+
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr prepare_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+		quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr commit_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+		quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* ABORT PREPARED callback */
+static void
+pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr abort_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "ABORT PREPARED %s",
+		quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
 
 	if (data->include_timestamp)
 		appendStringInfo(ctx->out, " (at %s)",
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 3aafa79e52..8756e4ed18 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -100,8 +100,13 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 		xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
 
 		parsed->twophase_xid = xl_twophase->xid;
-
 		data += sizeof(xl_xact_twophase);
+
+		if (parsed->xinfo & XACT_XINFO_HAS_GID)
+		{
+			strcpy(parsed->twophase_gid, data);
+			data += strlen(parsed->twophase_gid) + 1;
+		}
 	}
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -139,6 +144,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 		data += sizeof(xl_xact_xinfo);
 	}
 
+	if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+	{
+		xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+		parsed->dbId = xl_dbinfo->dbId;
+		parsed->tsId = xl_dbinfo->tsId;
+
+		data += sizeof(xl_xact_dbinfo);
+	}
+
 	if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
 		xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
@@ -166,8 +181,26 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 		xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
 
 		parsed->twophase_xid = xl_twophase->xid;
-
 		data += sizeof(xl_xact_twophase);
+
+		if (parsed->xinfo & XACT_XINFO_HAS_GID)
+		{
+			strcpy(parsed->twophase_gid, data);
+			data += strlen(parsed->twophase_gid) + 1;
+		}
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		xl_xact_origin xl_origin;
+
+		/* we're only guaranteed 4 byte alignment, so copy onto stack */
+		memcpy(&xl_origin, data, sizeof(xl_origin));
+
+		parsed->origin_lsn = xl_origin.origin_lsn;
+		parsed->origin_timestamp = xl_origin.origin_timestamp;
+
+		data += sizeof(xl_xact_origin);
 	}
 }
 
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index b715152e8d..c764c6c22b 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -148,7 +148,6 @@ int			max_prepared_xacts = 0;
  * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
  * specified in TwoPhaseFileHeader.
  */
-#define GIDSIZE 200
 
 typedef struct GlobalTransactionData
 {
@@ -211,12 +210,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
 								RelFileNode *rels,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
-								bool initfileinval);
+								bool initfileinval,
+								const char *gid);
 static void RecordTransactionAbortPrepared(TransactionId xid,
 							   int nchildren,
 							   TransactionId *children,
 							   int nrels,
-							   RelFileNode *rels);
+							   RelFileNode *rels,
+							   const char *gid);
 static void ProcessRecords(char *bufptr, TransactionId xid,
 			   const TwoPhaseCallback callbacks[]);
 static void RemoveGXact(GlobalTransaction gxact);
@@ -898,7 +899,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
 /*
  * Header for a 2PC state file
  */
-#define TWOPHASE_MAGIC	0x57F94533	/* format identifier */
+#define TWOPHASE_MAGIC	0x57F94534		/* format identifier */
 
 typedef struct TwoPhaseFileHeader
 {
@@ -914,6 +915,8 @@ typedef struct TwoPhaseFileHeader
 	int32		ninvalmsgs;		/* number of cache invalidation messages */
 	bool		initfileinval;	/* does relcache init file need invalidation? */
 	uint16		gidlen;			/* length of the GID - GID follows the header */
+	XLogRecPtr	origin_lsn;		/* lsn of this record at origin node */
+	TimestampTz origin_timestamp; /* time of prepare at origin node */
 } TwoPhaseFileHeader;
 
 /*
@@ -1065,6 +1068,7 @@ EndPrepare(GlobalTransaction gxact)
 {
 	TwoPhaseFileHeader *hdr;
 	StateFileChunk *record;
+	bool			replorigin;
 
 	/* Add the end sentinel to the list of 2PC records */
 	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1075,6 +1079,21 @@ EndPrepare(GlobalTransaction gxact)
 	Assert(hdr->magic == TWOPHASE_MAGIC);
 	hdr->total_len = records.total_len + sizeof(pg_crc32c);
 
+	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+				  replorigin_session_origin != DoNotReplicateId);
+
+	if (replorigin)
+	{
+		Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
+		hdr->origin_lsn = replorigin_session_origin_lsn;
+		hdr->origin_timestamp = replorigin_session_origin_timestamp;
+	}
+	else
+	{
+		hdr->origin_lsn = InvalidXLogRecPtr;
+		hdr->origin_timestamp = 0;
+	}
+
 	/*
 	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
 	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1105,9 +1124,19 @@ EndPrepare(GlobalTransaction gxact)
 	MyPgXact->delayChkpt = true;
 
 	XLogBeginInsert();
+
 	for (record = records.head; record != NULL; record = record->next)
 		XLogRegisterData(record->data, record->len);
+
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
 	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+
+	if (replorigin)
+		/* Move LSNs forward for this replication origin */
+		replorigin_session_advance(replorigin_session_origin_lsn,
+								   gxact->prepare_end_lsn);
+
 	XLogFlush(gxact->prepare_end_lsn);
 
 	/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1283,6 +1312,43 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 	return buf;
 }
 
+/*
+ * ParsePrepareRecord
+ */
+void
+ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
+{
+	TwoPhaseFileHeader *hdr;
+	char *bufptr;
+
+	hdr = (TwoPhaseFileHeader *) xlrec;
+	bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
+
+	parsed->origin_lsn = hdr->origin_lsn;
+	parsed->origin_timestamp = hdr->origin_timestamp;
+	parsed->twophase_xid = hdr->xid;
+	parsed->dbId = hdr->database;
+	parsed->nsubxacts = hdr->nsubxacts;
+	parsed->ncommitrels = hdr->ncommitrels;
+	parsed->nabortrels = hdr->nabortrels;
+	parsed->nmsgs = hdr->ninvalmsgs;
+
+	strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
+	bufptr += MAXALIGN(hdr->gidlen);
+
+	parsed->subxacts = (TransactionId *) bufptr;
+	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+
+	parsed->commitrels = (RelFileNode *) bufptr;
+	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+
+	parsed->abortrels = (RelFileNode *) bufptr;
+	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+	parsed->msgs = (SharedInvalidationMessage *) bufptr;
+	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
 
 /*
  * Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1435,11 +1501,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 										hdr->nsubxacts, children,
 										hdr->ncommitrels, commitrels,
 										hdr->ninvalmsgs, invalmsgs,
-										hdr->initfileinval);
+										hdr->initfileinval, gid);
 	else
 		RecordTransactionAbortPrepared(xid,
 									   hdr->nsubxacts, children,
-									   hdr->nabortrels, abortrels);
+									   hdr->nabortrels, abortrels,
+									   gid);
 
 	ProcArrayRemove(proc, latestXid);
 
@@ -2165,7 +2232,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								RelFileNode *rels,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
-								bool initfileinval)
+								bool initfileinval,
+								const char *gid)
 {
 	XLogRecPtr	recptr;
 	TimestampTz committs = GetCurrentTimestamp();
@@ -2193,7 +2261,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								 ninvalmsgs, invalmsgs,
 								 initfileinval, false,
 								 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
-								 xid);
+								 xid, gid);
 
 
 	if (replorigin)
@@ -2255,7 +2323,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
 							   int nchildren,
 							   TransactionId *children,
 							   int nrels,
-							   RelFileNode *rels)
+							   RelFileNode *rels,
+							   const char *gid)
 {
 	XLogRecPtr	recptr;
 
@@ -2278,7 +2347,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 								nchildren, children,
 								nrels, rels,
 								MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
-								xid);
+								xid, gid);
 
 	/* Always flush, since we're about to remove the 2PC state file */
 	XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c06fabca10..e22622bfb2 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1226,7 +1226,7 @@ RecordTransactionCommit(void)
 							nmsgs, invalMessages,
 							RelcacheInitFileInval, forceSyncCommit,
 							MyXactFlags,
-							InvalidTransactionId /* plain commit */ );
+							InvalidTransactionId, NULL /* plain commit */ );
 
 		if (replorigin)
 			/* Move LSNs forward for this replication origin */
@@ -1578,7 +1578,8 @@ RecordTransactionAbort(bool isSubXact)
 	XactLogAbortRecord(xact_time,
 					   nchildren, children,
 					   nrels, rels,
-					   MyXactFlags, InvalidTransactionId);
+					   MyXactFlags, InvalidTransactionId,
+					   NULL);
 
 	/*
 	 * Report the latest async abort LSN, so that the WAL writer knows to
@@ -5256,7 +5257,8 @@ XactLogCommitRecord(TimestampTz commit_time,
 					int nrels, RelFileNode *rels,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval, bool forceSync,
-					int xactflags, TransactionId twophase_xid)
+					int xactflags, TransactionId twophase_xid,
+					const char *twophase_gid)
 {
 	xl_xact_commit xlrec;
 	xl_xact_xinfo xl_xinfo;
@@ -5268,6 +5270,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_origin xl_origin;
 
 	uint8		info;
+	int			gidlen = 0;
 
 	Assert(CritSectionCount > 0);
 
@@ -5330,6 +5333,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
 		xl_twophase.xid = twophase_xid;
+		Assert(twophase_gid != NULL);
+
+		if (XLogLogicalInfoActive())
+		{
+			xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+			gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+		}
 	}
 
 	/* dump transaction origin information */
@@ -5380,8 +5390,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+	{
 		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
 
+		if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+			XLogRegisterData((char *) twophase_gid, gidlen);
+	}
+
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
 
@@ -5401,15 +5416,19 @@ XLogRecPtr
 XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
-				   int xactflags, TransactionId twophase_xid)
+				   int xactflags, TransactionId twophase_xid,
+				   const char *twophase_gid)
 {
 	xl_xact_abort xlrec;
 	xl_xact_xinfo xl_xinfo;
 	xl_xact_subxacts xl_subxacts;
 	xl_xact_relfilenodes xl_relfilenodes;
 	xl_xact_twophase xl_twophase;
+	xl_xact_dbinfo xl_dbinfo;
+	xl_xact_origin xl_origin;
 
 	uint8		info;
+	int			gidlen = 0;
 
 	Assert(CritSectionCount > 0);
 
@@ -5421,7 +5440,6 @@ XactLogAbortRecord(TimestampTz abort_time,
 	else
 		info = XLOG_XACT_ABORT_PREPARED;
 
-
 	/* First figure out and collect all the information needed */
 
 	xlrec.xact_time = abort_time;
@@ -5445,6 +5463,31 @@ XactLogAbortRecord(TimestampTz abort_time,
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
 		xl_twophase.xid = twophase_xid;
+		Assert(twophase_gid != NULL);
+
+		if (XLogLogicalInfoActive())
+		{
+			xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+			gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+		}
+	}
+
+	if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+		xl_dbinfo.dbId = MyDatabaseId;
+		xl_dbinfo.tsId = MyDatabaseTableSpace;
+	}
+
+	/* dump transaction origin information only for abort prepared */
+	if ( (replorigin_session_origin != InvalidRepOriginId) &&
+			TransactionIdIsValid(twophase_xid) &&
+			XLogLogicalInfoActive())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+		xl_origin.origin_lsn = replorigin_session_origin_lsn;
+		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
 	if (xl_xinfo.xinfo != 0)
@@ -5459,6 +5502,9 @@ XactLogAbortRecord(TimestampTz abort_time,
 	if (xl_xinfo.xinfo != 0)
 		XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
 		XLogRegisterData((char *) (&xl_subxacts),
@@ -5476,8 +5522,22 @@ XactLogAbortRecord(TimestampTz abort_time,
 	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+	{
 		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
 
+		if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+			XLogRegisterData((char *) twophase_gid, gidlen);
+	}
+
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+	if (TransactionIdIsValid(twophase_xid))
+		XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
 	return XLogInsert(RM_XACT_ID, info);
 }
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 486fd0c988..7b2eec2402 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,7 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/twophase.h"
 
 #include "catalog/pg_control.h"
 
@@ -71,7 +72,9 @@ static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
 static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			 xl_xact_parsed_commit *parsed, TransactionId xid);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
-			xl_xact_parsed_abort *parsed, TransactionId xid);
+			 xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_prepare *parsed);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -277,17 +280,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				break;
 			}
 		case XLOG_XACT_PREPARE:
+			{
+				xl_xact_parsed_prepare parsed;
 
-			/*
-			 * Currently decoding ignores PREPARE TRANSACTION and will just
-			 * decode the transaction when the COMMIT PREPARED is sent or
-			 * throw away the transaction's contents when a ROLLBACK PREPARED
-			 * is received. In the future we could add code to expose prepared
-			 * transactions in the changestream allowing for a kind of
-			 * distributed 2PC.
-			 */
-			ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
-			break;
+				/* check that output plugin capable of twophase decoding */
+				if (!ctx->twophase_hadling)
+				{
+					ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
+					break;
+				}
+
+				/* ok, parse it */
+				ParsePrepareRecord(XLogRecGetInfo(buf->record),
+										XLogRecGetData(buf->record), &parsed);
+
+				/* does output plugin wants this particular transaction? */
+				if (ctx->callbacks.filter_prepare_cb &&
+					ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
+													 parsed.twophase_gid))
+				{
+					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
+												buf->origptr);
+					break;
+				}
+
+				DecodePrepare(ctx, buf, &parsed);
+				break;
+			}
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
 	}
@@ -551,8 +570,13 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * Process invalidation messages, even if we're not interested in the
 	 * transaction's contents, since the various caches need to always be
 	 * consistent.
+	 *
+	 * Also if that transaction was sent to prepare callback then both
+	 * this function were called during prepare.
 	 */
-	if (parsed->nmsgs > 0)
+	if (parsed->nmsgs > 0 &&
+			!(TransactionIdIsValid(parsed->twophase_xid) &&
+				ReorderBufferTxnIsPrepared(ctx->reorder, xid)))
 	{
 		ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
 									  parsed->nmsgs, parsed->msgs);
@@ -607,9 +631,81 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 								 buf->origptr, buf->endptr);
 	}
 
+	if (TransactionIdIsValid(parsed->twophase_xid) &&
+								ReorderBufferTxnIsPrepared(ctx->reorder, xid))
+	{
+		/*
+		 * We are processing COMMIT PREPARED and know that reorder buffer is
+		 * empty. So we can skip use shortcut for coomiting bare xact.
+		 */
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+					commit_time, origin_id, origin_lsn, parsed->twophase_gid, true);
+	}
+	else
+	{
+		/* replay actions of all transaction + subtransactions in order */
+		ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+	}
+}
+
+
+/*
+ * Decode PREPARE record. Same logic as in COMMIT, but diffent calls
+ * to SnapshotBuilder as we need to mark this transaction as commited
+ * instead of running to properly decode it. When prepared transation
+ * is decoded we mark it in snapshot as running again.
+ */
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_prepare *parsed)
+{
+	XLogRecPtr	origin_lsn = parsed->origin_lsn;
+	TimestampTz	commit_time = parsed->origin_timestamp;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+	int			i;
+	TransactionId xid = parsed->twophase_xid;
+
+	/*
+	 * Process invalidation messages, even if we're not interested in the
+	 * transaction's contents, since the various caches need to always be
+	 * consistent.
+	 */
+	if (parsed->nmsgs > 0)
+	{
+		ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+									  parsed->nmsgs, parsed->msgs);
+		ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+	}
+
+	SnapBuildPrepareTxnStart(ctx->snapshot_builder, buf->origptr, xid,
+					   parsed->nsubxacts, parsed->subxacts);
+
+	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+		FilterByOrigin(ctx, origin_id))
+	{
+		for (i = 0; i < parsed->nsubxacts; i++)
+		{
+			ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
+		}
+		ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+
+		return;
+	}
+
+	/* tell the reorderbuffer about the surviving subtransactions */
+	for (i = 0; i < parsed->nsubxacts; i++)
+	{
+		ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+								 buf->origptr, buf->endptr);
+	}
+
 	/* replay actions of all transaction + subtransactions in order */
-	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
-						commit_time, origin_id, origin_lsn);
+	ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr,
+						commit_time, origin_id, origin_lsn, parsed->twophase_gid);
+
+	SnapBuildPrepareTxnFinish(ctx->snapshot_builder, xid);
 }
 
 /*
@@ -621,6 +717,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid)
 {
 	int			i;
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	TimestampTz	commit_time = 0;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		origin_lsn = parsed->origin_lsn;
+		commit_time = parsed->origin_timestamp;
+	}
+
+	/*
+	 * If that is ROLLBACK PREPARED than send that to callbacks.
+	 */
+	if (TransactionIdIsValid(xid) &&
+			!SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) &&
+			parsed->dbId == ctx->slot->data.database &&
+			!FilterByOrigin(ctx, origin_id) &&
+			ReorderBufferTxnIsPrepared(ctx->reorder, xid))
+	{
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+						commit_time, origin_id, origin_lsn,
+						parsed->twophase_gid, false);
+		return;
+	}
 
 	for (i = 0; i < parsed->nsubxacts; i++)
 	{
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index bca585fc27..93ba3fbc5a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -58,6 +58,14 @@ static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions
 				   bool is_init);
 static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
 static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
+static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  char *gid);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn);
+static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn);
 static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  XLogRecPtr commit_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -124,6 +132,7 @@ StartupDecodingContext(List *output_plugin_options,
 	MemoryContext context,
 				old_context;
 	LogicalDecodingContext *ctx;
+	int			twophase_callbacks;
 
 	/* shorter lines... */
 	slot = MyReplicationSlot;
@@ -182,8 +191,25 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->begin = begin_cb_wrapper;
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
+	ctx->reorder->filter_prepare = filter_prepare_cb_wrapper;
+	ctx->reorder->prepare = prepare_cb_wrapper;
+	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+	ctx->reorder->abort_prepared = abort_prepared_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
 
+	/* check that plugin implements all necessary callbacks to perform 2PC */
+	twophase_callbacks = (ctx->callbacks.prepare_cb != NULL) +
+		(ctx->callbacks.commit_prepared_cb != NULL) +
+		(ctx->callbacks.abort_prepared_cb != NULL);
+
+	ctx->twophase_hadling = (twophase_callbacks == 3);
+
+	if (twophase_callbacks != 3 && twophase_callbacks != 0)
+		ereport(WARNING,
+			(errmsg("Output plugin registered only %d twophase callbacks out of 3. "
+					"Twophase transactions will be decoded as ordinary ones.",
+					twophase_callbacks)));
+
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
@@ -680,6 +706,93 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr prepare_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "prepare";
+	state.report_location = txn->final_lsn;		/* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "commit_prepared";
+	state.report_location = txn->final_lsn;		/* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "abort_prepared";
+	state.report_location = txn->final_lsn;		/* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
 change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change)
 {
@@ -714,6 +827,34 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static bool
+filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, char *gid)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_prepare";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_prepare_cb(ctx, txn, gid);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+	return ret;
+}
+
 bool
 filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9b126b2957..6952cbc28d 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -101,6 +101,66 @@ logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
 	commit_data->commit_lsn = pq_getmsgint64(in);
 	commit_data->end_lsn = pq_getmsgint64(in);
 	commit_data->committime = pq_getmsgint64(in);
+
+	/* set gid to empty */
+	commit_data->gid[0] = '\0';
+}
+
+/*
+ * Write PREPARE to the output stream.
+ */
+void
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+						XLogRecPtr prepare_lsn)
+{
+	uint8		flags = 0;
+
+	pq_sendbyte(out, 'P');		/* sending PREPARE protocol */
+
+	if (txn->txn_flags & TXN_COMMIT_PREPARED)
+		flags |= LOGICALREP_IS_COMMIT_PREPARED;
+	else if (txn->txn_flags & TXN_ROLLBACK_PREPARED)
+		flags |= LOGICALREP_IS_ROLLBACK_PREPARED;
+	else if (txn->txn_flags & TXN_PREPARE)
+		flags |= LOGICALREP_IS_PREPARE;
+
+	if (flags == 0)
+		elog(ERROR, "unrecognized flags %u in [commit|rollback] prepare message", flags);
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* send fields */
+	pq_sendint64(out, prepare_lsn);
+	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, txn->commit_time);
+
+	/* send gid */
+	pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepCommitData *commit_data, uint8 *flags)
+{
+	/* read flags */
+	uint8		prep_flags = pq_getmsgbyte(in);
+
+	if (!(prep_flags & LOGICALREP_PREPARE_MASK))
+		elog(ERROR, "unrecognized flags %u in prepare message", prep_flags);
+
+	/* read fields */
+	commit_data->commit_lsn = pq_getmsgint64(in);
+	commit_data->end_lsn = pq_getmsgint64(in);
+	commit_data->committime = pq_getmsgint64(in);
+
+	/* read gid */
+	strcpy(commit_data->gid, pq_getmsgstring(in));
+
+	/* set flags */
+	*flags = prep_flags;
 }
 
 /*
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 0f607bab70..3d9598aab8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1307,25 +1307,18 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
  * the top and subtransactions (using a k-way merge) and replay the changes in
  * lsn order.
  */
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+static void
+ReorderBufferCommitInternal(ReorderBufferTXN *txn,
+					ReorderBuffer *rb, TransactionId xid,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 					TimestampTz commit_time,
 					RepOriginId origin_id, XLogRecPtr origin_lsn)
 {
-	ReorderBufferTXN *txn;
 	volatile Snapshot snapshot_now;
 	volatile CommandId command_id = FirstCommandId;
 	bool		using_subtxn;
 	ReorderBufferIterTXNState *volatile iterstate = NULL;
 
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-								false);
-
-	/* unknown transaction, nothing to replay */
-	if (txn == NULL)
-		return;
-
 	txn->final_lsn = commit_lsn;
 	txn->end_lsn = end_lsn;
 	txn->commit_time = commit_time;
@@ -1604,8 +1597,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		/* call commit or prepare callback */
+		if (txn_prepared(txn))
+			rb->prepare(rb, txn, commit_lsn);
+		else
+			rb->commit(rb, txn, commit_lsn);
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
@@ -1632,8 +1628,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		if (snapshot_now->copied)
 			ReorderBufferFreeSnap(rb, snapshot_now);
 
-		/* remove potential on-disk data, and deallocate */
-		ReorderBufferCleanupTXN(rb, txn);
+		/*
+		 * remove potential on-disk data, and deallocate or postpone that
+		 * till the finish of two-phase tx
+		 */
+		if (!txn_prepared(txn))
+			ReorderBufferCleanupTXN(rb, txn);
 	}
 	PG_CATCH();
 	{
@@ -1667,6 +1667,125 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 }
 
 /*
+ * Ask output plugin whether we want to skip this PREPARE and send
+ * this transaction as one-phase later on commit.
+ */
+bool
+ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+	return rb->filter_prepare(rb, txn, gid);
+}
+
+
+/*
+ * Commit non-twophase transaction. See comments to ReorderBufferCommitInternal()
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/* unknown transaction, nothing to replay */
+	if (txn == NULL)
+		return;
+
+	ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+										commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Prepare twophase transaction. It calls ReorderBufferCommitInternal()
+ * since all transaction changes should be decoded on PREPARE.
+ */
+void
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/* unknown transaction, nothing to replay */
+	if (txn == NULL)
+		return;
+
+	txn->txn_flags |= TXN_PREPARE;
+	strcpy(txn->gid, gid);
+
+	ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+										commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Check whether this transaction was sent as prepared to receiver.
+ * Called upon commit/abort prepared.
+ */
+bool
+ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/*
+	 * If txn == NULL then presumably subscriber confirmed prepare
+	 * but we are rebooted.
+	 */
+	return txn == NULL ? true : txn_prepared(txn);
+}
+
+/*
+ * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
+ */
+void
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid, bool is_commit)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn,
+								true);
+
+	txn->final_lsn = commit_lsn;
+	txn->end_lsn = end_lsn;
+	txn->commit_time = commit_time;
+	txn->origin_id = origin_id;
+	txn->origin_lsn = origin_lsn;
+	strcpy(txn->gid, gid);
+
+	if (is_commit)
+	{
+		txn->txn_flags |= TXN_COMMIT_PREPARED;
+		rb->commit_prepared(rb, txn, commit_lsn);
+	}
+	else
+	{
+		txn->txn_flags |= TXN_ROLLBACK_PREPARED;
+		rb->abort_prepared(rb, txn, commit_lsn);
+	}
+
+	ReorderBufferCleanupTXN(rb, txn);
+}
+
+/*
  * Abort a transaction that possibly has previous changes. Needs to be first
  * called for subtransactions and then for the toplevel xid.
  *
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index ad65b9831d..3ba6841770 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -901,7 +901,7 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
 	/* copy xids that still are interesting to workspace */
 	for (off = 0; off < builder->committed.xcnt; off++)
 	{
-		if (NormalTransactionIdPrecedes(builder->committed.xip[off],
+		if (TransactionIdPrecedes(builder->committed.xip[off],
 										builder->xmin))
 			;					/* remove */
 		else
@@ -1079,6 +1079,52 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 	}
 }
 
+/*
+ * Just a wrapper to clarify DecodePrepare().
+ * Right now we can't extract correct historic catalog data that
+ * was produced by aborted prepared transaction, so it work of
+ * decoding plugin to avoid such situation and here we just construct usual
+ * snapshot to able to decode prepare.
+ */
+void
+SnapBuildPrepareTxnStart(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
+				   int nsubxacts, TransactionId *subxacts)
+{
+	SnapBuildCommitTxn(builder, lsn, xid, nsubxacts, subxacts);
+}
+
+
+/*
+ * When decoding of preppare is finished we want should exclude our xid
+ * from list of committed xids to have correct snapshot between prepare
+ * and commit.
+ *
+ * However, this is not sctrictly needed. Prepared transaction holds locks
+ * between prepare and commit so nodody can produce new version of our
+ * catalog tuples. In case of abort we will have this xid in array of
+ * commited xids, but it also will not cause a problem since checks of
+ * HeapTupleHeaderXminInvalid() in HeapTupleSatisfiesHistoricMVCC()
+ * have higher priority then checks for xip array. Anyway let's be consistent
+ * about definitions and delete this xid from xip array.
+ */
+void
+SnapBuildPrepareTxnFinish(SnapBuild *builder, TransactionId xid)
+{
+	TransactionId *search = bsearch(&xid, builder->committed.xip,
+				builder->committed.xcnt, sizeof(TransactionId), xidComparator);
+
+	if (search == NULL)
+		return;
+
+	/* delete that xid */
+	memmove(search, search + 1,
+			((builder->committed.xip + builder->committed.xcnt - 1) - search) * sizeof(TransactionId));
+	builder->committed.xcnt--;
+
+	/* update min/max */
+	builder->xmin = builder->committed.xip[0];
+	builder->xmax = builder->committed.xip[builder->committed.xcnt - 1];
+}
 
 /* -----------------------------------
  * Snapshot building functions dealing with xlog records
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fa5d9bb120..f1e91efeec 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -487,6 +487,121 @@ apply_handle_commit(StringInfo s)
 	pgstat_report_activity(STATE_IDLE, NULL);
 }
 
+static void
+apply_handle_prepare_txn(LogicalRepCommitData *commit_data)
+{
+	Assert(commit_data->commit_lsn == remote_final_lsn);
+	/* The synchronization worker runs in single transaction. */
+	if (IsTransactionState() && !am_tablesync_worker())
+	{
+		/* End the earlier transaction and start a new one */
+		BeginTransactionBlock();
+		CommitTransactionCommand();
+		StartTransactionCommand();
+		/*
+		 * Update origin state so we can restart streaming from correct
+		 * position in case of crash.
+		 */
+		replorigin_session_origin_lsn = commit_data->end_lsn;
+		replorigin_session_origin_timestamp = commit_data->committime;
+
+		PrepareTransactionBlock(commit_data->gid);
+		CommitTransactionCommand();
+		pgstat_report_stat(false);
+
+		store_flush_position(commit_data->end_lsn);
+	}
+	else
+	{
+		/* Process any invalidation messages that might have accumulated. */
+		AcceptInvalidationMessages();
+		/* TODO: what to do here for prepared transactions?? */
+		Assert(false);
+	}
+
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(commit_data->end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+static void
+apply_handle_commit_prepared_txn(LogicalRepCommitData *commit_data)
+{
+	/* there is no transaction when COMMIT PREPARED is called */
+	ensure_transaction();
+
+	/*
+	 * Update origin state so we can restart streaming from correct
+	 * position in case of crash.
+	 */
+	replorigin_session_origin_lsn = commit_data->end_lsn;
+	replorigin_session_origin_timestamp = commit_data->committime;
+
+	FinishPreparedTransaction(commit_data->gid, true);
+	CommitTransactionCommand();
+	pgstat_report_stat(false);
+
+	store_flush_position(commit_data->end_lsn);
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(commit_data->end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+static void
+apply_handle_rollback_prepared_txn(LogicalRepCommitData *commit_data)
+{
+	/* there is no transaction when ABORT/ROLLBACK PREPARED is called */
+	ensure_transaction();
+
+	/*
+	 * Update origin state so we can restart streaming from correct
+	 * position in case of crash.
+	 */
+	replorigin_session_origin_lsn = commit_data->end_lsn;
+	replorigin_session_origin_timestamp = commit_data->committime;
+
+	FinishPreparedTransaction(commit_data->gid, false);
+	CommitTransactionCommand();
+	pgstat_report_stat(false);
+
+	store_flush_position(commit_data->end_lsn);
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(commit_data->end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle PREPARE message.
+ */
+static void
+apply_handle_prepare(StringInfo s)
+{
+	LogicalRepCommitData commit_data;
+	uint8	flags = 0;
+
+	logicalrep_read_prepare(s, &commit_data, &flags);
+
+	if (flags & LOGICALREP_IS_PREPARE)
+		apply_handle_prepare_txn(&commit_data);
+	else if (flags & LOGICALREP_IS_COMMIT_PREPARED)
+		apply_handle_commit_prepared_txn(&commit_data);
+	else if (flags & LOGICALREP_IS_ROLLBACK_PREPARED)
+		apply_handle_rollback_prepared_txn(&commit_data);
+	else
+		ereport(ERROR,
+			(errcode(ERRCODE_PROTOCOL_VIOLATION),
+			 errmsg("wrong [commit|rollback] prepare message")));
+}
+
 /*
  * Handle ORIGIN message.
  *
@@ -888,6 +1003,10 @@ apply_dispatch(StringInfo s)
 		case 'C':
 			apply_handle_commit(s);
 			break;
+			/* [COMMIT|ROLLBACK] PREPARE */
+		case 'P':
+			apply_handle_prepare(s);
+			break;
 			/* INSERT */
 		case 'I':
 			apply_handle_insert(s);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index c3126545b4..d55aa5b5a2 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -42,6 +42,14 @@ static void pgoutput_change(LogicalDecodingContext *ctx,
 				ReorderBufferChange *change);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 					   RepOriginId origin_id);
+static bool pgoutput_filter_prepare(LogicalDecodingContext *ctx,
+				ReorderBufferTXN *txn, char *gid);
+static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
+				ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
+				ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx,
+				ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
 
 static bool publications_valid;
 
@@ -79,6 +87,12 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->begin_cb = pgoutput_begin_txn;
 	cb->change_cb = pgoutput_change;
 	cb->commit_cb = pgoutput_commit_txn;
+
+	cb->filter_prepare_cb = pgoutput_filter_prepare;
+	cb->prepare_cb = pgoutput_prepare_txn;
+	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
+	cb->abort_prepared_cb = pgoutput_abort_prepared_txn;
+
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
 }
@@ -254,6 +268,47 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 }
 
 /*
+ * PREPARE callback
+ */
+static void
+pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr prepare_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * COMMIT PREPARED callback
+ */
+static void
+pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr prepare_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+/*
+ * PREPARE callback
+ */
+static void
+pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr prepare_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
  * Sends the decoded DML over wire.
  */
 static void
@@ -364,6 +419,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 }
 
 /*
+ * Filter out unnecessary two-phase transactions.
+ *
+ * Currently, we forward all two-phase transactions
+ */
+static bool
+pgoutput_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+								char *gid)
+{
+	return false;
+}
+
+/*
  * Currently we always forward.
  */
 static bool
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 54dec4eeaf..11ff0511fd 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -15,6 +15,7 @@
 #define TWOPHASE_H
 
 #include "access/xlogdefs.h"
+#include "access/xact.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
 
@@ -57,4 +58,6 @@ extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
 			   XLogRecPtr end_lsn);
 extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
 extern void restoreTwoPhaseData(void);
+extern void ParsePrepareRecord(uint8 info, char *xlrec,
+							   xl_xact_parsed_prepare *parsed);
 #endif							/* TWOPHASE_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 118b0a8432..1f093fb7b4 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -21,6 +21,10 @@
 #include "storage/sinval.h"
 #include "utils/datetime.h"
 
+/*
+ * Maximum size of Global Transaction ID (including '\0').
+ */
+#define GIDSIZE 200
 
 /*
  * Xact isolation levels
@@ -156,6 +160,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_TWOPHASE			(1U << 4)
 #define XACT_XINFO_HAS_ORIGIN			(1U << 5)
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
+#define XACT_XINFO_HAS_GID				(1U << 7)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -302,13 +307,40 @@ typedef struct xl_xact_parsed_commit
 	SharedInvalidationMessage *msgs;
 
 	TransactionId twophase_xid; /* only for 2PC */
+	char 		twophase_gid[GIDSIZE];
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
 } xl_xact_parsed_commit;
 
+typedef struct xl_xact_parsed_prepare
+{
+	Oid			dbId;			/* MyDatabaseId */
+
+	int			nsubxacts;
+	TransactionId *subxacts;
+
+	int			ncommitrels;
+	RelFileNode *commitrels;
+
+	int			nabortrels;
+	RelFileNode *abortrels;
+
+	int			nmsgs;
+	SharedInvalidationMessage *msgs;
+
+	TransactionId twophase_xid;
+	char 		twophase_gid[GIDSIZE];
+
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
+} xl_xact_parsed_prepare;
+
 typedef struct xl_xact_parsed_abort
 {
+	Oid			dbId;
+	Oid			tsId;
+
 	TimestampTz xact_time;
 	uint32		xinfo;
 
@@ -319,6 +351,10 @@ typedef struct xl_xact_parsed_abort
 	RelFileNode *xnodes;
 
 	TransactionId twophase_xid; /* only for 2PC */
+	char 		twophase_gid[GIDSIZE];
+
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
 } xl_xact_parsed_abort;
 
 
@@ -386,12 +422,14 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval, bool forceSync,
 					int xactflags,
-					TransactionId twophase_xid);
+					TransactionId twophase_xid, const char *twophase_gid);
 
 extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
-				   int xactflags, TransactionId twophase_xid);
+				   int xactflags, TransactionId twophase_xid,
+				   const char *twophase_gid);
+
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 7f0e0fa881..4a1ca4a2b9 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -82,6 +82,11 @@ typedef struct LogicalDecodingContext
 	bool		prepared_write;
 	XLogRecPtr	write_location;
 	TransactionId write_xid;
+
+	/*
+	 * Capabilities of decoding plugin used.
+	 */
+	bool		twophase_hadling;
 } LogicalDecodingContext;
 
 
@@ -111,5 +116,4 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
-
 #endif
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a9736e1bf6..99f0c50de8 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -69,11 +69,18 @@ typedef struct LogicalRepBeginData
 	TransactionId xid;
 } LogicalRepBeginData;
 
+#define LOGICALREP_IS_COMMIT			0x01
+#define LOGICALREP_IS_PREPARE			0x02
+#define LOGICALREP_IS_COMMIT_PREPARED	0x04
+#define LOGICALREP_IS_ROLLBACK_PREPARED	0x08
+#define LOGICALREP_PREPARE_MASK	(LOGICALREP_IS_PREPARE | LOGICALREP_IS_COMMIT_PREPARED | LOGICALREP_IS_ROLLBACK_PREPARED)
 typedef struct LogicalRepCommitData
 {
+	uint8		flag;
 	XLogRecPtr	commit_lsn;
 	XLogRecPtr	end_lsn;
 	TimestampTz committime;
+	char		gid[GIDSIZE];
 } LogicalRepCommitData;
 
 extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
@@ -81,8 +88,12 @@ extern void logicalrep_read_begin(StringInfo in,
 					  LogicalRepBeginData *begin_data);
 extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
 						XLogRecPtr commit_lsn);
+extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+						XLogRecPtr prepare_lsn);
 extern void logicalrep_read_commit(StringInfo in,
 					   LogicalRepCommitData *commit_data);
+extern void logicalrep_read_prepare(StringInfo in,
+					   LogicalRepCommitData *commit_data, uint8 *flags);
 extern void logicalrep_write_origin(StringInfo out, const char *origin,
 						XLogRecPtr origin_lsn);
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 26ff024882..11a7af7da8 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -68,6 +68,38 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 									   XLogRecPtr commit_lsn);
 
 /*
+ * Called before decoding of PREPARE record to decide whether this
+ * transaction should be decoded with separate calls to prepare
+ * and commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED
+ * and sent as usual transaction.
+ */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+													  ReorderBufferTXN *txn,
+													  char *gid);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr abort_lsn);
+
+/*
  * Called for the generic logical decoding messages.
  */
 typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
@@ -98,6 +130,10 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeBeginCB begin_cb;
 	LogicalDecodeChangeCB change_cb;
 	LogicalDecodeCommitCB commit_cb;
+	LogicalDecodeFilterPrepareCB filter_prepare_cb;
+	LogicalDecodePrepareCB prepare_cb;
+	LogicalDecodeCommitPreparedCB commit_prepared_cb;
+	LogicalDecodeAbortPreparedCB abort_prepared_cb;
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 86effe106b..ee18fa346b 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/twophase.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -137,13 +138,28 @@ typedef struct ReorderBufferChange
 	dlist_node	node;
 } ReorderBufferChange;
 
+
+/* TODO: convert existing bools into flags later */
+/* values for txn_flags */
+#define TXN_HAS_CATALOG_CHANGES	0x0001
+#define TXN_IS_SUBXACT			0x0002
+#define TXN_PREPARE				0x0004
+#define TXN_COMMIT_PREPARED		0x0008
+#define TXN_ROLLBACK_PREPARED	0x0010
+#define txn_prepared(txn)		(txn->txn_flags & TXN_PREPARE)
+
 typedef struct ReorderBufferTXN
 {
+	int		txn_flags;
+
 	/*
 	 * The transactions transaction id, can be a toplevel or sub xid.
 	 */
 	TransactionId xid;
 
+	/* In case of 2PC we need to pass GID to output plugin */
+	char		gid[GIDSIZE];
+
 	/* did the TX have catalog changes */
 	bool		has_catalog_changes;
 
@@ -292,6 +308,29 @@ typedef void (*ReorderBufferCommitCB) (
 									   ReorderBufferTXN *txn,
 									   XLogRecPtr commit_lsn);
 
+typedef bool (*ReorderBufferFilterPrepareCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   char *gid);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr commit_lsn);
+
+/* abort prepared callback signature */
+typedef void (*ReorderBufferAbortPreparedCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr abort_lsn);
+
 /* message callback signature */
 typedef void (*ReorderBufferMessageCB) (
 										ReorderBuffer *rb,
@@ -327,6 +366,10 @@ struct ReorderBuffer
 	ReorderBufferBeginCB begin;
 	ReorderBufferApplyChangeCB apply_change;
 	ReorderBufferCommitCB commit;
+	ReorderBufferFilterPrepareCB filter_prepare;
+	ReorderBufferPrepareCB prepare;
+	ReorderBufferCommitPreparedCB commit_prepared;
+	ReorderBufferAbortPreparedCB abort_prepared;
 	ReorderBufferMessageCB message;
 
 	/*
@@ -382,6 +425,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot
 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 					TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid, bool is_commit);
 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
 void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
 						 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
@@ -405,6 +453,13 @@ void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
 
+bool		ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, char *gid);
+bool		ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid);
+void		ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid);
 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
 
 void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 7653717f83..7fcd479d8a 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -86,5 +86,9 @@ extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
 							 struct xl_running_xacts *running);
 extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
+extern void SnapBuildPrepareTxnStart(SnapBuild *builder, XLogRecPtr lsn,
+									 TransactionId xid, int nsubxacts,
+									 TransactionId *subxacts);
+extern void SnapBuildPrepareTxnFinish(SnapBuild *builder, TransactionId xid);
 
 #endif							/* SNAPBUILD_H */
