diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out
index 46e915d4ff..f9676b2e01 100644
--- a/contrib/test_decoding/expected/prepared.out
+++ b/contrib/test_decoding/expected/prepared.out
@@ -6,19 +6,123 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
  init
 (1 row)
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc_nofilter', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+-- Reused queries
+\set get_no2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');'
+\set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');'
+\set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc_nofilter'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');'
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+:get_no2pc
+ data 
+------
+(0 rows)
+
+:get_with2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ PREPARE TRANSACTION 'test_prepared#1'
+(3 rows)
+
+:get_with2pc_nofilter
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ PREPARE TRANSACTION 'test_prepared#1'
+(3 rows)
+
 COMMIT PREPARED 'test_prepared#1';
+:get_no2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ COMMIT
+(3 rows)
+
+:get_with2pc
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#1'
+(1 row)
+
+:get_with2pc_nofilter
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#1'
+(1 row)
+
 INSERT INTO test_prepared1 VALUES (2);
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+:get_no2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+(3 rows)
+
+:get_with2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(6 rows)
+
+:get_with2pc_nofilter
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(6 rows)
+
 ROLLBACK PREPARED 'test_prepared#2';
+:get_no2pc
+ data 
+------
+(0 rows)
+
+:get_with2pc
+               data               
+----------------------------------
+ ABORT PREPARED 'test_prepared#2'
+(1 row)
+
+:get_with2pc_nofilter
+               data               
+----------------------------------
+ ABORT PREPARED 'test_prepared#2'
+(1 row)
+
 INSERT INTO test_prepared1 VALUES (4);
 -- test prepared xact containing ddl
 BEGIN;
@@ -26,45 +130,226 @@ INSERT INTO test_prepared1 VALUES (5);
 ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
-INSERT INTO test_prepared2 VALUES (7);
-COMMIT PREPARED 'test_prepared#3';
--- make sure stuff still works
-INSERT INTO test_prepared1 VALUES (8);
-INSERT INTO test_prepared2 VALUES (9);
--- cleanup
-DROP TABLE test_prepared1;
-DROP TABLE test_prepared2;
--- show results
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+    relation     | locktype |        mode         
+-----------------+----------+---------------------
+ test_prepared_1 | relation | RowExclusiveLock
+ test_prepared_1 | relation | AccessExclusiveLock
+(2 rows)
+
+:get_no2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4
+ COMMIT
+(3 rows)
+
+:get_with2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4
+ COMMIT
+(3 rows)
+
+:get_with2pc_nofilter
                                   data                                   
 -------------------------------------------------------------------------
  BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
+ table public.test_prepared1: INSERT: id[integer]:4
  COMMIT
  BEGIN
- table public.test_prepared1: INSERT: id[integer]:2
+ table public.test_prepared1: INSERT: id[integer]:5
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3'
+(7 rows)
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists. Our 2pc filter callback will skip decoding of xacts
+-- with catalog changes at PREPARE time, so we don't decode it now.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
+INSERT INTO test_prepared2 VALUES (7);
+:get_no2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
  COMMIT
+(3 rows)
+
+:get_with2pc
+                        data                        
+----------------------------------------------------
  BEGIN
- table public.test_prepared1: INSERT: id[integer]:4
+ table public.test_prepared2: INSERT: id[integer]:7
  COMMIT
+(3 rows)
+
+:get_with2pc_nofilter
+                        data                        
+----------------------------------------------------
  BEGIN
  table public.test_prepared2: INSERT: id[integer]:7
  COMMIT
+(3 rows)
+
+COMMIT PREPARED 'test_prepared#3';
+:get_no2pc
+                                  data                                   
+-------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:5
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ COMMIT
+(4 rows)
+
+:get_with2pc
+                                  data                                   
+-------------------------------------------------------------------------
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:5
  table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
  COMMIT
+(4 rows)
+
+:get_with2pc_nofilter
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#3'
+(1 row)
+
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (8);
+INSERT INTO test_prepared2 VALUES (9);
+:get_no2pc
+                                data                                
+--------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
+ COMMIT
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:9
+ COMMIT
+(6 rows)
+
+:get_with2pc
+                                data                                
+--------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
+ COMMIT
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:9
+ COMMIT
+(6 rows)
+
+:get_with2pc_nofilter
+                                data                                
+--------------------------------------------------------------------
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
  COMMIT
  BEGIN
  table public.test_prepared2: INSERT: id[integer]:9
  COMMIT
-(22 rows)
+(6 rows)
+
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+BEGIN;
+insert into test_prepared2 values (12);
+PREPARE TRANSACTION 'test_prepared_lock2';
+COMMIT PREPARED 'test_prepared_lock2';
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+ relation | locktype | mode 
+----------+----------+------
+(0 rows)
+
+-- Shouldn't see anything with 2pc decoding off
+:get_no2pc
+                        data                         
+-----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:12
+ COMMIT
+(3 rows)
+
+-- Shouldn't timeout on 2pc decoding.
+SET statement_timeout = '1s';
+:get_with2pc
+                        data                         
+-----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:12
+ PREPARE TRANSACTION 'test_prepared_lock2'
+ COMMIT PREPARED 'test_prepared_lock2'
+(4 rows)
+
+:get_with2pc_nofilter
+                                    data                                    
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ PREPARE TRANSACTION 'test_prepared_lock'
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:12
+ PREPARE TRANSACTION 'test_prepared_lock2'
+ COMMIT PREPARED 'test_prepared_lock2'
+(8 rows)
+
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- Both will work normally after we commit
+:get_no2pc
+                                    data                                    
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ COMMIT
+(4 rows)
+
+:get_with2pc
+                                    data                                    
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ COMMIT
+(4 rows)
+
+:get_with2pc_nofilter
+                 data                 
+--------------------------------------
+ COMMIT PREPARED 'test_prepared_lock'
+(1 row)
+
+-- cleanup
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
 
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
@@ -72,3 +357,9 @@ SELECT pg_drop_replication_slot('regression_slot');
  
 (1 row)
 
+SELECT pg_drop_replication_slot('regression_slot_2pc');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql
index e72639767e..132b30e97b 100644
--- a/contrib/test_decoding/sql/prepared.sql
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -1,22 +1,41 @@
 -- predictability
 SET synchronous_commit = on;
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc_nofilter', 'test_decoding');
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+
+-- Reused queries
+\set get_no2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');'
+\set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');'
+\set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc_nofilter'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');'
 
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
 COMMIT PREPARED 'test_prepared#1';
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
 INSERT INTO test_prepared1 VALUES (2);
 
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
 ROLLBACK PREPARED 'test_prepared#2';
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
 
 INSERT INTO test_prepared1 VALUES (4);
 
@@ -27,18 +46,74 @@ ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
 
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
 
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists. Our 2pc filter callback will skip decoding of xacts
+-- with catalog changes at PREPARE time, so we don't decode it now.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
 INSERT INTO test_prepared2 VALUES (7);
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
 
 COMMIT PREPARED 'test_prepared#3';
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (8);
 INSERT INTO test_prepared2 VALUES (9);
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
+
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+BEGIN;
+insert into test_prepared2 values (12);
+PREPARE TRANSACTION 'test_prepared_lock2';
+COMMIT PREPARED 'test_prepared_lock2';
+
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+
+-- Shouldn't see anything with 2pc decoding off
+:get_no2pc
+
+-- Shouldn't timeout on 2pc decoding.
+SET statement_timeout = '1s';
+:get_with2pc
+:get_with2pc_nofilter
+RESET statement_timeout;
+
+COMMIT PREPARED 'test_prepared_lock';
+
+-- Both will work normally after we commit
+:get_no2pc
+:get_with2pc
+:get_with2pc_nofilter
 
 -- cleanup
 DROP TABLE test_prepared1;
@@ -48,3 +123,4 @@ DROP TABLE test_prepared2;
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_drop_replication_slot('regression_slot_2pc');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 135b3b7638..362683feef 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -24,6 +24,8 @@
 #include "replication/message.h"
 #include "replication/origin.h"
 
+#include "storage/procarray.h"
+
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -46,6 +48,8 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	bool		twophase_decoding;
+	bool		twophase_decode_with_catalog_changes;
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -68,6 +72,18 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 				  bool transactional, const char *prefix,
 				  Size sz, const char *message);
+static bool pg_filter_prepare(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  char *gid);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn);
+static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn);
 
 void
 _PG_init(void)
@@ -88,6 +104,10 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+	cb->filter_prepare_cb = pg_filter_prepare;
+	cb->prepare_cb = pg_decode_prepare_txn;
+	cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+	cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
 }
 
 
@@ -107,6 +127,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->twophase_decoding = false;
+	data->twophase_decode_with_catalog_changes = false;
 
 	ctx->output_plugin_private = data;
 
@@ -176,6 +198,27 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "twophase-decoding") == 0)
+		{
+
+			if (elem->arg == NULL)
+				data->twophase_decoding = true;
+			else if (!parse_bool(strVal(elem->arg), &data->twophase_decoding))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
+		else if (strcmp(elem->defname, "twophase-decode-with-catalog-changes") == 0)
+		{
+			if (elem->arg == NULL)
+				data->twophase_decode_with_catalog_changes = true;
+			else if (!parse_bool(strVal(elem->arg), &data->twophase_decode_with_catalog_changes))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -244,6 +287,164 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+/* Filter out unnecessary two-phase transactions */
+static bool
+pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					char *gid)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* treat all transaction as one-phase */
+	if (!data->twophase_decoding)
+		return true;
+
+	/*
+	 * Two-phase transactions that accessed catalog require special
+	 * treatment.
+	 *
+	 * Right now we don't have a safe way to decode catalog changes made in
+	 * prepared transaction that was already aborted by the time of
+	 * decoding.
+	 *
+	 * That kind of problem arises only when we are trying to
+	 * retrospectively decode aborted transactions with catalog changes -
+	 * including if a transaction aborts while we're decoding it. If one
+	 * wants to code distributed commit based on prepare decoding then
+	 * commits/aborts will happend strictly after decoding will be
+	 * completed, so it is possible to skip any checks/locks here.
+	 *
+	 * We'll also get stuck trying to acquire locks on catalog relations
+	 * we need for decoding if the prepared xact holds a strong lock on
+	 * one of them and we also need to decode row changes.
+	 */
+	if (!txn->has_catalog_changes)
+		return false;
+
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+	if (TransactionIdIsInProgress(txn->xid))
+	{
+		/*
+		 * For the sake of simplicity, by default we just
+		 * ignore in-progess prepared transactions with catalog
+		 * changes in this extension. If they abort during
+		 * decoding then tuples we need to decode them may be
+		 * overwritten while we're still decoding, causing
+		 * wrong catalog lookups.
+		 *
+		 * It is possible to move that LWLockRelease() to
+		 * pg_decode_prepare_txn() and allow decoding of
+		 * running prepared tx, but such lock will prevent any
+		 * 2pc transaction commit during decoding time.  That
+		 * can be a long time in case of lots of
+		 * changes/inserts in that tx or if the downstream is
+		 * slow/unresonsive.
+		 *
+		 * (Continuing to decode without the lock is unsafe, XXX)
+		 */
+		LWLockRelease(TwoPhaseStateLock);
+		return !data->twophase_decode_with_catalog_changes;
+	}
+	else if (TransactionIdDidAbort(txn->xid))
+	{
+		/*
+		 * Here we know that it is already aborted and there is
+		 * not much sense in doing something with this
+		 * transaction.  Consequently ABORT PREPARED will be
+		 * suppressed.
+		 */
+		LWLockRelease(TwoPhaseStateLock);
+		return true;
+	}
+	LWLockRelease(TwoPhaseStateLock);
+	/*
+	 * XXX: Transaction is not in progress, so buf->origptr lags behind
+	 * ctx->snapshot_builder.start_decoding_at.
+	 * If we did decode it (ie didn't filtered it above), we should not filter
+	 * it out here either, so it will be passed to ReorderBufferForget in
+	 * DecodePrepare.
+	 * If we had to skip it previously, we have to skip it now either, so
+	 * whole transaction will be decoded as simple non-2phase transaction.
+	 */
+	return !data->twophase_decode_with_catalog_changes;
+}
+
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr prepare_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+		quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr commit_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes && !data->twophase_decoding)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+		quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* ABORT PREPARED callback */
+static void
+pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr abort_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes && !data->twophase_decoding)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "ABORT PREPARED %s",
+		quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
 static bool
 pg_decode_filter(LogicalDecodingContext *ctx,
 				 RepOriginId origin_id)
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 3aafa79e52..1a4487d404 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -102,6 +102,14 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 		parsed->twophase_xid = xl_twophase->xid;
 
 		data += sizeof(xl_xact_twophase);
+
+		if (parsed->xinfo & XACT_XINFO_HAS_GID)
+		{
+			int gidlen;
+			strcpy(parsed->twophase_gid, data);
+			gidlen = strlen(parsed->twophase_gid) + 1;
+			data += MAXALIGN(gidlen);
+		}
 	}
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -139,6 +147,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 		data += sizeof(xl_xact_xinfo);
 	}
 
+	if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+	{
+		xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+		parsed->dbId = xl_dbinfo->dbId;
+		parsed->tsId = xl_dbinfo->tsId;
+
+		data += sizeof(xl_xact_dbinfo);
+	}
+
 	if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
 		xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
@@ -168,6 +186,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 		parsed->twophase_xid = xl_twophase->xid;
 
 		data += sizeof(xl_xact_twophase);
+
+		if (parsed->xinfo & XACT_XINFO_HAS_GID)
+		{
+			int gidlen;
+			strcpy(parsed->twophase_gid, data);
+			gidlen = strlen(parsed->twophase_gid) + 1;
+			data += MAXALIGN(gidlen);
+		}
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		xl_xact_origin xl_origin;
+
+		/* we're only guaranteed 4 byte alignment, so copy onto stack */
+		memcpy(&xl_origin, data, sizeof(xl_origin));
+
+		parsed->origin_lsn = xl_origin.origin_lsn;
+		parsed->origin_timestamp = xl_origin.origin_timestamp;
+
+		data += sizeof(xl_xact_origin);
 	}
 }
 
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index b715152e8d..792408c94d 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -144,11 +144,7 @@ int			max_prepared_xacts = 0;
  *
  * typedef struct GlobalTransactionData *GlobalTransaction appears in
  * twophase.h
- *
- * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
- * specified in TwoPhaseFileHeader.
  */
-#define GIDSIZE 200
 
 typedef struct GlobalTransactionData
 {
@@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
 								RelFileNode *rels,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
-								bool initfileinval);
+								bool initfileinval,
+								const char *gid);
 static void RecordTransactionAbortPrepared(TransactionId xid,
 							   int nchildren,
 							   TransactionId *children,
 							   int nrels,
-							   RelFileNode *rels);
+							   RelFileNode *rels,
+							   const char *gid);
 static void ProcessRecords(char *bufptr, TransactionId xid,
 			   const TwoPhaseCallback callbacks[]);
 static void RemoveGXact(GlobalTransaction gxact);
@@ -556,7 +554,7 @@ MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
  *		Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
  */
 static GlobalTransaction
-LockGXact(const char *gid, Oid user)
+LockGXact(const char *gid, Oid user, bool missing_ok)
 {
 	int			i;
 
@@ -616,7 +614,8 @@ LockGXact(const char *gid, Oid user)
 
 	LWLockRelease(TwoPhaseStateLock);
 
-	ereport(ERROR,
+	if (!missing_ok)
+		ereport(ERROR,
 			(errcode(ERRCODE_UNDEFINED_OBJECT),
 			 errmsg("prepared transaction with identifier \"%s\" does not exist",
 					gid)));
@@ -898,7 +897,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
 /*
  * Header for a 2PC state file
  */
-#define TWOPHASE_MAGIC	0x57F94533	/* format identifier */
+#define TWOPHASE_MAGIC	0x57F94534	/* format identifier */
 
 typedef struct TwoPhaseFileHeader
 {
@@ -914,6 +913,8 @@ typedef struct TwoPhaseFileHeader
 	int32		ninvalmsgs;		/* number of cache invalidation messages */
 	bool		initfileinval;	/* does relcache init file need invalidation? */
 	uint16		gidlen;			/* length of the GID - GID follows the header */
+	XLogRecPtr	origin_lsn;		/* lsn of this record at origin node */
+	TimestampTz origin_timestamp; /* time of prepare at origin node */
 } TwoPhaseFileHeader;
 
 /*
@@ -1065,6 +1066,7 @@ EndPrepare(GlobalTransaction gxact)
 {
 	TwoPhaseFileHeader *hdr;
 	StateFileChunk *record;
+	bool			replorigin;
 
 	/* Add the end sentinel to the list of 2PC records */
 	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1075,6 +1077,21 @@ EndPrepare(GlobalTransaction gxact)
 	Assert(hdr->magic == TWOPHASE_MAGIC);
 	hdr->total_len = records.total_len + sizeof(pg_crc32c);
 
+	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+				  replorigin_session_origin != DoNotReplicateId);
+
+	if (replorigin)
+	{
+		Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
+		hdr->origin_lsn = replorigin_session_origin_lsn;
+		hdr->origin_timestamp = replorigin_session_origin_timestamp;
+	}
+	else
+	{
+		hdr->origin_lsn = InvalidXLogRecPtr;
+		hdr->origin_timestamp = 0;
+	}
+
 	/*
 	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
 	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1107,7 +1124,16 @@ EndPrepare(GlobalTransaction gxact)
 	XLogBeginInsert();
 	for (record = records.head; record != NULL; record = record->next)
 		XLogRegisterData(record->data, record->len);
+
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
 	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+
+	if (replorigin)
+		/* Move LSNs forward for this replication origin */
+		replorigin_session_advance(replorigin_session_origin_lsn,
+								   gxact->prepare_end_lsn);
+
 	XLogFlush(gxact->prepare_end_lsn);
 
 	/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1283,6 +1309,43 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 	return buf;
 }
 
+/*
+ * ParsePrepareRecord
+ */
+void
+ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
+{
+	TwoPhaseFileHeader *hdr;
+	char *bufptr;
+
+	hdr = (TwoPhaseFileHeader *) xlrec;
+	bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
+
+	parsed->origin_lsn = hdr->origin_lsn;
+	parsed->origin_timestamp = hdr->origin_timestamp;
+	parsed->twophase_xid = hdr->xid;
+	parsed->dbId = hdr->database;
+	parsed->nsubxacts = hdr->nsubxacts;
+	parsed->ncommitrels = hdr->ncommitrels;
+	parsed->nabortrels = hdr->nabortrels;
+	parsed->nmsgs = hdr->ninvalmsgs;
+
+	strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
+	bufptr += MAXALIGN(hdr->gidlen);
+
+	parsed->subxacts = (TransactionId *) bufptr;
+	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+
+	parsed->commitrels = (RelFileNode *) bufptr;
+	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+
+	parsed->abortrels = (RelFileNode *) bufptr;
+	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+	parsed->msgs = (SharedInvalidationMessage *) bufptr;
+	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
 
 /*
  * Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1365,7 +1428,7 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
  * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
  */
 void
-FinishPreparedTransaction(const char *gid, bool isCommit)
+FinishPreparedTransaction(const char *gid, bool isCommit, bool missing_ok)
 {
 	GlobalTransaction gxact;
 	PGPROC	   *proc;
@@ -1386,8 +1449,20 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 	/*
 	 * Validate the GID, and lock the GXACT to ensure that two backends do not
 	 * try to commit the same GID at once.
+	 *
+	 * During logical decoding, on the apply side, it's possible that a prepared
+	 * transaction got aborted while decoding. In that case, we stop the
+	 * decoding and abort the transaction immediately. However the ROLLBACK
+	 * prepared processing still reaches the subscriber. In that case it's ok
+	 * to have a missing gid
 	 */
-	gxact = LockGXact(gid, GetUserId());
+	gxact = LockGXact(gid, GetUserId(), missing_ok);
+	if (gxact == NULL)
+	{
+		Assert(isCommit && missing_ok);
+		return;
+	}
+
 	proc = &ProcGlobal->allProcs[gxact->pgprocno];
 	pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
 	xid = pgxact->xid;
@@ -1435,11 +1510,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 										hdr->nsubxacts, children,
 										hdr->ncommitrels, commitrels,
 										hdr->ninvalmsgs, invalmsgs,
-										hdr->initfileinval);
+										hdr->initfileinval, gid);
 	else
 		RecordTransactionAbortPrepared(xid,
 									   hdr->nsubxacts, children,
-									   hdr->nabortrels, abortrels);
+									   hdr->nabortrels, abortrels,
+									   gid);
 
 	ProcArrayRemove(proc, latestXid);
 
@@ -2165,7 +2241,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								RelFileNode *rels,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
-								bool initfileinval)
+								bool initfileinval,
+								const char *gid)
 {
 	XLogRecPtr	recptr;
 	TimestampTz committs = GetCurrentTimestamp();
@@ -2193,7 +2270,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								 ninvalmsgs, invalmsgs,
 								 initfileinval, false,
 								 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
-								 xid);
+								 xid, gid);
 
 
 	if (replorigin)
@@ -2255,7 +2332,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
 							   int nchildren,
 							   TransactionId *children,
 							   int nrels,
-							   RelFileNode *rels)
+							   RelFileNode *rels,
+							   const char *gid)
 {
 	XLogRecPtr	recptr;
 
@@ -2278,7 +2356,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 								nchildren, children,
 								nrels, rels,
 								MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
-								xid);
+								xid, gid);
 
 	/* Always flush, since we're about to remove the 2PC state file */
 	XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c06fabca10..d751267b51 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1226,7 +1226,7 @@ RecordTransactionCommit(void)
 							nmsgs, invalMessages,
 							RelcacheInitFileInval, forceSyncCommit,
 							MyXactFlags,
-							InvalidTransactionId /* plain commit */ );
+							InvalidTransactionId, NULL /* plain commit */ );
 
 		if (replorigin)
 			/* Move LSNs forward for this replication origin */
@@ -1578,7 +1578,8 @@ RecordTransactionAbort(bool isSubXact)
 	XactLogAbortRecord(xact_time,
 					   nchildren, children,
 					   nrels, rels,
-					   MyXactFlags, InvalidTransactionId);
+					   MyXactFlags, InvalidTransactionId,
+					   NULL);
 
 	/*
 	 * Report the latest async abort LSN, so that the WAL writer knows to
@@ -5243,7 +5244,6 @@ xactGetCommittedChildren(TransactionId **ptr)
  *	XLOG support routines
  */
 
-
 /*
  * Log the commit record for a plain or twophase transaction commit.
  *
@@ -5256,7 +5256,8 @@ XactLogCommitRecord(TimestampTz commit_time,
 					int nrels, RelFileNode *rels,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval, bool forceSync,
-					int xactflags, TransactionId twophase_xid)
+					int xactflags, TransactionId twophase_xid,
+					const char *twophase_gid)
 {
 	xl_xact_commit xlrec;
 	xl_xact_xinfo xl_xinfo;
@@ -5268,6 +5269,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_origin xl_origin;
 
 	uint8		info;
+	int			gidlen = 0;
 
 	Assert(CritSectionCount > 0);
 
@@ -5330,6 +5332,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
 		xl_twophase.xid = twophase_xid;
+		Assert(twophase_gid != NULL);
+
+		if (XLogLogicalInfoActive())
+		{
+			xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+			gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+		}
 	}
 
 	/* dump transaction origin information */
@@ -5380,8 +5389,19 @@ XactLogCommitRecord(TimestampTz commit_time,
 	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+	{
 		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
 
+		if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+		{
+			static const char zeroes[MAXIMUM_ALIGNOF] = { 0 };
+			XLogRegisterData((char*) twophase_gid, gidlen);
+			if (MAXALIGN(gidlen) != gidlen)
+				XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen);
+		}
+	}
+
+
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
 
@@ -5401,15 +5421,19 @@ XLogRecPtr
 XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
-				   int xactflags, TransactionId twophase_xid)
+				   int xactflags, TransactionId twophase_xid,
+				   const char *twophase_gid)
 {
 	xl_xact_abort xlrec;
 	xl_xact_xinfo xl_xinfo;
 	xl_xact_subxacts xl_subxacts;
 	xl_xact_relfilenodes xl_relfilenodes;
 	xl_xact_twophase xl_twophase;
+	xl_xact_dbinfo xl_dbinfo;
+	xl_xact_origin xl_origin;
 
 	uint8		info;
+	int			gidlen = 0;
 
 	Assert(CritSectionCount > 0);
 
@@ -5445,6 +5469,31 @@ XactLogAbortRecord(TimestampTz abort_time,
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
 		xl_twophase.xid = twophase_xid;
+		Assert(twophase_gid != NULL);
+
+		if (XLogLogicalInfoActive())
+		{
+			xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+			gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+		}
+	}
+
+	if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+		xl_dbinfo.dbId = MyDatabaseId;
+		xl_dbinfo.tsId = MyDatabaseTableSpace;
+	}
+
+	/* dump transaction origin information only for abort prepared */
+	if ( (replorigin_session_origin != InvalidRepOriginId) &&
+			TransactionIdIsValid(twophase_xid) &&
+			XLogLogicalInfoActive())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+		xl_origin.origin_lsn = replorigin_session_origin_lsn;
+		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
 	if (xl_xinfo.xinfo != 0)
@@ -5459,6 +5508,9 @@ XactLogAbortRecord(TimestampTz abort_time,
 	if (xl_xinfo.xinfo != 0)
 		XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
 		XLogRegisterData((char *) (&xl_subxacts),
@@ -5476,7 +5528,23 @@ XactLogAbortRecord(TimestampTz abort_time,
 	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+	{
 		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+		if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+		{
+			static const char zeroes[MAXIMUM_ALIGNOF] = { 0 };
+			XLogRegisterData((char*) twophase_gid, gidlen);
+			if (MAXALIGN(gidlen) != gidlen)
+				XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen);
+		}
+	}
+
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+	if (TransactionIdIsValid(twophase_xid))
+		XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
 
 	return XLogInsert(RM_XACT_ID, info);
 }
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 486fd0c988..c9231f4973 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,7 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/twophase.h"
 
 #include "catalog/pg_control.h"
 
@@ -72,6 +73,8 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			 xl_xact_parsed_commit *parsed, TransactionId xid);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_prepare *parsed);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -277,16 +280,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				break;
 			}
 		case XLOG_XACT_PREPARE:
+			{
+				xl_xact_parsed_prepare parsed;
 
-			/*
-			 * Currently decoding ignores PREPARE TRANSACTION and will just
-			 * decode the transaction when the COMMIT PREPARED is sent or
-			 * throw away the transaction's contents when a ROLLBACK PREPARED
-			 * is received. In the future we could add code to expose prepared
-			 * transactions in the changestream allowing for a kind of
-			 * distributed 2PC.
-			 */
-			ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
+				/* check that output plugin is capable of twophase decoding */
+				if (!ctx->enable_twophase)
+				{
+					ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
+					break;
+				}
+
+				/* ok, parse it */
+				ParsePrepareRecord(XLogRecGetInfo(buf->record),
+								   XLogRecGetData(buf->record), &parsed);
+
+				/* does output plugin want this particular transaction? */
+				if (ctx->callbacks.filter_prepare_cb &&
+					ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
+												 parsed.twophase_gid))
+				{
+					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
+											buf->origptr);
+					break;
+				}
+
+				DecodePrepare(ctx, buf, &parsed);
+				break;
+			}
 			break;
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
@@ -551,8 +571,14 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * Process invalidation messages, even if we're not interested in the
 	 * transaction's contents, since the various caches need to always be
 	 * consistent.
+	 *
+	 * Also if that transaction was sent to prepare callback then both
+	 * these function have already been called during prepare.
 	 */
-	if (parsed->nmsgs > 0)
+	if (parsed->nmsgs > 0 &&
+			!(TransactionIdIsValid(parsed->twophase_xid) &&
+				ReorderBufferTxnIsPrepared(ctx->reorder, xid,
+										   parsed->twophase_gid)))
 	{
 		ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
 									  parsed->nmsgs, parsed->msgs);
@@ -607,9 +633,69 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 								 buf->origptr, buf->endptr);
 	}
 
+	if (TransactionIdIsValid(parsed->twophase_xid) &&
+		ReorderBufferTxnIsPrepared(ctx->reorder, xid, parsed->twophase_gid))
+	{
+		/* we are processing COMMIT PREPARED */
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+					commit_time, origin_id, origin_lsn, parsed->twophase_gid, true);
+	}
+	else
+	{
+		/* replay actions of all transaction + subtransactions in order */
+		ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+	}
+}
+
+/*
+ * Decode PREPARE record. Similar logic as in COMMIT
+ */
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_prepare *parsed)
+{
+	XLogRecPtr	origin_lsn = parsed->origin_lsn;
+	TimestampTz	commit_time = parsed->origin_timestamp;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+	int			i;
+	TransactionId xid = parsed->twophase_xid;
+
+	/*
+	 * Process invalidation messages, even if we're not interested in the
+	 * transaction's contents, since the various caches need to always be
+	 * consistent.
+	 */
+	if (parsed->nmsgs > 0)
+	{
+		ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+									  parsed->nmsgs, parsed->msgs);
+		ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+	}
+
+	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+		FilterByOrigin(ctx, origin_id))
+	{
+		for (i = 0; i < parsed->nsubxacts; i++)
+		{
+			ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
+		}
+		ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+
+		return;
+	}
+
+	/* tell the reorderbuffer about the surviving subtransactions */
+	for (i = 0; i < parsed->nsubxacts; i++)
+	{
+		ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+								 buf->origptr, buf->endptr);
+	}
+
 	/* replay actions of all transaction + subtransactions in order */
-	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
-						commit_time, origin_id, origin_lsn);
+	ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr,
+						 commit_time, origin_id, origin_lsn, parsed->twophase_gid);
 }
 
 /*
@@ -621,6 +707,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid)
 {
 	int			i;
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	TimestampTz	commit_time = 0;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		origin_lsn = parsed->origin_lsn;
+		commit_time = parsed->origin_timestamp;
+	}
+
+	/*
+	 * If it's ROLLBACK PREPARED then handle it via callbacks.
+	 */
+	if (TransactionIdIsValid(xid) &&
+			!SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) &&
+			parsed->dbId == ctx->slot->data.database &&
+			!FilterByOrigin(ctx, origin_id) &&
+			ReorderBufferTxnIsPrepared(ctx->reorder, xid, parsed->twophase_gid))
+	{
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+						commit_time, origin_id, origin_lsn,
+						parsed->twophase_gid, false);
+		return;
+	}
 
 	for (i = 0; i < parsed->nsubxacts; i++)
 	{
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index bca585fc27..ff23002cf3 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -60,6 +60,18 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
 static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
 static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  XLogRecPtr commit_lsn);
+static void abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn);
+static bool filter_decode_txn_cb_wrapper(ReorderBuffer *cache,
+				  ReorderBufferTXN *txn);
+static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  const char *gid);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn);
+static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change);
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -124,6 +136,7 @@ StartupDecodingContext(List *output_plugin_options,
 	MemoryContext context,
 				old_context;
 	LogicalDecodingContext *ctx;
+	int			twophase_callbacks;
 
 	/* shorter lines... */
 	slot = MyReplicationSlot;
@@ -182,8 +195,27 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->begin = begin_cb_wrapper;
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
+	ctx->reorder->abort = abort_cb_wrapper;
+	ctx->reorder->filter_decode_txn = filter_decode_txn_cb_wrapper;
+	ctx->reorder->filter_prepare = filter_prepare_cb_wrapper;
+	ctx->reorder->prepare = prepare_cb_wrapper;
+	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+	ctx->reorder->abort_prepared = abort_prepared_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
 
+	/* check that plugin implements all callbacks necessary to perform 2PC */
+	twophase_callbacks = (ctx->callbacks.prepare_cb != NULL) +
+		(ctx->callbacks.commit_prepared_cb != NULL) +
+		(ctx->callbacks.abort_prepared_cb != NULL);
+
+	ctx->enable_twophase = (twophase_callbacks == 3);
+
+	if (twophase_callbacks != 3 && twophase_callbacks != 0)
+		ereport(WARNING,
+			(errmsg("Output plugin registered only %d twophase callbacks. "
+					"Twophase transactions will be decoded at commit time.",
+					twophase_callbacks)));
+
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
@@ -680,6 +712,122 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 static void
+abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "abort";
+	state.report_location = txn->final_lsn; /* beginning of abort record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.abort_cb(ctx, txn, abort_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+ static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr prepare_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "prepare";
+	state.report_location = txn->final_lsn;		/* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "commit_prepared";
+	state.report_location = txn->final_lsn;		/* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "abort_prepared";
+	state.report_location = txn->final_lsn;		/* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
 change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change)
 {
@@ -714,6 +862,61 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static bool
+filter_decode_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_decode_txn";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_decode_txn_cb(ctx, txn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+	return ret;
+}
+static bool
+filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, const char *gid)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_prepare";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_prepare_cb(ctx, txn, gid);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+	return ret;
+}
+
 bool
 filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 9b126b2957..77b9f58ae2 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -72,10 +72,11 @@ void
 logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
 						XLogRecPtr commit_lsn)
 {
-	uint8		flags = 0;
+	uint8	flags = 0;
 
 	pq_sendbyte(out, 'C');		/* sending COMMIT */
 
+	flags |= LOGICALREP_IS_COMMIT;
 	/* send the flags field (unused for now) */
 	pq_sendbyte(out, flags);
 
@@ -86,21 +87,106 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
 }
 
 /*
- * Read transaction COMMIT from the stream.
+ * Write ABORT to the output stream.
+ */
+void
+logicalrep_write_abort(StringInfo out, ReorderBufferTXN *txn,
+						XLogRecPtr abort_lsn)
+{
+	uint8	flags = 0;
+
+	pq_sendbyte(out, 'C');		/* sending ABORT flag below */
+
+	flags |= LOGICALREP_IS_ABORT;
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* send fields */
+	pq_sendint64(out, abort_lsn);
+	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, txn->commit_time);
+}
+
+/*
+ * Read transaction COMMIT|ABORT from the stream.
  */
 void
-logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
+logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data,
+					   uint8 *flags)
 {
-	/* read flags (unused for now) */
-	uint8		flags = pq_getmsgbyte(in);
+	/* read flags */
+	uint8		commit_flags = pq_getmsgbyte(in);
 
-	if (flags != 0)
-		elog(ERROR, "unrecognized flags %u in commit message", flags);
+	if (!(commit_flags & LOGICALREP_COMMIT_MASK))
+		elog(ERROR, "unrecognized flags %u in commit|abort message",
+			 commit_flags);
 
 	/* read fields */
 	commit_data->commit_lsn = pq_getmsgint64(in);
 	commit_data->end_lsn = pq_getmsgint64(in);
 	commit_data->committime = pq_getmsgint64(in);
+
+	/* set gid to empty */
+	commit_data->gid[0] = '\0';
+
+	*flags = commit_flags;
+}
+
+/*
+ * Write PREPARE to the output stream.
+ */
+void
+logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+						XLogRecPtr prepare_lsn)
+{
+	uint8		flags = 0;
+
+	pq_sendbyte(out, 'P');		/* sending PREPARE protocol */
+
+	if (txn->txn_flags & TXN_COMMIT_PREPARED)
+		flags |= LOGICALREP_IS_COMMIT_PREPARED;
+	else if (txn->txn_flags & TXN_ROLLBACK_PREPARED)
+		flags |= LOGICALREP_IS_ROLLBACK_PREPARED;
+	else if (txn->txn_flags & TXN_PREPARE)
+		flags |= LOGICALREP_IS_PREPARE;
+
+	if (flags == 0)
+		elog(ERROR, "unrecognized flags %u in [commit|rollback] prepare message", flags);
+
+	/* send the flags field */
+	pq_sendbyte(out, flags);
+
+	/* send fields */
+	pq_sendint64(out, prepare_lsn);
+	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, txn->commit_time);
+
+	/* send gid */
+	pq_sendstring(out, txn->gid);
+}
+
+/*
+ * Read transaction PREPARE from the stream.
+ */
+void
+logicalrep_read_prepare(StringInfo in, LogicalRepCommitData *commit_data, uint8 *flags)
+{
+	/* read flags */
+	uint8		prep_flags = pq_getmsgbyte(in);
+
+	if (!(prep_flags & LOGICALREP_PREPARE_MASK))
+		elog(ERROR, "unrecognized flags %u in prepare message", prep_flags);
+
+	/* read fields */
+	commit_data->commit_lsn = pq_getmsgint64(in);
+	commit_data->end_lsn = pq_getmsgint64(in);
+	commit_data->committime = pq_getmsgint64(in);
+
+	/* read gid */
+	strcpy(commit_data->gid, pq_getmsgstring(in));
+
+	/* set flags */
+	*flags = prep_flags;
 }
 
 /*
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index dc0ad5b0e7..91b2a76fa7 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1261,25 +1261,18 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
  * the top and subtransactions (using a k-way merge) and replay the changes in
  * lsn order.
  */
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+static void
+ReorderBufferCommitInternal(ReorderBufferTXN *txn,
+					ReorderBuffer *rb, TransactionId xid,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 					TimestampTz commit_time,
 					RepOriginId origin_id, XLogRecPtr origin_lsn)
 {
-	ReorderBufferTXN *txn;
 	volatile Snapshot snapshot_now;
 	volatile CommandId command_id = FirstCommandId;
 	bool		using_subtxn;
 	ReorderBufferIterTXNState *volatile iterstate = NULL;
 
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-								false);
-
-	/* unknown transaction, nothing to replay */
-	if (txn == NULL)
-		return;
-
 	txn->final_lsn = commit_lsn;
 	txn->end_lsn = end_lsn;
 	txn->commit_time = commit_time;
@@ -1323,6 +1316,17 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	{
 		ReorderBufferChange *change;
 		ReorderBufferChange *specinsert = NULL;
+		bool				 change_cleanup = false;
+		bool				 check_txn_status;
+
+		/*
+		 * check for the xid once to see if it's already
+		 * committed. Otherwise we need to consult the
+		 * decode_txn filter function to enquire if it's
+		 * still ok for us to continue to decode this xid
+		 */
+		check_txn_status = TransactionIdDidCommit(txn->xid)?
+								false : true;
 
 		if (using_subtxn)
 			BeginInternalSubTransaction("replay");
@@ -1337,6 +1341,20 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 			Relation	relation = NULL;
 			Oid			reloid;
 
+			/*
+			 * While decoding 2PC or while streaming uncommitted
+			 * transactions, check if this transaction needs to
+			 * be still decoded. If the transaction got aborted
+			 * or if we were instructed to stop decoding, then
+			 * bail out early.
+			 */
+			if (check_txn_status && rb->filter_decode_txn(rb, txn))
+			{
+				elog(LOG, "stopping decoding of (%u)", txn->xid);
+				change_cleanup = true;
+				goto change_cleanuptxn;
+			}
+
 			switch (change->action)
 			{
 				case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -1543,6 +1561,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 			}
 		}
 
+change_cleanuptxn:
 		/*
 		 * There's a speculative insertion remaining, just clean in up, it
 		 * can't have been successful, otherwise we'd gotten a confirmation
@@ -1558,8 +1577,19 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		if (change_cleanup)
+		{
+			/* call abort */
+			rb->abort(rb, txn, commit_lsn);
+		}
+		else
+		{
+			/* call commit or prepare callback */
+			if (txn_prepared(txn))
+				rb->prepare(rb, txn, commit_lsn);
+			else
+				rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
@@ -1586,7 +1616,14 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		if (snapshot_now->copied)
 			ReorderBufferFreeSnap(rb, snapshot_now);
 
-		/* remove potential on-disk data, and deallocate */
+		/*
+		 * remove potential on-disk data, and deallocate.
+		 *
+		 * We cleanup even for 2PC transactions, this is
+		 * because the commit prepared might be some time
+		 * away. Also that does not need this data to be
+		 * around for processing anyways.
+		 */
 		ReorderBufferCleanupTXN(rb, txn);
 	}
 	PG_CATCH();
@@ -1621,6 +1658,137 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 }
 
 /*
+ * Ask output plugin whether we want to skip this PREPARE and send
+ * this transaction as a regular commit later.
+ */
+bool
+ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, const char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+	return rb->filter_prepare(rb, txn, gid);
+}
+
+
+/*
+ * Commit a transaction.
+ *
+ * See comments for ReorderBufferCommitInternal()
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/* unknown transaction, nothing to replay */
+	if (txn == NULL)
+		return;
+
+	ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+								commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Prepare a twophase transaction. It calls ReorderBufferCommitInternal()
+ * since all prepared transactions need to be decoded at PREPARE time.
+ */
+void
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/* unknown transaction, nothing to replay */
+	if (txn == NULL)
+		return;
+
+	txn->txn_flags |= TXN_PREPARE;
+	strcpy(txn->gid, gid);
+
+	ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+								commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Check whether this transaction was sent as prepared to subscribers.
+ * Called while handling commit|abort prepared.
+ */
+bool
+ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
+						   const char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/*
+	 * If txn == NULL then employ the callback to see if this txn
+	 * was send at PREPARE time. The callback should return the same
+	 * answer for a given GID, everytime we call it.
+	 */
+	if (txn == NULL)
+		return !(rb->filter_prepare(rb, NULL, gid));
+	else
+		return txn_prepared(txn);
+}
+
+/*
+ * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
+ */
+void
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid, bool is_commit)
+{
+	ReorderBufferTXN *txn;
+
+	/*
+	 * The transaction may or may not exist (during restarts for
+	 * example). Anyways, 2PC transactions do not contain any
+	 * reorderbuffers. So allow it to be created below.
+	 */
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn,
+								true);
+
+	txn->final_lsn = commit_lsn;
+	txn->end_lsn = end_lsn;
+	txn->commit_time = commit_time;
+	txn->origin_id = origin_id;
+	txn->origin_lsn = origin_lsn;
+	strcpy(txn->gid, gid);
+
+	if (is_commit)
+	{
+		txn->txn_flags |= TXN_COMMIT_PREPARED;
+		rb->commit_prepared(rb, txn, commit_lsn);
+	}
+	else
+	{
+		txn->txn_flags |= TXN_ROLLBACK_PREPARED;
+		rb->abort_prepared(rb, txn, commit_lsn);
+	}
+
+	ReorderBufferCleanupTXN(rb, txn);
+}
+
+/*
  * Abort a transaction that possibly has previous changes. Needs to be first
  * called for subtransactions and then for the toplevel xid.
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fa5d9bb120..71ff19ded5 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -452,8 +452,9 @@ static void
 apply_handle_commit(StringInfo s)
 {
 	LogicalRepCommitData commit_data;
+	uint8	flags = 0;
 
-	logicalrep_read_commit(s, &commit_data);
+	logicalrep_read_commit(s, &commit_data, &flags);
 
 	Assert(commit_data.commit_lsn == remote_final_lsn);
 
@@ -467,7 +468,11 @@ apply_handle_commit(StringInfo s)
 		replorigin_session_origin_lsn = commit_data.end_lsn;
 		replorigin_session_origin_timestamp = commit_data.committime;
 
-		CommitTransactionCommand();
+		if (flags & LOGICALREP_IS_COMMIT)
+			CommitTransactionCommand();
+		else if (flags & LOGICALREP_IS_ABORT)
+			AbortCurrentTransaction();
+
 		pgstat_report_stat(false);
 
 		store_flush_position(commit_data.end_lsn);
@@ -487,6 +492,121 @@ apply_handle_commit(StringInfo s)
 	pgstat_report_activity(STATE_IDLE, NULL);
 }
 
+static void
+apply_handle_prepare_txn(LogicalRepCommitData *commit_data)
+{
+	Assert(commit_data->commit_lsn == remote_final_lsn);
+	/* The synchronization worker runs in single transaction. */
+	if (IsTransactionState() && !am_tablesync_worker())
+	{
+		/* End the earlier transaction and start a new one */
+		BeginTransactionBlock();
+		CommitTransactionCommand();
+		StartTransactionCommand();
+		/*
+		 * Update origin state so we can restart streaming from correct
+		 * position in case of crash.
+		 */
+		replorigin_session_origin_lsn = commit_data->end_lsn;
+		replorigin_session_origin_timestamp = commit_data->committime;
+
+		PrepareTransactionBlock(commit_data->gid);
+		CommitTransactionCommand();
+		pgstat_report_stat(false);
+
+		store_flush_position(commit_data->end_lsn);
+	}
+	else
+	{
+		/* Process any invalidation messages that might have accumulated. */
+		AcceptInvalidationMessages();
+		/* TODO: what to do here for prepared transactions?? */
+		Assert(false);
+	}
+
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(commit_data->end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+static void
+apply_handle_commit_prepared_txn(LogicalRepCommitData *commit_data)
+{
+	/* there is no transaction when COMMIT PREPARED is called */
+	ensure_transaction();
+
+	/*
+	 * Update origin state so we can restart streaming from correct
+	 * position in case of crash.
+	 */
+	replorigin_session_origin_lsn = commit_data->end_lsn;
+	replorigin_session_origin_timestamp = commit_data->committime;
+
+	FinishPreparedTransaction(commit_data->gid, true, false);
+	CommitTransactionCommand();
+	pgstat_report_stat(false);
+
+	store_flush_position(commit_data->end_lsn);
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(commit_data->end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+static void
+apply_handle_rollback_prepared_txn(LogicalRepCommitData *commit_data)
+{
+	/* there is no transaction when ABORT/ROLLBACK PREPARED is called */
+	ensure_transaction();
+
+	/*
+	 * Update origin state so we can restart streaming from correct
+	 * position in case of crash.
+	 */
+	replorigin_session_origin_lsn = commit_data->end_lsn;
+	replorigin_session_origin_timestamp = commit_data->committime;
+
+	FinishPreparedTransaction(commit_data->gid, false, true);
+	CommitTransactionCommand();
+	pgstat_report_stat(false);
+
+	store_flush_position(commit_data->end_lsn);
+	in_remote_transaction = false;
+
+	/* Process any tables that are being synchronized in parallel. */
+	process_syncing_tables(commit_data->end_lsn);
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle PREPARE message.
+ */
+static void
+apply_handle_prepare(StringInfo s)
+{
+	LogicalRepCommitData commit_data;
+	uint8	flags = 0;
+
+	logicalrep_read_prepare(s, &commit_data, &flags);
+
+	if (flags & LOGICALREP_IS_PREPARE)
+		apply_handle_prepare_txn(&commit_data);
+	else if (flags & LOGICALREP_IS_COMMIT_PREPARED)
+		apply_handle_commit_prepared_txn(&commit_data);
+	else if (flags & LOGICALREP_IS_ROLLBACK_PREPARED)
+		apply_handle_rollback_prepared_txn(&commit_data);
+	else
+		ereport(ERROR,
+			(errcode(ERRCODE_PROTOCOL_VIOLATION),
+			 errmsg("wrong [commit|rollback] prepare message")));
+}
+
 /*
  * Handle ORIGIN message.
  *
@@ -884,10 +1004,14 @@ apply_dispatch(StringInfo s)
 		case 'B':
 			apply_handle_begin(s);
 			break;
-			/* COMMIT */
+			/* COMMIT|ABORT */
 		case 'C':
 			apply_handle_commit(s);
 			break;
+			/* [COMMIT|ROLLBACK] PREPARE */
+		case 'P':
+			apply_handle_prepare(s);
+			break;
 			/* INSERT */
 		case 'I':
 			apply_handle_insert(s);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index c3126545b4..4bbad5b21d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -37,11 +37,23 @@ static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
 				   ReorderBufferTXN *txn);
 static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
 					ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pgoutput_abort_txn(LogicalDecodingContext *ctx,
+					ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
 static void pgoutput_change(LogicalDecodingContext *ctx,
 				ReorderBufferTXN *txn, Relation rel,
 				ReorderBufferChange *change);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 					   RepOriginId origin_id);
+static bool pgoutput_filter_prepare(LogicalDecodingContext *ctx,
+				ReorderBufferTXN *txn, const char *gid);
+static bool pgoutput_decode_txn_filter(LogicalDecodingContext *ctx,
+					   ReorderBufferTXN *txn);
+static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
+				ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
+				ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx,
+				ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
 
 static bool publications_valid;
 
@@ -79,7 +91,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->begin_cb = pgoutput_begin_txn;
 	cb->change_cb = pgoutput_change;
 	cb->commit_cb = pgoutput_commit_txn;
+	cb->abort_cb = pgoutput_abort_txn;
+
+	cb->filter_prepare_cb = pgoutput_filter_prepare;
+	cb->prepare_cb = pgoutput_prepare_txn;
+	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
+	cb->abort_prepared_cb = pgoutput_abort_prepared_txn;
+
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_decode_txn_cb = pgoutput_decode_txn_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
 }
 
@@ -254,6 +274,61 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 }
 
 /*
+ * ABORT callback
+ */
+static void
+pgoutput_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr abort_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_abort(ctx->out, txn, abort_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * PREPARE callback
+ */
+static void
+pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr prepare_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * COMMIT PREPARED callback
+ */
+static void
+pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr prepare_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+/*
+ * PREPARE callback
+ */
+static void
+pgoutput_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr prepare_lsn)
+{
+	OutputPluginUpdateProgress(ctx);
+
+	OutputPluginPrepareWrite(ctx, true);
+	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
+	OutputPluginWrite(ctx, true);
+}
+
+/*
  * Sends the decoded DML over wire.
  */
 static void
@@ -364,6 +439,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 }
 
 /*
+ * Filter out unnecessary two-phase transactions.
+ *
+ * Currently, we forward all two-phase transactions
+ */
+static bool
+pgoutput_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+								const char *gid)
+{
+	return false;
+}
+
+/*
  * Currently we always forward.
  */
 static bool
@@ -374,6 +461,37 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 }
 
 /*
+ * Check if we should continue to decode this transaction.
+ *
+ * If it has aborted in the meanwhile, then there's no sense
+ * in decoding and sending the rest of the changes, we might
+ * as well ask the subscribers to abort immediately.
+ *
+ * This should be called if we are streaming a transaction
+ * before it's committed or if we are decoding a 2PC
+ * transaction. Otherwise we always decode committed
+ * transactions
+ *
+ * Additional checks can be added here, as needed
+ */
+static bool
+pgoutput_decode_txn_filter(LogicalDecodingContext *ctx,
+					   ReorderBufferTXN *txn)
+{
+	/*
+	 * Due to caching, repeated TransactionIdDidAbort calls
+	 * shouldn't be that expensive
+	 */
+	if (txn != NULL &&
+			TransactionIdIsValid(txn->xid) &&
+			TransactionIdDidAbort(txn->xid))
+			return true;
+
+	/* if txn is NULL, filter it out :-) */
+	return (txn != NULL)? false:true;
+}
+
+/*
  * Shutdown the output plugin.
  *
  * Note, we don't need to clean the data->context as it's child context
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 82a707af7b..792cd9c868 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -454,13 +454,13 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 					case TRANS_STMT_COMMIT_PREPARED:
 						PreventTransactionChain(isTopLevel, "COMMIT PREPARED");
 						PreventCommandDuringRecovery("COMMIT PREPARED");
-						FinishPreparedTransaction(stmt->gid, true);
+						FinishPreparedTransaction(stmt->gid, true, false);
 						break;
 
 					case TRANS_STMT_ROLLBACK_PREPARED:
 						PreventTransactionChain(isTopLevel, "ROLLBACK PREPARED");
 						PreventCommandDuringRecovery("ROLLBACK PREPARED");
-						FinishPreparedTransaction(stmt->gid, false);
+						FinishPreparedTransaction(stmt->gid, false, false);
 						break;
 
 					case TRANS_STMT_ROLLBACK:
diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c
index a821e2eed1..0db65bca1b 100644
--- a/src/backend/utils/time/tqual.c
+++ b/src/backend/utils/time/tqual.c
@@ -1252,6 +1252,19 @@ HeapTupleSatisfiesVacuum(HeapTuple htup, TransactionId OldestXmin,
 			 */
 			SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
 						InvalidTransactionId);
+
+			/*
+			 * Transaction aborted, but perhaps it was recent enough
+			 * that some open transactions could still see the tuple.
+			 * We restrict the scope of this check to activities on
+			 * catalog tables only, because logical decoding could be
+			 * peaking into such tuples for a short while
+			 */
+			if ((htup->t_tableOid < (Oid) FirstNormalObjectId) &&
+				!TransactionIdPrecedes(HeapTupleHeaderGetRawXmin(tuple),
+									   OldestXmin))
+				return HEAPTUPLE_RECENTLY_DEAD;
+
 			return HEAPTUPLE_DEAD;
 		}
 
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 54dec4eeaf..c552d38367 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -15,6 +15,7 @@
 #define TWOPHASE_H
 
 #include "access/xlogdefs.h"
+#include "access/xact.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
 
@@ -46,12 +47,15 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
 
 extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
 							int *nxids_p);
+extern void ParsePrepareRecord(uint8 info, char *xlrec,
+							xl_xact_parsed_prepare *parsed);
 extern void StandbyRecoverPreparedTransactions(void);
 extern void RecoverPreparedTransactions(void);
 
 extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
 
-extern void FinishPreparedTransaction(const char *gid, bool isCommit);
+extern void FinishPreparedTransaction(const char *gid, bool isCommit,
+									  bool missing_ok);
 
 extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
 			   XLogRecPtr end_lsn);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 118b0a8432..118156ed78 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -21,6 +21,13 @@
 #include "storage/sinval.h"
 #include "utils/datetime.h"
 
+/*
+ * Maximum size of Global Transaction ID (including '\0').
+ *
+ * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
+ * specified in TwoPhaseFileHeader.
+ */
+#define GIDSIZE 200
 
 /*
  * Xact isolation levels
@@ -156,6 +163,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_TWOPHASE			(1U << 4)
 #define XACT_XINFO_HAS_ORIGIN			(1U << 5)
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
+#define XACT_XINFO_HAS_GID				(1U << 7)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -302,13 +310,40 @@ typedef struct xl_xact_parsed_commit
 	SharedInvalidationMessage *msgs;
 
 	TransactionId twophase_xid; /* only for 2PC */
+	char 		twophase_gid[GIDSIZE];
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
 } xl_xact_parsed_commit;
 
+typedef struct xl_xact_parsed_prepare
+{
+	Oid			dbId;			/* MyDatabaseId */
+
+	int			nsubxacts;
+	TransactionId *subxacts;
+
+	int			ncommitrels;
+	RelFileNode *commitrels;
+
+	int			nabortrels;
+	RelFileNode *abortrels;
+
+	int			nmsgs;
+	SharedInvalidationMessage *msgs;
+
+	TransactionId twophase_xid;
+	char 		twophase_gid[GIDSIZE];
+
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
+} xl_xact_parsed_prepare;
+
 typedef struct xl_xact_parsed_abort
 {
+	Oid			dbId;
+	Oid			tsId;
+
 	TimestampTz xact_time;
 	uint32		xinfo;
 
@@ -319,6 +354,10 @@ typedef struct xl_xact_parsed_abort
 	RelFileNode *xnodes;
 
 	TransactionId twophase_xid; /* only for 2PC */
+	char 		twophase_gid[GIDSIZE];
+
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
 } xl_xact_parsed_abort;
 
 
@@ -386,12 +425,13 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval, bool forceSync,
 					int xactflags,
-					TransactionId twophase_xid);
+					TransactionId twophase_xid, const char *twophase_gid);
 
 extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
-				   int xactflags, TransactionId twophase_xid);
+				   int xactflags, TransactionId twophase_xid,
+				   const char *twophase_gid);
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 7f0e0fa881..758de40db9 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -82,6 +82,11 @@ typedef struct LogicalDecodingContext
 	bool		prepared_write;
 	XLogRecPtr	write_location;
 	TransactionId write_xid;
+
+	/*
+	 * Capabilities of the output plugin.
+	 */
+	bool		enable_twophase;
 } LogicalDecodingContext;
 
 
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a9736e1bf6..7f51f75b97 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -69,11 +69,20 @@ typedef struct LogicalRepBeginData
 	TransactionId xid;
 } LogicalRepBeginData;
 
+#define LOGICALREP_IS_COMMIT			0x01
+#define LOGICALREP_IS_ABORT				0x02
+#define LOGICALREP_IS_PREPARE			0x04
+#define LOGICALREP_IS_COMMIT_PREPARED	0x08
+#define LOGICALREP_IS_ROLLBACK_PREPARED	0x10
+#define LOGICALREP_COMMIT_MASK (LOGICALREP_IS_COMMIT | LOGICALREP_IS_ABORT)
+#define LOGICALREP_PREPARE_MASK	(LOGICALREP_IS_PREPARE | LOGICALREP_IS_COMMIT_PREPARED | LOGICALREP_IS_ROLLBACK_PREPARED)
 typedef struct LogicalRepCommitData
 {
+	uint8		flag;
 	XLogRecPtr	commit_lsn;
 	XLogRecPtr	end_lsn;
 	TimestampTz committime;
+	char		gid[GIDSIZE];
 } LogicalRepCommitData;
 
 extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
@@ -81,8 +90,14 @@ extern void logicalrep_read_begin(StringInfo in,
 					  LogicalRepBeginData *begin_data);
 extern void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
 						XLogRecPtr commit_lsn);
+extern void logicalrep_write_abort(StringInfo out, ReorderBufferTXN *txn,
+						XLogRecPtr abort_lsn);
+extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
+						XLogRecPtr prepare_lsn);
 extern void logicalrep_read_commit(StringInfo in,
-					   LogicalRepCommitData *commit_data);
+					   LogicalRepCommitData *commit_data, uint8 *flags);
+extern void logicalrep_read_prepare(StringInfo in,
+					   LogicalRepCommitData *commit_data, uint8 *flags);
 extern void logicalrep_write_origin(StringInfo out, const char *origin,
 						XLogRecPtr origin_lsn);
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 26ff024882..c92ace3838 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -68,6 +68,45 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 									   XLogRecPtr commit_lsn);
 
 /*
+ * Called for an implicit ABORT of a transaction.
+ */
+typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx,
+									   ReorderBufferTXN *txn,
+									   XLogRecPtr abort_lsn);
+
+ /*
+ * Called before decoding of PREPARE record to decide whether this
+ * transaction should be decoded with separate calls to prepare
+ * and commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED
+ * and sent as usual transaction.
+ */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+													  ReorderBufferTXN *txn,
+													  const char *gid);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr abort_lsn);
+
+/*
  * Called for the generic logical decoding messages.
  */
 typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
@@ -85,6 +124,12 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
 											   RepOriginId origin_id);
 
 /*
+ * Filter to check if we should continue to decode this transaction
+ */
+typedef bool (*LogicalDecodeFilterDecodeTxnCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn);
+
+/*
  * Called to shutdown an output plugin.
  */
 typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
@@ -98,8 +143,14 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeBeginCB begin_cb;
 	LogicalDecodeChangeCB change_cb;
 	LogicalDecodeCommitCB commit_cb;
+	LogicalDecodeAbortCB abort_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeFilterPrepareCB filter_prepare_cb;
+	LogicalDecodePrepareCB prepare_cb;
+	LogicalDecodeCommitPreparedCB commit_prepared_cb;
+	LogicalDecodeAbortPreparedCB abort_prepared_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+	LogicalDecodeFilterDecodeTxnCB filter_decode_txn_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b18ce5a9df..933f13a174 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/twophase.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -137,13 +138,28 @@ typedef struct ReorderBufferChange
 	dlist_node	node;
 } ReorderBufferChange;
 
+
+/* TODO: convert existing bools into flags later */
+/* values for txn_flags */
+#define TXN_HAS_CATALOG_CHANGES	0x0001
+#define TXN_IS_SUBXACT			0x0002
+#define TXN_PREPARE				0x0004
+#define TXN_COMMIT_PREPARED		0x0008
+#define TXN_ROLLBACK_PREPARED	0x0010
+#define txn_prepared(txn)		(txn->txn_flags & TXN_PREPARE)
+
 typedef struct ReorderBufferTXN
 {
+	int		txn_flags;
+
 	/*
 	 * The transactions transaction id, can be a toplevel or sub xid.
 	 */
 	TransactionId xid;
 
+	/* In case of 2PC we need to pass GID to output plugin */
+	char		gid[GIDSIZE];
+
 	/* did the TX have catalog changes */
 	bool		has_catalog_changes;
 
@@ -292,6 +308,39 @@ typedef void (*ReorderBufferCommitCB) (
 									   ReorderBufferTXN *txn,
 									   XLogRecPtr commit_lsn);
 
+/* abort callback signature */
+typedef void (*ReorderBufferAbortCB) (
+									   ReorderBuffer *rb,
+									   ReorderBufferTXN *txn,
+									   XLogRecPtr abort_lsn);
+
+typedef bool (*ReorderBufferFilterDecodeTxnCB) (
+											  ReorderBuffer *rb,
+											  ReorderBufferTXN *txn);
+
+typedef bool (*ReorderBufferFilterPrepareCB) (
+											  ReorderBuffer *rb,
+											  ReorderBufferTXN *txn,
+											  const char *gid);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (
+										ReorderBuffer *rb,
+										ReorderBufferTXN *txn,
+										XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (
+											   ReorderBuffer *rb,
+											   ReorderBufferTXN *txn,
+											   XLogRecPtr commit_lsn);
+
+/* abort prepared callback signature */
+typedef void (*ReorderBufferAbortPreparedCB) (
+											  ReorderBuffer *rb,
+											  ReorderBufferTXN *txn,
+											  XLogRecPtr abort_lsn);
+
 /* message callback signature */
 typedef void (*ReorderBufferMessageCB) (
 										ReorderBuffer *rb,
@@ -327,6 +376,12 @@ struct ReorderBuffer
 	ReorderBufferBeginCB begin;
 	ReorderBufferApplyChangeCB apply_change;
 	ReorderBufferCommitCB commit;
+	ReorderBufferAbortCB abort;
+	ReorderBufferFilterDecodeTxnCB filter_decode_txn;
+	ReorderBufferFilterPrepareCB filter_prepare;
+	ReorderBufferPrepareCB prepare;
+	ReorderBufferCommitPreparedCB commit_prepared;
+	ReorderBufferAbortPreparedCB abort_prepared;
 	ReorderBufferMessageCB message;
 
 	/*
@@ -369,6 +424,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot
 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 					TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid, bool is_commit);
 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
 void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
 						 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
@@ -392,6 +452,15 @@ void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
 
+bool		ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid,
+										 const char *gid);
+bool		ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
+										 const char *gid);
+void		ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid);
 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
 
 void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
