From 806112f115f06782f2177a11d376b37862a885e6 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Thu, 5 Nov 2020 04:08:22 -0500
Subject: [PATCH v17] Support 2PC txn base.

Until now two-phase transaction commands were translated into regular transactions
on the subscriber, and the GID was not forwarded to it. None of the two-phase semantics
were communicated to the subscriber.

This patch provides infrastructure for logical decoding plugins to be informed of
two-phase commands Like PREPARE TRANSACTION, COMMIT PREPARED
and ROLLBACK PREPARED commands with the corresponding GID.

Include logical decoding plugin API infrastructure changes.

Includes contrib/test_decoding changes.

Includes documentation changes.
---
 contrib/test_decoding/test_decoding.c     | 190 +++++++++++++++++++++++
 doc/src/sgml/logicaldecoding.sgml         | 146 +++++++++++++++++-
 src/backend/replication/logical/logical.c | 242 ++++++++++++++++++++++++++++++
 src/include/replication/logical.h         |   5 +
 src/include/replication/output_plugin.h   |  46 ++++++
 src/include/replication/reorderbuffer.h   |  35 +++++
 src/tools/pgindent/typedefs.list          |  11 ++
 7 files changed, 668 insertions(+), 7 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 8e33614..80b7b51 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -11,12 +11,16 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include "miscadmin.h"
 
+#include "access/transam.h"
 #include "catalog/pg_type.h"
 
 #include "replication/logical.h"
 #include "replication/origin.h"
 
+#include "storage/procarray.h"
+
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -36,6 +40,7 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	TransactionId check_xid_aborted;	/* track abort of this txid */
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -73,6 +78,9 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
 static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
 								   ReorderBufferTXN *txn,
 								   XLogRecPtr abort_lsn);
+static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+									 ReorderBufferTXN *txn,
+									 XLogRecPtr prepare_lsn);
 static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
 									ReorderBufferTXN *txn,
 									XLogRecPtr commit_lsn);
@@ -88,6 +96,18 @@ static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
 									  ReorderBufferTXN *txn,
 									  int nrelations, Relation relations[],
 									  ReorderBufferChange *change);
+static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 ReorderBufferTXN *txn,
+									 TransactionId xid, const char *gid);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+								  ReorderBufferTXN *txn,
+								  XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+										  ReorderBufferTXN *txn,
+										  XLogRecPtr commit_lsn);
+static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
+											ReorderBufferTXN *txn,
+											XLogRecPtr abort_lsn);
 
 void
 _PG_init(void)
@@ -112,10 +132,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_start_cb = pg_decode_stream_start;
 	cb->stream_stop_cb = pg_decode_stream_stop;
 	cb->stream_abort_cb = pg_decode_stream_abort;
+	cb->stream_prepare_cb = pg_decode_stream_prepare;
 	cb->stream_commit_cb = pg_decode_stream_commit;
 	cb->stream_change_cb = pg_decode_stream_change;
 	cb->stream_message_cb = pg_decode_stream_message;
 	cb->stream_truncate_cb = pg_decode_stream_truncate;
+	cb->filter_prepare_cb = pg_decode_filter_prepare;
+	cb->prepare_cb = pg_decode_prepare_txn;
+	cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+	cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn;
 }
 
 
@@ -127,6 +152,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	ListCell   *option;
 	TestDecodingData *data;
 	bool		enable_streaming = false;
+	bool		enable_twophase = false;
 
 	data = palloc0(sizeof(TestDecodingData));
 	data->context = AllocSetContextCreate(ctx->context,
@@ -136,6 +162,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->check_xid_aborted = InvalidTransactionId;
 
 	ctx->output_plugin_private = data;
 
@@ -227,6 +254,35 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "two-phase-commit") == 0)
+		{
+			if (elem->arg == NULL)
+				continue;
+			else if (!parse_bool(strVal(elem->arg), &enable_twophase))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
+								strVal(elem->arg), elem->defname)));
+		}
+		else if (strcmp(elem->defname, "check-xid-aborted") == 0)
+		{
+			if (elem->arg == NULL)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("check-xid-aborted needs an input value")));
+			else
+			{
+
+				errno = 0;
+				data->check_xid_aborted = (TransactionId) strtoul(strVal(elem->arg), NULL, 0);
+
+				if (errno || !TransactionIdIsValid(data->check_xid_aborted))
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("check-xid-aborted is not a valid xid: \"%s\"",
+									strVal(elem->arg))));
+			}
+		}
 		else
 		{
 			ereport(ERROR,
@@ -238,6 +294,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	}
 
 	ctx->streaming &= enable_streaming;
+	ctx->twophase &= enable_twophase;
 }
 
 /* cleanup this plugin's resources */
@@ -297,6 +354,93 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+/*
+ * Filter out two-phase transactions.
+ *
+ * Each plugin can implement its own filtering logic. Here
+ * we demonstrate a simple logic by checking the GID. If the
+ * GID contains the "_nodecode" substring, then we filter
+ * it out.
+ */
+static bool
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+						 TransactionId xid, const char *gid)
+{
+	if (strstr(gid, "_nodecode") != NULL)
+		return true;
+
+	return false;
+}
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					  XLogRecPtr prepare_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							  XLogRecPtr commit_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* ROLLBACK PREPARED callback */
+static void
+pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+								XLogRecPtr abort_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
 static bool
 pg_decode_filter(LogicalDecodingContext *ctx,
 				 RepOriginId origin_id)
@@ -455,6 +599,26 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	data->xact_wrote_changes = true;
 
+	/*
+	 * if check_xid_aborted is a valid xid, then it was passed in as an option
+	 * to check if the transaction having this xid would be aborted. This is
+	 * to test concurrent aborts.
+	 */
+	if (TransactionIdIsValid(data->check_xid_aborted))
+	{
+		elog(LOG, "waiting for %u to abort", data->check_xid_aborted);
+		while (TransactionIdIsInProgress(data->check_xid_aborted))
+		{
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(10000L);
+		}
+		if (!TransactionIdIsInProgress(data->check_xid_aborted) &&
+			!TransactionIdDidCommit(data->check_xid_aborted))
+			elog(LOG, "%u aborted", data->check_xid_aborted);
+
+		Assert(TransactionIdDidAbort(data->check_xid_aborted));
+	}
+
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
 
@@ -646,6 +810,32 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
 }
 
 static void
+pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+						 ReorderBufferTXN *txn,
+						 XLogRecPtr prepare_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, "preparing streamed transaction TXN %u, %s", txn->xid,
+						 quote_literal_cstr(txn->gid));
+	else
+		appendStringInfo(ctx->out, "preparing streamed transaction %s",
+						 quote_literal_cstr(txn->gid));
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+static void
 pg_decode_stream_commit(LogicalDecodingContext *ctx,
 						ReorderBufferTXN *txn,
 						XLogRecPtr commit_lsn)
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 813a037..f5b617d 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -387,11 +387,16 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeTruncateCB truncate_cb;
     LogicalDecodeCommitCB commit_cb;
     LogicalDecodeMessageCB message_cb;
+    LogicalDecodeFilterPrepareCB filter_prepare_cb;
+    LogicalDecodePrepareCB prepare_cb;
+    LogicalDecodeCommitPreparedCB commit_prepared_cb;
+    LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
     LogicalDecodeStreamStartCB stream_start_cb;
     LogicalDecodeStreamStopCB stream_stop_cb;
     LogicalDecodeStreamAbortCB stream_abort_cb;
+    LogicalDecodeStreamPrepareCB stream_prepare_cb;
     LogicalDecodeStreamCommitCB stream_commit_cb;
     LogicalDecodeStreamChangeCB stream_change_cb;
     LogicalDecodeStreamMessageCB stream_message_cb;
@@ -413,10 +418,19 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      An output plugin may also define functions to support streaming of large,
      in-progress transactions. The <function>stream_start_cb</function>,
      <function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
-     <function>stream_commit_cb</function> and <function>stream_change_cb</function>
+     <function>stream_commit_cb</function>, <function>stream_change_cb</function>,
+     and <function>stream_prepare_cb</function>
      are required, while <function>stream_message_cb</function> and
      <function>stream_truncate_cb</function> are optional.
     </para>
+
+    <para>
+    An output plugin may also define functions to support two-phase commits, which are
+    decoded on <command>PREPARE TRANSACTION</command>. The <function>prepare_cb</function>,
+    <function>stream_prepare_cb</function>, <function>commit_prepared_cb</function>
+    and <function>rollback_prepared_cb</function>
+    callbacks are required, while <function>filter_prepare_cb</function> is optional.
+    </para>
    </sect2>
 
    <sect2 id="logicaldecoding-capabilities">
@@ -477,7 +491,13 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
      never get
      decoded. Successful savepoints are
      folded into the transaction containing them in the order they were
-     executed within that transaction.
+     executed within that transaction. A transaction that is prepared for
+     a two-phase commit using <command>PREPARE TRANSACTION</command> will
+     also be decoded if the output plugin callbacks needed for decoding
+     them are provided. It is possible that the current transaction which
+     is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
+     command. In that case, the logical decoding of this transaction will
+     be aborted too.
     </para>
 
     <note>
@@ -578,6 +598,56 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-prepare">
+     <title>Transaction Prepare Callback</title>
+
+     <para>
+      The required <function>prepare_cb</function> callback is called whenever
+      a transaction which is prepared for two-phase commit has been
+      decoded. The <function>change_cb</function> callbacks for all modified
+      rows will have been called before this, if there have been any modified
+      rows. The <parameter>gid</parameter> field, which is part of the
+      <parameter>txn</parameter> parameter can be used in this callback.
+<programlisting>
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+                                        ReorderBufferTXN *txn,
+                                        XLogRecPtr prepare_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-commit-prepared">
+     <title>Transaction Commit Prepared Callback</title>
+
+     <para>
+      The required <function>commit_prepared_cb</function> callback is called whenever
+      a transaction commit prepared has been decoded. The <parameter>gid</parameter> field,
+      which is part of the <parameter>txn</parameter> parameter can be used in this
+      callback.
+<programlisting>
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+                                               ReorderBufferTXN *txn,
+                                               XLogRecPtr commit_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-rollback-prepared">
+     <title>Transaction Rollback Prepared Callback</title>
+
+     <para>
+      The required <function>rollback_prepared_cb</function> callback is called whenever
+      a transaction rollback prepared has been decoded. The <parameter>gid</parameter> field,
+      which is part of the <parameter>txn</parameter> parameter can be used in this
+      callback.
+<programlisting>
+typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+                                                 ReorderBufferTXN *txn,
+                                                 XLogRecPtr rollback_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-change">
      <title>Change Callback</title>
 
@@ -587,7 +657,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
       an <command>INSERT</command>, <command>UPDATE</command>,
       or <command>DELETE</command>. Even if the original command modified
       several rows at once the callback will be called individually for each
-      row.
+      row. The <function>change_cb</function> callback may access system or
+      user catalog tables to aid in the process of outputting the row
+      modification details. In case of decoding a prepared (but yet
+      uncommitted) transaction or decoding of an uncommitted transaction, this
+      change callback might also error out due to simultaneous rollback of
+      this very same transaction. In that case, the logical decoding of this
+      aborted transaction is stopped gracefully.
 <programlisting>
 typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
@@ -664,6 +740,39 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
      </para>
      </sect3>
 
+     <sect3 id="logicaldecoding-output-plugin-filter-prepare">
+     <title>Prepare Filter Callback</title>
+
+     <para>
+       The optional <function>filter_prepare_cb</function> callback
+       is called to determine whether data that is part of the current
+       two-phase commit transaction should be considered for decode
+       at this prepare stage or as a regular one-phase transaction at
+       <command>COMMIT PREPARED</command> time later. To signal that
+       decoding should be skipped, return <literal>true</literal>;
+       <literal>false</literal> otherwise. When the callback is not
+       defined, <literal>false</literal> is assumed (i.e. nothing is
+       filtered).
+<programlisting>
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              ReorderBufferTXN *txn,
+                                              TransactionId xid,
+                                              const char *gid);
+</programlisting>
+      The <parameter>ctx</parameter> parameter has the same contents
+      as for the other callbacks. The <parameter>txn</parameter> parameter
+      contains meta information about the transaction. The <parameter>xid</parameter>
+      contains the XID because <parameter>txn</parameter> can be NULL in some cases.
+      The <parameter>gid</parameter> is the identifier that later identifies this
+      transaction for <command>COMMIT PREPARED</command> or <command>ROLLBACK PREPARED</command>.
+     </para>
+     <para>
+      The callback has to provide the same static answer for a given combination of
+      <parameter>xid</parameter> and <parameter>gid</parameter> every time it is
+      called.
+     </para>
+     </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-message">
      <title>Generic Message Callback</title>
 
@@ -685,7 +794,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
       non-transactional and the XID was not assigned yet in the transaction
       which logged the message. The <parameter>lsn</parameter> has WAL
       location of the message. The <parameter>transactional</parameter> says
-      if the message was sent as transactional or not.
+      if the message was sent as transactional or not. Similar to the change
+      callback, in case of decoding a prepared (but yet uncommitted)
+      transaction or decoding of an uncommitted transaction, this message
+      callback might also error out due to simultaneous rollback of
+      this very same transaction. In that case, the logical decoding of this
+      aborted transaction is stopped gracefully.
+
       The <parameter>prefix</parameter> is arbitrary null-terminated prefix
       which can be used for identifying interesting messages for the current
       plugin. And finally the <parameter>message</parameter> parameter holds
@@ -735,6 +850,19 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-stream-prepare">
+     <title>Stream Prepare Callback</title>
+     <para>
+      The <function>stream_prepare_cb</function> callback is called to prepare
+      a previously streamed transaction as part of a two-phase commit.
+<programlisting>
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+                                            ReorderBufferTXN *txn,
+                                            XLogRecPtr prepare_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-stream-commit">
      <title>Stream Commit Callback</title>
      <para>
@@ -913,9 +1041,13 @@ OutputPluginWrite(ctx, true);
     When streaming an in-progress transaction, the changes (and messages) are
     streamed in blocks demarcated by <function>stream_start_cb</function>
     and <function>stream_stop_cb</function> callbacks. Once all the decoded
-    changes are transmitted, the transaction is committed using the
-    <function>stream_commit_cb</function> callback (or possibly aborted using
-    the <function>stream_abort_cb</function> callback).
+    changes are transmitted, the transaction can be committed using the
+    the <function>stream_commit_cb</function> callback
+    (or possibly aborted using the <function>stream_abort_cb</function> callback).
+    If two-phase commits are supported, the transaction can be prepared using the
+    <function>stream_prepare_cb</function> callback, commit prepared using the
+    <function>commit_prepared_cb</function> callback or aborted using the
+    <function>rollback_prepared_cb</function>.
    </para>
 
    <para>
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d5cfbea..e9107cd 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -59,6 +59,14 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
 static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
 static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							  XLogRecPtr commit_lsn);
+static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+									  TransactionId xid, const char *gid);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							   XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+									   XLogRecPtr commit_lsn);
+static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+										 XLogRecPtr abort_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							  Relation relation, ReorderBufferChange *change);
 static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -74,6 +82,8 @@ static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 								   XLogRecPtr last_lsn);
 static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									XLogRecPtr abort_lsn);
+static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+									  XLogRecPtr prepare_lsn);
 static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									 XLogRecPtr commit_lsn);
 static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -207,6 +217,10 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
+	ctx->reorder->filter_prepare = filter_prepare_cb_wrapper;
+	ctx->reorder->prepare = prepare_cb_wrapper;
+	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+	ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
 
 	/*
@@ -227,6 +241,21 @@ StartupDecodingContext(List *output_plugin_options,
 		(ctx->callbacks.stream_truncate_cb != NULL);
 
 	/*
+	 * To support two-phase logical decoding, we require
+	 * prepare/commit-prepare/abort-prepare callbacks. The filter-prepare
+	 * callback is optional. We however enable two-phase logical decoding when
+	 * at least one of the methods is enabled so that we can easily identify
+	 * missing methods.
+	 *
+	 * We decide it here, but only check it later in the wrappers.
+	 */
+	ctx->twophase = (ctx->callbacks.prepare_cb != NULL) ||
+		(ctx->callbacks.commit_prepared_cb != NULL) ||
+		(ctx->callbacks.rollback_prepared_cb != NULL) ||
+		(ctx->callbacks.stream_prepare_cb != NULL) ||
+		(ctx->callbacks.filter_prepare_cb != NULL);
+
+	/*
 	 * streaming callbacks
 	 *
 	 * stream_message and stream_truncate callbacks are optional, so we do not
@@ -237,6 +266,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->stream_start = stream_start_cb_wrapper;
 	ctx->reorder->stream_stop = stream_stop_cb_wrapper;
 	ctx->reorder->stream_abort = stream_abort_cb_wrapper;
+	ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
 	ctx->reorder->stream_commit = stream_commit_cb_wrapper;
 	ctx->reorder->stream_change = stream_change_cb_wrapper;
 	ctx->reorder->stream_message = stream_message_cb_wrapper;
@@ -783,6 +813,129 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				   XLogRecPtr prepare_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* We're only supposed to call this when two-phase commits are supported */
+	Assert(ctx->twophase);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "prepare";
+	state.report_location = txn->final_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/*
+	 * If the plugin supports two-phase commits then prepare callback is
+	 * mandatory
+	 */
+	if (ctx->callbacks.prepare_cb == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("Output plugin did not register prepare_cb callback")));
+
+	/* do the actual work: call callback */
+	ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						   XLogRecPtr commit_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* We're only supposed to call this when two-phase commits are supported */
+	Assert(ctx->twophase);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "commit_prepared";
+	state.report_location = txn->final_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/*
+	 * If the plugin support two-phase commits then commit prepared callback
+	 * is mandatory
+	 */
+	if (ctx->callbacks.commit_prepared_cb == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("Output plugin did not register commit_prepared_cb callback")));
+
+	/* do the actual work: call callback */
+	ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							 XLogRecPtr abort_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* We're only supposed to call this when two-phase commits are supported */
+	Assert(ctx->twophase);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "rollback_prepared";
+	state.report_location = txn->final_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/*
+	 * If the plugin support two-phase commits then abort prepared callback is
+	 * mandatory
+	 */
+	if (ctx->callbacks.rollback_prepared_cb == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("Output plugin did not register rollback_prepared_cb callback")));
+
+	/* do the actual work: call callback */
+	ctx->callbacks.rollback_prepared_cb(ctx, txn, abort_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
 change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change)
 {
@@ -859,6 +1012,52 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static bool
+filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						  TransactionId xid, const char *gid)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	/*
+	 * Skip if decoding of two-phase transactions at PREPARE time is not
+	 * enabled. In that case all two-phase transactions are considered
+	 * filtered out and will be applied as regular transactions at COMMIT
+	 * PREPARED.
+	 */
+	if (!ctx->twophase)
+		return true;
+
+	/*
+	 * The filter_prepare callback is optional. When not supplied, all
+	 * prepared transactions should go through.
+	 */
+	if (!ctx->callbacks.filter_prepare_cb)
+		return false;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_prepare";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_prepare_cb(ctx, txn, xid, gid);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 bool
 filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
@@ -1057,6 +1256,49 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 static void
+stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						  XLogRecPtr prepare_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	/*
+	 * We're only supposed to call this when streaming and two-phase commits
+	 * are supported.
+	 */
+	Assert(ctx->streaming);
+	Assert(ctx->twophase);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "stream_prepare";
+	state.report_location = txn->final_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn;
+
+	/* in streaming mode with two-phase commits, stream_prepare_cb is required */
+	if (ctx->callbacks.stream_prepare_cb == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("logical streaming commits requires a stream_prepare_cb callback")));
+
+	ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
 stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						 XLogRecPtr commit_lsn)
 {
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 40bab7e..7f4384b 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -85,6 +85,11 @@ typedef struct LogicalDecodingContext
 	bool		streaming;
 
 	/*
+	 * Does the output plugin support two-phase decoding, and is it enabled?
+	 */
+	bool		twophase;
+
+	/*
 	 * State for writing output.
 	 */
 	bool		accept_writes;
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index b78c796..032e35a 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -77,6 +77,39 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn,
 									   XLogRecPtr commit_lsn);
 
+ /*
+  * Called before decoding of PREPARE record to decide whether this
+  * transaction should be decoded with separate calls to prepare and
+  * commit_prepared/rollback_prepared callbacks or wait till COMMIT PREPARED
+  * and sent as usual transaction.
+  */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  ReorderBufferTXN *txn,
+											  TransactionId xid,
+											  const char *gid);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+										ReorderBufferTXN *txn,
+										XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+											   ReorderBufferTXN *txn,
+											   XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx,
+												 ReorderBufferTXN *txn,
+												 XLogRecPtr abort_lsn);
+
 /*
  * Called for the generic logical decoding messages.
  */
@@ -124,6 +157,14 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
 											XLogRecPtr abort_lsn);
 
 /*
+ * Called to prepare changes streamed to remote node from in-progress
+ * transaction. This is called as part of a two-phase commit.
+ */
+typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+											  ReorderBufferTXN *txn,
+											  XLogRecPtr prepare_lsn);
+
+/*
  * Called to apply changes streamed to remote node from in-progress
  * transaction.
  */
@@ -171,12 +212,17 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeTruncateCB truncate_cb;
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeFilterPrepareCB filter_prepare_cb;
+	LogicalDecodePrepareCB prepare_cb;
+	LogicalDecodeCommitPreparedCB commit_prepared_cb;
+	LogicalDecodeRollbackPreparedCB rollback_prepared_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 	/* streaming of changes */
 	LogicalDecodeStreamStartCB stream_start_cb;
 	LogicalDecodeStreamStopCB stream_stop_cb;
 	LogicalDecodeStreamAbortCB stream_abort_cb;
+	LogicalDecodeStreamPrepareCB stream_prepare_cb;
 	LogicalDecodeStreamCommitCB stream_commit_cb;
 	LogicalDecodeStreamChangeCB stream_change_cb;
 	LogicalDecodeStreamMessageCB stream_message_cb;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index dfdda93..66c89d1 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/twophase.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -244,6 +245,9 @@ typedef struct ReorderBufferTXN
 	/* Xid of top-level transaction, if known */
 	TransactionId toplevel_xid;
 
+	/* In case of two-phase commit we need to pass GID to output plugin */
+	char	   *gid;
+
 	/*
 	 * LSN of the first data carrying, WAL record with knowledge about this
 	 * xid. This is allowed to *not* be first record adorned with this xid, if
@@ -405,6 +409,26 @@ typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb,
 									   ReorderBufferTXN *txn,
 									   XLogRecPtr commit_lsn);
 
+typedef bool (*ReorderBufferFilterPrepareCB) (ReorderBuffer *rb,
+											  ReorderBufferTXN *txn,
+											  TransactionId xid,
+											  const char *gid);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb,
+										ReorderBufferTXN *txn,
+										XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb,
+											   ReorderBufferTXN *txn,
+											   XLogRecPtr commit_lsn);
+
+/* rollback  prepared callback signature */
+typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
+												 ReorderBufferTXN *txn,
+												 XLogRecPtr abort_lsn);
+
 /* message callback signature */
 typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										ReorderBufferTXN *txn,
@@ -431,6 +455,12 @@ typedef void (*ReorderBufferStreamAbortCB) (
 											ReorderBufferTXN *txn,
 											XLogRecPtr abort_lsn);
 
+/* prepare streamed transaction callback signature */
+typedef void (*ReorderBufferStreamPrepareCB) (
+											  ReorderBuffer *rb,
+											  ReorderBufferTXN *txn,
+											  XLogRecPtr prepare_lsn);
+
 /* commit streamed transaction callback signature */
 typedef void (*ReorderBufferStreamCommitCB) (
 											 ReorderBuffer *rb,
@@ -497,6 +527,10 @@ struct ReorderBuffer
 	ReorderBufferApplyChangeCB apply_change;
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
+	ReorderBufferFilterPrepareCB filter_prepare;
+	ReorderBufferPrepareCB prepare;
+	ReorderBufferCommitPreparedCB commit_prepared;
+	ReorderBufferRollbackPreparedCB rollback_prepared;
 	ReorderBufferMessageCB message;
 
 	/*
@@ -505,6 +539,7 @@ struct ReorderBuffer
 	ReorderBufferStreamStartCB stream_start;
 	ReorderBufferStreamStopCB stream_stop;
 	ReorderBufferStreamAbortCB stream_abort;
+	ReorderBufferStreamPrepareCB stream_prepare;
 	ReorderBufferStreamCommitCB stream_commit;
 	ReorderBufferStreamChangeCB stream_change;
 	ReorderBufferStreamMessageCB stream_message;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f2ba92b..1086e51 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1314,9 +1314,20 @@ LogStmtLevel
 LogicalDecodeBeginCB
 LogicalDecodeChangeCB
 LogicalDecodeCommitCB
+LogicalDecodeFilterPrepareCB
+LogicalDecodePrepareCB
+LogicalDecodeCommitPreparedCB
+LogicalDecodeRollbackPreparedCB
 LogicalDecodeFilterByOriginCB
 LogicalDecodeMessageCB
 LogicalDecodeShutdownCB
+LogicalDecodeStreamStartCB
+LogicalDecodeStreamStopCB
+LogicalDecodeStreamAbortCB
+LogicalDecodeStreamPrepareCB
+LogicalDecodeStreamCommitCB
+LogicalDecodeStreamChangeCB
+LogicalDecodeStreamMessageCB
 LogicalDecodeStartupCB
 LogicalDecodeTruncateCB
 LogicalDecodingContext
-- 
1.8.3.1

