From 90e93341629bb534a619d29cc87713e35358618b Mon Sep 17 00:00:00 2001
From: Nikhil Sontakke <nikhils@2ndQuadrant.com>
Date: Wed, 13 Jun 2018 16:30:30 +0530
Subject: [PATCH 2/4] Support decoding of two-phase transactions at PREPARE

Until now two-phase transactions were decoded at COMMIT, just like
regular transaction. During replay, two-phase transactions were
translated into regular transactions on the subscriber, and the GID
was not forwarded to it.

This patch allows PREPARE-time decoding two-phase transactions (if
the output plugin supports this capability), in which case the
transactions are replayed at PREPARE and then committed later when
COMMIT PREPARED arrives.

On the subscriber, the transactions will be executed as two-phase
transactions, with the same GID. This is important for various
external transaction managers, that often encode information into
the GID itself.

Includes documentation changes.
---
 doc/src/sgml/logicaldecoding.sgml               | 127 ++++++++++-
 src/backend/replication/logical/decode.c        | 266 ++++++++++++++++++------
 src/backend/replication/logical/logical.c       | 203 ++++++++++++++++++
 src/backend/replication/logical/reorderbuffer.c | 185 ++++++++++++++--
 src/include/replication/logical.h               |   2 +-
 src/include/replication/output_plugin.h         |  46 ++++
 src/include/replication/reorderbuffer.h         |  68 ++++++
 7 files changed, 814 insertions(+), 83 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 8db9686..a89e4d5 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -385,7 +385,12 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeChangeCB change_cb;
     LogicalDecodeTruncateCB truncate_cb;
     LogicalDecodeCommitCB commit_cb;
+    LogicalDecodeAbortCB abort_cb;
     LogicalDecodeMessageCB message_cb;
+    LogicalDecodeFilterPrepareCB filter_prepare_cb;
+    LogicalDecodePrepareCB prepare_cb;
+    LogicalDecodeCommitPreparedCB commit_prepared_cb;
+    LogicalDecodeAbortPreparedCB abort_prepared_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
@@ -457,7 +462,13 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
      never get
      decoded. Successful savepoints are
      folded into the transaction containing them in the order they were
-     executed within that transaction.
+     executed within that transaction. A transaction that is prepared for
+     a two-phase commit using <command>PREPARE TRANSACTION</command> will
+     also be decoded if the output plugin callbacks needed for decoding
+     them are provided. It is possible that the current transaction which
+     is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
+     command. In that case, the logical decoding of this transaction will
+     be aborted too.
     </para>
 
     <note>
@@ -558,6 +569,71 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-prepare">
+     <title>Transaction Prepare Callback</title>
+
+     <para>
+      The optional <function>prepare_cb</function> callback is called whenever
+      a transaction which is prepared for two-phase commit has been
+      decoded. The <function>change_cb</function> callbacks for all modified
+      rows will have been called before this, if there have been any modified
+      rows.
+<programlisting>
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+                                        ReorderBufferTXN *txn,
+                                        XLogRecPtr prepare_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-commit-prepared">
+     <title>Commit Prepared Transaction Callback</title>
+
+     <para>
+      The optional <function>commit_prepared_cb</function> callback is called whenever
+      a commit prepared transaction has been decoded. The <parameter>gid</parameter> field,
+      which is part of the <parameter>txn</parameter> parameter can be used in this
+      callback.
+<programlisting>
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+                                               ReorderBufferTXN *txn,
+                                               XLogRecPtr commit_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-abort-prepared">
+     <title>Rollback Prepared Transaction Callback</title>
+
+     <para>
+      The optional <function>abort_prepared_cb</function> callback is called whenever
+      a rollback prepared transaction has been decoded. The <parameter>gid</parameter> field,
+      which is part of the <parameter>txn</parameter> parameter can be used in this
+      callback.
+<programlisting>
+typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+                                              ReorderBufferTXN *txn,
+                                              XLogRecPtr abort_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-abort">
+     <title>Transaction Abort Callback</title>
+
+     <para>
+      The required <function>abort_cb</function> callback is called whenever
+      a transaction abort has to be initiated. This can happen if we are
+      decoding a transaction that has been prepared for two-phase commit and
+      a concurrent rollback happens while we are decoding it.
+<programlisting>
+typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx,
+                                       ReorderBufferTXN *txn,
+                                       XLogRecPtr abort_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-change">
      <title>Change Callback</title>
 
@@ -567,7 +643,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
       an <command>INSERT</command>, <command>UPDATE</command>,
       or <command>DELETE</command>. Even if the original command modified
       several rows at once the callback will be called individually for each
-      row.
+      row. The <function>change_cb</function> callback may access system or
+      user catalog tables to aid in the process of outputting the row
+      modification details. In case of decoding a prepared (but yet
+      uncommitted) transaction or decoding of an uncommitted transaction, this
+      change callback might also error out due to simultaneous rollback of
+      this very same transaction. In that case, the logical decoding of this
+      aborted transaction is stopped gracefully.
 <programlisting>
 typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
@@ -644,6 +726,39 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
      </para>
      </sect3>
 
+     <sect3 id="logicaldecoding-output-plugin-filter-prepare">
+     <title>Prepare Filter Callback</title>
+
+     <para>
+       The optional <function>filter_prepare_cb</function> callback
+       is called to determine whether data that is part of the current
+       two-phase commit transaction should be considered for decode
+       at this prepare stage or as a regular one-phase transaction at
+       <command>COMMIT PREPARED</command> time later. To signal that
+       decoding should be skipped, return <literal>true</literal>;
+       <literal>false</literal> otherwise. When the callback is not
+       defined, <literal>false</literal> is assumed (i.e. nothing is
+       filtered).
+<programlisting>
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              ReorderBufferTXN *txn,
+                                              TransactionId xid,
+                                              const char *gid);
+</programlisting>
+      The <parameter>ctx</parameter> parameter has the same contents
+      as for the other callbacks. The <parameter>txn</parameter> parameter
+      contains meta information about the transaction. The <parameter>xid</parameter>
+      contains the XID because <parameter>txn</parameter> can be NULL in some cases.
+      The <parameter>gid</parameter> is the identifier that later identifies this
+      transaction for <command>COMMIT PREPARED</command> or <command>ROLLBACK PREPARED</command>.
+     </para>
+     <para>
+      The callback has to provide the same static answer for a given combination of
+      <parameter>xid</parameter> and <parameter>gid</parameter> every time it is
+      called.
+     </para>
+     </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-message">
      <title>Generic Message Callback</title>
 
@@ -665,7 +780,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
       non-transactional and the XID was not assigned yet in the transaction
       which logged the message. The <parameter>lsn</parameter> has WAL
       location of the message. The <parameter>transactional</parameter> says
-      if the message was sent as transactional or not.
+      if the message was sent as transactional or not. Similar to the change
+      callback, in case of decoding a prepared (but yet uncommitted)
+      transaction or decoding of an uncommitted transaction, this message 
+      callback might also error out due to simultaneous rollback of
+      this very same transaction. In that case, the logical decoding of this
+      aborted transaction is stopped gracefully.
+
       The <parameter>prefix</parameter> is arbitrary null-terminated prefix
       which can be used for identifying interesting messages for the current
       plugin. And finally the <parameter>message</parameter> parameter holds
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e3b0565..c60ee90 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,7 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/twophase.h"
 
 #include "catalog/pg_control.h"
 
@@ -73,6 +74,8 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			 xl_xact_parsed_commit *parsed, TransactionId xid);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			  xl_xact_parsed_prepare * parsed);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -232,17 +235,10 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			{
 				xl_xact_commit *xlrec;
 				xl_xact_parsed_commit parsed;
-				TransactionId xid;
 
 				xlrec = (xl_xact_commit *) XLogRecGetData(r);
 				ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
-
-				if (!TransactionIdIsValid(parsed.twophase_xid))
-					xid = XLogRecGetXid(r);
-				else
-					xid = parsed.twophase_xid;
-
-				DecodeCommit(ctx, buf, &parsed, xid);
+				DecodeCommit(ctx, buf, &parsed, XLogRecGetXid(r));
 				break;
 			}
 		case XLOG_XACT_ABORT:
@@ -250,17 +246,10 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			{
 				xl_xact_abort *xlrec;
 				xl_xact_parsed_abort parsed;
-				TransactionId xid;
 
 				xlrec = (xl_xact_abort *) XLogRecGetData(r);
 				ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
-
-				if (!TransactionIdIsValid(parsed.twophase_xid))
-					xid = XLogRecGetXid(r);
-				else
-					xid = parsed.twophase_xid;
-
-				DecodeAbort(ctx, buf, &parsed, xid);
+				DecodeAbort(ctx, buf, &parsed, XLogRecGetXid(r));
 				break;
 			}
 		case XLOG_XACT_ASSIGNMENT:
@@ -281,16 +270,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				break;
 			}
 		case XLOG_XACT_PREPARE:
+			{
+				xl_xact_parsed_prepare parsed;
 
-			/*
-			 * Currently decoding ignores PREPARE TRANSACTION and will just
-			 * decode the transaction when the COMMIT PREPARED is sent or
-			 * throw away the transaction's contents when a ROLLBACK PREPARED
-			 * is received. In the future we could add code to expose prepared
-			 * transactions in the changestream allowing for a kind of
-			 * distributed 2PC.
-			 */
-			ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
+				/* check that output plugin is capable of twophase decoding */
+				if (!ctx->options.enable_twophase)
+				{
+					ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
+					break;
+				}
+
+				/* ok, parse it */
+				ParsePrepareRecord(XLogRecGetInfo(buf->record),
+								   XLogRecGetData(buf->record), &parsed);
+
+				/* does output plugin want this particular transaction? */
+				if (ctx->callbacks.filter_prepare_cb &&
+					ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
+												 parsed.twophase_gid))
+				{
+					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
+											buf->origptr);
+					break;
+				}
+
+				DecodePrepare(ctx, buf, &parsed);
+				break;
+			}
 			break;
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
@@ -556,20 +562,13 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  * Consolidated commit record handling between the different form of commit
  * records.
  */
-static void
-DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
-			 xl_xact_parsed_commit *parsed, TransactionId xid)
+static bool
+DecodeEndOfTxn(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			   xl_xact_parsed_commit *parsed, TransactionId xid)
 {
-	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
-	TimestampTz commit_time = parsed->xact_time;
 	RepOriginId origin_id = XLogRecGetOrigin(buf->record);
-	int			i;
+	bool            skip = false;
 
-	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
-	{
-		origin_lsn = parsed->origin_lsn;
-		commit_time = parsed->origin_timestamp;
-	}
 
 	/*
 	 * Process invalidation messages, even if we're not interested in the
@@ -586,20 +585,24 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 	SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
 					   parsed->nsubxacts, parsed->subxacts);
+	skip = SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+		ctx->fast_forward || FilterByOrigin(ctx, origin_id);
 
-	/* ----
-	 * Check whether we are interested in this specific transaction, and tell
-	 * the reorderbuffer to forget the content of the (sub-)transactions
-	 * if not.
-	 *
-	 * There can be several reasons we might not be interested in this
-	 * transaction:
-	 * 1) We might not be interested in decoding transactions up to this
-	 *	  LSN. This can happen because we previously decoded it and now just
-	 *	  are restarting or if we haven't assembled a consistent snapshot yet.
-	 * 2) The transaction happened in another database.
-	 * 3) The output plugin is not interested in the origin.
-	 * 4) We are doing fast-forwarding
+	return skip;
+}
+
+static void
+FinalizeTxnDecoding(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+					xl_xact_parsed_commit *parsed, TransactionId xid,
+					bool will_skip)
+{
+	int                     i;
+
+
+	/*
+	 * Tell the reorderbuffer to forget the content of the (sub-)transactions,
+	 * if the transaction doesn't need decoding.
 	 *
 	 * We can't just use ReorderBufferAbort() here, because we need to execute
 	 * the transaction's invalidations.  This currently won't be needed if
@@ -611,31 +614,128 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * another database, the invalidations might be important, because they
 	 * could be for shared catalogs and we might have loaded data into the
 	 * relevant syscaches.
-	 * ---
 	 */
-	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
-		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
-		ctx->fast_forward || FilterByOrigin(ctx, origin_id))
+	if (will_skip)
 	{
 		for (i = 0; i < parsed->nsubxacts; i++)
-		{
 			ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
-		}
+
 		ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+	}
+	else
+	{
+		/*
+		 * If not skipped, tell the reorderbuffer about the surviving
+		 * subtransactions, if the top-level transaction isn't going to be
+		 * skipped all together.
+		 */
+		for (i = 0; i < parsed->nsubxacts; i++)
+			ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+									 buf->origptr, buf->endptr);
+	}
+}
 
-		return;
+static void
+DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_commit *parsed, TransactionId xid)
+{
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	TimestampTz commit_time = parsed->xact_time;
+	RepOriginId origin_id = XLogRecGetOrigin(buf->record);
+	bool		is_prepared = false;
+	bool		filter_prepared = false;
+	bool		skip;
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		origin_lsn = parsed->origin_lsn;
+		commit_time = parsed->origin_timestamp;
 	}
 
-	/* tell the reorderbuffer about the surviving subtransactions */
-	for (i = 0; i < parsed->nsubxacts; i++)
+	if (TransactionIdIsValid(parsed->twophase_xid))
+	{
+		is_prepared = true;
+		filter_prepared = ReorderBufferPrepareNeedSkip(ctx->reorder,
+													   parsed->twophase_xid,
+													   parsed->twophase_gid);
+
+		/*
+		 * If there is a valid top-level transaction that's different from the
+		 * two-phase one we are committing, clear its reorder buffer as well.
+		 */
+		if (TransactionIdIsNormal(xid) && xid != parsed->twophase_xid)
+			ReorderBufferAbort(ctx->reorder, xid, origin_lsn);
+
+		/* act on the prepared transaction, instead */
+		xid = parsed->twophase_xid;
+	}
+
+	/* Whether or not this COMMIT needs to be skipped. */
+	skip = DecodeEndOfTxn(ctx, buf, parsed, xid);
+
+	/*
+	 * Finalize the decoding of the transaction here.  This is for regular
+	 * commits as well as for two-phase transactions the output plugin was not
+	 * interested in, which therefore are relayed as normal single-phase
+	 * commits.
+	 */
+	if (!is_prepared || filter_prepared)
+		FinalizeTxnDecoding(ctx, buf, parsed, xid, skip);
+
+	if (skip)
+		return;
+
+	/*
+	 * A regular commit simply triggers a replay of transaction changes from
+	 * the reorder buffer. For COMMIT PREPARED that however already happened
+	 * at PREPARE time, and so we only need to notify the subscriber that the
+	 * GID finally committed.
+	 *
+	 * For output plugins that do not support PREPARE-time decoding of
+	 * two-phase transactions, we never even see the PREPARE and all two-phase
+	 * transactions simply fall through to the second branch.
+	 */
+	if (is_prepared && !filter_prepared)
 	{
-		ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
-								 buf->origptr, buf->endptr);
+		/* we are processing COMMIT PREPARED */
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+									commit_time, origin_id, origin_lsn,
+									parsed->twophase_gid, true);
 	}
+	else
+	{
+		/* replay actions of all transaction + subtransactions in order */
+		ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+	}
+}
+
+/*
+ * Decode PREPARE record. Similar logic as in COMMIT
+ */
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			  xl_xact_parsed_prepare * parsed)
+{
+	XLogRecPtr	origin_lsn = parsed->origin_lsn;
+	TimestampTz commit_time = parsed->origin_timestamp;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+	TransactionId xid = parsed->twophase_xid;
+	bool		skip;
+
+	Assert(parsed->dbId != InvalidOid);
+	Assert(TransactionIdIsValid(parsed->twophase_xid));
+
+	/* Whether or not this PREPARE needs to be skipped. */
+	skip = DecodeEndOfTxn(ctx, buf, parsed, xid);
+
+	FinalizeTxnDecoding(ctx, buf, parsed, xid, skip);
 
 	/* replay actions of all transaction + subtransactions in order */
-	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
-						commit_time, origin_id, origin_lsn);
+	if (!skip)
+		ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr,
+							 commit_time, origin_id, origin_lsn,
+							 parsed->twophase_gid);
 }
 
 /*
@@ -647,6 +747,48 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid)
 {
 	int			i;
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	TimestampTz commit_time = 0;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+	bool		is_prepared = TransactionIdIsValid(parsed->twophase_xid);
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		origin_lsn = parsed->origin_lsn;
+		commit_time = parsed->origin_timestamp;
+	}
+
+	if (TransactionIdIsValid(parsed->twophase_xid))
+	{
+		is_prepared = true;
+		Assert(parsed->dbId != InvalidOid);
+
+		/*
+		 * If there is a valid top-level transaction that's different from the
+		 * two-phase one we are aborting, clear its reorder buffer as well.
+		 */
+		if (TransactionIdIsNormal(xid) && xid != parsed->twophase_xid)
+			ReorderBufferAbort(ctx->reorder, xid, origin_lsn);
+
+		/* act on the prepared transaction, instead */
+		xid = parsed->twophase_xid;
+	}
+
+	/*
+	 * If it's ROLLBACK PREPARED then handle it via callbacks.
+	 */
+	if (is_prepared &&
+		!SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) &&
+		parsed->dbId == ctx->slot->data.database &&
+		!ctx->fast_forward &&
+		!FilterByOrigin(ctx, origin_id) &&
+		ReorderBufferTxnIsPrepared(ctx->reorder, xid, parsed->twophase_gid))
+	{
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+									commit_time, origin_id, origin_lsn,
+									parsed->twophase_gid, false);
+		return;
+	}
 
 	for (i = 0; i < parsed->nsubxacts; i++)
 	{
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 9f99e4f..be08ccf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -60,6 +60,16 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
 static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
 static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  XLogRecPtr commit_lsn);
+static void abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				 XLogRecPtr abort_lsn);
+static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						  TransactionId xid, const char *gid);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				   XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						   XLogRecPtr commit_lsn);
+static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						  XLogRecPtr abort_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change);
 static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -192,6 +202,11 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
+	ctx->reorder->abort = abort_cb_wrapper;
+	ctx->reorder->filter_prepare = filter_prepare_cb_wrapper;
+	ctx->reorder->prepare = prepare_cb_wrapper;
+	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+	ctx->reorder->abort_prepared = abort_prepared_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
 
 	ctx->out = makeStringInfo();
@@ -616,6 +631,33 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
 	/* do the actual work: call callback */
 	ctx->callbacks.startup_cb(ctx, opt, is_init);
 
+	/*
+	 * If the plugin claims to support two-phase transactions, then
+	 * check that the plugin implements all callbacks necessary to decode
+	 * two-phase transactions - we either have to have all of them or none.
+	 * The filter_prepare callback is optional, but can only be defined when
+	 * two-phase decoding is enabled (i.e. the three other callbacks are
+	 * defined).
+	 */
+	if (opt->enable_twophase)
+	{
+		int twophase_callbacks = (ctx->callbacks.prepare_cb != NULL) +
+			(ctx->callbacks.commit_prepared_cb != NULL) +
+			(ctx->callbacks.abort_prepared_cb != NULL);
+
+		/* Plugins with incorrect number of two-phase callbacks are broken. */
+		if ((twophase_callbacks != 3) && (twophase_callbacks != 0))
+			ereport(ERROR,
+					(errmsg("Output plugin registered only %d twophase callbacks. ",
+							twophase_callbacks)));
+	}
+
+	/* filter_prepare is optional, but requires two-phase decoding */
+	if ((ctx->callbacks.filter_prepare_cb != NULL) && (!opt->enable_twophase))
+		ereport(ERROR,
+				(errmsg("Output plugin does not support two-phase decoding, but "
+						"registered filter_prepared callback.")));
+
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
 }
@@ -714,6 +756,122 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 static void
+abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				 XLogRecPtr abort_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "abort";
+	state.report_location = txn->final_lsn; /* beginning of abort record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.abort_cb(ctx, txn, abort_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				   XLogRecPtr prepare_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "prepare";
+	state.report_location = txn->final_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						   XLogRecPtr commit_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "commit_prepared";
+	state.report_location = txn->final_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						  XLogRecPtr abort_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "abort_prepared";
+	state.report_location = txn->final_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
 change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change)
 {
@@ -790,6 +948,51 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static bool
+filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						  TransactionId xid, const char *gid)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	/*
+	 * Skip if decoding of twophase at PREPARE time is not enabled. In that
+	 * case all twophase transactions are considered filtered out and will be
+	 * applied as regular transactions at COMMIT PREPARED.
+	 */
+	if (!ctx->options.enable_twophase)
+		return true;
+
+	/*
+	 * The filter_prepare callback is optional. When not supplied, all
+	 * prepared transactions should go through.
+	 */
+	if (!ctx->callbacks.filter_prepare_cb)
+		return false;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_prepare";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_prepare_cb(ctx, txn, xid, gid);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 bool
 filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 6df6fc0..ffcc5c0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -337,6 +337,11 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	}
 
 	/* free data that's contained */
+	if (txn->gid != NULL)
+	{
+		pfree(txn->gid);
+		txn->gid = NULL;
+	}
 
 	if (txn->tuplecid_hash != NULL)
 	{
@@ -1426,25 +1431,18 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
  * and subtransactions (using a k-way merge) and replay the changes in lsn
  * order.
  */
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
-					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-					TimestampTz commit_time,
-					RepOriginId origin_id, XLogRecPtr origin_lsn)
+static void
+ReorderBufferCommitInternal(ReorderBufferTXN *txn,
+							ReorderBuffer *rb, TransactionId xid,
+							XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+							TimestampTz commit_time,
+							RepOriginId origin_id, XLogRecPtr origin_lsn)
 {
-	ReorderBufferTXN *txn;
 	volatile Snapshot snapshot_now;
 	volatile CommandId command_id = FirstCommandId;
 	bool		using_subtxn;
 	ReorderBufferIterTXNState *volatile iterstate = NULL;
 
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-								false);
-
-	/* unknown transaction, nothing to replay */
-	if (txn == NULL)
-		return;
-
 	txn->final_lsn = commit_lsn;
 	txn->end_lsn = end_lsn;
 	txn->commit_time = commit_time;
@@ -1758,7 +1756,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					break;
 			}
 		}
-
 		/*
 		 * There's a speculative insertion remaining, just clean in up, it
 		 * can't have been successful, otherwise we'd gotten a confirmation
@@ -1774,8 +1771,22 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		/*
+		 * Call abort/commit/prepare callback, depending on the transaction
+		 * state.
+		 *
+		 * If the transaction aborted during apply (which currently can happen
+		 * only for prepared transactions), simply call the abort callback.
+		 *
+		 * Otherwise call either PREPARE (for twophase transactions) or COMMIT
+		 * (for regular ones).
+		 */
+		if (rbtxn_rollback(txn))
+			rb->abort(rb, txn, commit_lsn);
+		else if (rbtxn_prepared(txn))
+			rb->prepare(rb, txn, commit_lsn);
+		else
+			rb->commit(rb, txn, commit_lsn);
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
@@ -1802,7 +1813,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		if (snapshot_now->copied)
 			ReorderBufferFreeSnap(rb, snapshot_now);
 
-		/* remove potential on-disk data, and deallocate */
+		/*
+		 * remove potential on-disk data, and deallocate.
+		 *
+		 * We remove it even for prepared transactions (GID is enough to
+		 * commit/abort those later).
+		 */
 		ReorderBufferCleanupTXN(rb, txn);
 	}
 	PG_CATCH();
@@ -1836,6 +1852,141 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	PG_END_TRY();
 }
 
+
+/*
+ * Ask output plugin whether we want to skip this PREPARE and send
+ * this transaction as a regular commit later.
+ */
+bool
+ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, const char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+	return rb->filter_prepare(rb, txn, xid, gid);
+}
+
+
+/*
+ * Commit a transaction.
+ *
+ * See comments for ReorderBufferCommitInternal()
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/* unknown transaction, nothing to replay */
+	if (txn == NULL)
+		return;
+
+	ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+								commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Prepare a twophase transaction. It calls ReorderBufferCommitInternal()
+ * since all prepared transactions need to be decoded at PREPARE time.
+ */
+void
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					 TimestampTz commit_time,
+					 RepOriginId origin_id, XLogRecPtr origin_lsn,
+					 char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/* unknown transaction, nothing to replay */
+	if (txn == NULL)
+		return;
+
+	txn->txn_flags |= RBTXN_PREPARE;
+	txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */
+	strcpy(txn->gid, gid);
+
+	ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+								commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Check whether this transaction was sent as prepared to subscribers.
+ * Called while handling commit|abort prepared.
+ */
+bool
+ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
+						   const char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/*
+	 * Always call the prepare filter. It's the job of the prepare filter to
+	 * give us the *same* response for a given xid across multiple calls
+	 * (including ones on restart)
+	 */
+	return !(rb->filter_prepare(rb, txn, xid, gid));
+}
+
+/*
+ * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
+ */
+void
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+							XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+							TimestampTz commit_time,
+							RepOriginId origin_id, XLogRecPtr origin_lsn,
+							char *gid, bool is_commit)
+{
+	ReorderBufferTXN *txn;
+
+	/*
+	 * The transaction may or may not exist (during restarts for example).
+	 * Anyways, 2PC transactions do not contain any reorderbuffers. So allow
+	 * it to be created below.
+	 */
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn,
+								true);
+
+	txn->final_lsn = commit_lsn;
+	txn->end_lsn = end_lsn;
+	txn->commit_time = commit_time;
+	txn->origin_id = origin_id;
+	txn->origin_lsn = origin_lsn;
+	/* this txn is obviously prepared */
+	txn->txn_flags |= RBTXN_PREPARE;
+	txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */
+	strcpy(txn->gid, gid);
+
+	if (is_commit)
+	{
+		txn->txn_flags |= RBTXN_COMMIT_PREPARED;
+		rb->commit_prepared(rb, txn, commit_lsn);
+	}
+	else
+	{
+		txn->txn_flags |= RBTXN_ROLLBACK_PREPARED;
+		rb->abort_prepared(rb, txn, commit_lsn);
+	}
+
+	/* cleanup: make sure there's no cache pollution */
+	ReorderBufferExecuteInvalidations(rb, txn);
+	ReorderBufferCleanupTXN(rb, txn);
+}
+
 /*
  * Abort a transaction that possibly has previous changes. Needs to be first
  * called for subtransactions and then for the toplevel xid.
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c25ac1f..5fdda65 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -47,7 +47,7 @@ typedef struct LogicalDecodingContext
 
 	/*
 	 * Marks the logical decoding context as fast forward decoding one. Such a
-	 * context does not have plugin loaded so most of the the following
+	 * context does not have plugin loaded so most of the following
 	 * properties are unused.
 	 */
 	bool		fast_forward;
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 1ee0a56..c9140e7 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -27,6 +27,7 @@ typedef struct OutputPluginOptions
 {
 	OutputPluginOutputType output_type;
 	bool		receive_rewrites;
+	bool		enable_twophase;
 } OutputPluginOptions;
 
 /*
@@ -78,6 +79,46 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 									   XLogRecPtr commit_lsn);
 
 /*
+ * Called for an implicit ABORT of a transaction.
+ */
+typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx,
+									  ReorderBufferTXN *txn,
+									  XLogRecPtr abort_lsn);
+
+ /*
+  * Called before decoding of PREPARE record to decide whether this
+  * transaction should be decoded with separate calls to prepare and
+  * commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED and
+  * sent as usual transaction.
+  */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  ReorderBufferTXN *txn,
+											  TransactionId xid,
+											  const char *gid);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+										ReorderBufferTXN *txn,
+										XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+											   ReorderBufferTXN *txn,
+											   XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+											  ReorderBufferTXN *txn,
+											  XLogRecPtr abort_lsn);
+
+/*
  * Called for the generic logical decoding messages.
  */
 typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
@@ -109,7 +150,12 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeChangeCB change_cb;
 	LogicalDecodeTruncateCB truncate_cb;
 	LogicalDecodeCommitCB commit_cb;
+	LogicalDecodeAbortCB abort_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeFilterPrepareCB filter_prepare_cb;
+	LogicalDecodePrepareCB prepare_cb;
+	LogicalDecodeCommitPreparedCB commit_prepared_cb;
+	LogicalDecodeAbortPreparedCB abort_prepared_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 5b5f4db..ae3cea9 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/twophase.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -154,6 +155,11 @@ typedef struct ReorderBufferChange
 #define RBTXN_HAS_CATALOG_CHANGES 0x0001
 #define RBTXN_IS_SUBXACT          0x0002
 #define RBTXN_IS_SERIALIZED       0x0004
+#define RBTXN_PREPARE             0x0008
+#define RBTXN_COMMIT_PREPARED     0x0010
+#define RBTXN_ROLLBACK_PREPARED   0x0020
+#define RBTXN_COMMIT              0x0040
+#define RBTXN_ROLLBACK            0x0080
 
 /* does the txn have catalog changes */
 #define rbtxn_has_catalog_changes(txn) (txn->txn_flags & RBTXN_HAS_CATALOG_CHANGES)
@@ -167,6 +173,16 @@ typedef struct ReorderBufferChange
  * nentries_mem == nentries.
  */
 #define rbtxn_is_serialized(txn)       (txn->txn_flags & RBTXN_IS_SERIALIZED)
+/* is this txn prepared? */
+#define rbtxn_prepared(txn)            (txn->txn_flags & RBTXN_PREPARE)
+/* was this prepared txn committed in the meanwhile? */
+#define rbtxn_commit_prepared(txn)     (txn->txn_flags & RBTXN_COMMIT_PREPARED)
+/* was this prepared txn aborted in the meanwhile? */
+#define rbtxn_rollback_prepared(txn)   (txn->txn_flags & RBTXN_ROLLBACK_PREPARED)
+/* was this txn committed in the meanwhile? */
+#define rbtxn_commit(txn)              (txn->txn_flags & RBTXN_COMMIT)
+/* was this prepared txn aborted in the meanwhile? */
+#define rbtxn_rollback(txn)            (txn->txn_flags & RBTXN_ROLLBACK)
 
 typedef struct ReorderBufferTXN
 {
@@ -179,6 +195,8 @@ typedef struct ReorderBufferTXN
 
 	/* Do we know this is a subxact?  Xid of top-level txn if so */
 	TransactionId toplevel_xid;
+	/* In case of 2PC we need to pass GID to output plugin */
+	char		 *gid;
 
 	/*
 	 * LSN of the first data carrying, WAL record with knowledge about this
@@ -324,6 +342,37 @@ typedef void (*ReorderBufferCommitCB) (
 									   ReorderBufferTXN *txn,
 									   XLogRecPtr commit_lsn);
 
+/* abort callback signature */
+typedef void (*ReorderBufferAbortCB) (
+									  ReorderBuffer *rb,
+									  ReorderBufferTXN *txn,
+									  XLogRecPtr abort_lsn);
+
+typedef bool (*ReorderBufferFilterPrepareCB) (
+											  ReorderBuffer *rb,
+											  ReorderBufferTXN *txn,
+											  TransactionId xid,
+											  const char *gid);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (
+										ReorderBuffer *rb,
+										ReorderBufferTXN *txn,
+										XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (
+											   ReorderBuffer *rb,
+											   ReorderBufferTXN *txn,
+											   XLogRecPtr commit_lsn);
+
+/* abort prepared callback signature */
+typedef void (*ReorderBufferAbortPreparedCB) (
+											  ReorderBuffer *rb,
+											  ReorderBufferTXN *txn,
+											  XLogRecPtr abort_lsn);
+
+
 /* message callback signature */
 typedef void (*ReorderBufferMessageCB) (
 										ReorderBuffer *rb,
@@ -369,6 +418,11 @@ struct ReorderBuffer
 	ReorderBufferApplyChangeCB apply_change;
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
+	ReorderBufferAbortCB abort;
+	ReorderBufferFilterPrepareCB filter_prepare;
+	ReorderBufferPrepareCB prepare;
+	ReorderBufferCommitPreparedCB commit_prepared;
+	ReorderBufferAbortPreparedCB abort_prepared;
 	ReorderBufferMessageCB message;
 
 	/*
@@ -419,6 +473,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot
 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 					TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+							XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+							TimestampTz commit_time,
+							RepOriginId origin_id, XLogRecPtr origin_lsn,
+							char *gid, bool is_commit);
 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
 void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
 						 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
@@ -442,6 +501,15 @@ void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
 
+bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid,
+							 const char *gid);
+bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
+						   const char *gid);
+void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					 TimestampTz commit_time,
+					 RepOriginId origin_id, XLogRecPtr origin_lsn,
+					 char *gid);
 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
 TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 
-- 
2.7.4

