From 7c0c403625bef87ef67b3930be7fd3171628cc3e Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Wed, 21 Jul 2021 06:29:57 -0400
Subject: [PATCH v8] Skip empty transactions for logical replication.

The current logical replication behaviour is to send every transaction to
the subscriber even if the transaction is empty (because it does not
contain changes from the selected publications). It is a waste of CPU
cycles and network bandwidth to build/transmit these empty transactions.

This patch addresses the above problem by postponing the BEGIN / BEGIN
PREPARE messages until the first change is encountered. If, when
processing a COMMIT / PREPARE message, we find that there has been no
change for that transaction, then the COMMIT / PREPARE message is not
sent either. This means that pgoutput will skip the BEGIN / COMMIT or
BEGIN PREPARE / PREPARE messages for transactions that are empty.

Discussion:
https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com
---
 contrib/test_decoding/test_decoding.c           |   7 +-
 doc/src/sgml/logicaldecoding.sgml               |  13 +-
 doc/src/sgml/protocol.sgml                      |  15 ++
 src/backend/replication/logical/logical.c       |   9 +-
 src/backend/replication/logical/proto.c         |  16 +-
 src/backend/replication/logical/reorderbuffer.c |   2 +-
 src/backend/replication/logical/worker.c        |  38 +++--
 src/backend/replication/pgoutput/pgoutput.c     | 188 +++++++++++++++++++++++-
 src/include/replication/logicalproto.h          |   8 +-
 src/include/replication/output_plugin.h         |   4 +-
 src/include/replication/reorderbuffer.h         |   4 +-
 src/test/subscription/t/020_messages.pl         |   5 +-
 src/test/subscription/t/021_twophase.pl         |  46 +++++-
 src/tools/pgindent/typedefs.list                |   1 +
 14 files changed, 316 insertions(+), 40 deletions(-)

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index e5cd84e..408dbfc 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -86,7 +86,9 @@ static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
 								  XLogRecPtr prepare_lsn);
 static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
 										  ReorderBufferTXN *txn,
-										  XLogRecPtr commit_lsn);
+										  XLogRecPtr commit_lsn,
+										  XLogRecPtr prepare_end_lsn,
+										  TimestampTz prepare_time);
 static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx,
 											ReorderBufferTXN *txn,
 											XLogRecPtr prepare_end_lsn,
@@ -390,7 +392,8 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 /* COMMIT PREPARED callback */
 static void
 pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-							  XLogRecPtr commit_lsn)
+							  XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+							  TimestampTz prepare_time)
 {
 	TestDecodingData *data = ctx->output_plugin_private;
 
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 89b8090..27811e5 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -884,11 +884,20 @@ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
       The required <function>commit_prepared_cb</function> callback is called
       whenever a transaction <command>COMMIT PREPARED</command> has been decoded.
       The <parameter>gid</parameter> field, which is part of the
-      <parameter>txn</parameter> parameter, can be used in this callback.
+      <parameter>txn</parameter> parameter, can be used in this callback. The
+      parameters <parameter>prepare_end_lsn</parameter> and
+      <parameter>prepare_time</parameter> can be used to check whether the
+      plugin has received this <command>PREPARE TRANSACTION</command> command.
+      If it has, it can commit the transaction; otherwise, it can skip the
+      commit. The <parameter>gid</parameter> alone is not sufficient to
+      determine this because the downstream may already have a prepared
+      transaction with the same identifier.
 <programlisting>
 typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
                                                ReorderBufferTXN *txn,
-                                               XLogRecPtr commit_lsn);
+                                               XLogRecPtr commit_lsn,
+                                               XLogRecPtr prepare_end_lsn,
+                                               TimestampTz prepare_time);
 </programlisting>
      </para>
     </sect3>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index e8cb78f..5e68dfb 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -7550,6 +7550,13 @@ are available since protocol version 3.
 <varlistentry>
 <term>Int64</term>
 <listitem><para>
+                The end LSN of the prepared transaction.
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
                 The LSN of the commit prepared.
 </para></listitem>
 </varlistentry>
@@ -7564,6 +7571,14 @@ are available since protocol version 3.
 <varlistentry>
 <term>Int64</term>
 <listitem><para>
+                Prepare timestamp of the transaction. The value is in number
+                of microseconds since PostgreSQL epoch (2000-01-01).
+</para></listitem>
+</varlistentry>
+
+<varlistentry>
+<term>Int64</term>
+<listitem><para>
                 Commit timestamp of the transaction. The value is in number
                 of microseconds since PostgreSQL epoch (2000-01-01).
 </para></listitem>
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d61ef4c..67c762a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -63,7 +63,8 @@ static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn
 static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr prepare_lsn);
 static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
-									   XLogRecPtr commit_lsn);
+									   XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+									   TimestampTz prepare_time);
 static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 										 XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -936,7 +937,8 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 static void
 commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
-						   XLogRecPtr commit_lsn)
+						   XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+						   TimestampTz prepare_time)
 {
 	LogicalDecodingContext *ctx = cache->private_data;
 	LogicalErrorCallbackState state;
@@ -972,7 +974,8 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 						"commit_prepared_cb")));
 
 	/* do the actual work: call callback */
-	ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+	ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn, prepare_end_lsn,
+									  prepare_time);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index a245252..47a7489 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -206,7 +206,9 @@ logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
  */
 void
 logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
-								 XLogRecPtr commit_lsn)
+								 XLogRecPtr commit_lsn,
+								 XLogRecPtr prepare_end_lsn,
+								 TimestampTz prepare_time)
 {
 	uint8		flags = 0;
 
@@ -222,8 +224,10 @@ logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
 	pq_sendbyte(out, flags);
 
 	/* send fields */
+	pq_sendint64(out, prepare_end_lsn);
 	pq_sendint64(out, commit_lsn);
 	pq_sendint64(out, txn->end_lsn);
+	pq_sendint64(out, prepare_time);
 	pq_sendint64(out, txn->xact_time.commit_time);
 	pq_sendint32(out, txn->xid);
 
@@ -244,12 +248,16 @@ logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *
 		elog(ERROR, "unrecognized flags %u in commit prepared message", flags);
 
 	/* read fields */
+	prepare_data->prepare_end_lsn = pq_getmsgint64(in);
+	if (prepare_data->prepare_end_lsn == InvalidXLogRecPtr)
+		elog(ERROR, "prepare_end_lsn is not set in commit prepared message");
 	prepare_data->commit_lsn = pq_getmsgint64(in);
 	if (prepare_data->commit_lsn == InvalidXLogRecPtr)
 		elog(ERROR, "commit_lsn is not set in commit prepared message");
-	prepare_data->end_lsn = pq_getmsgint64(in);
-	if (prepare_data->end_lsn == InvalidXLogRecPtr)
-		elog(ERROR, "end_lsn is not set in commit prepared message");
+	prepare_data->commit_end_lsn = pq_getmsgint64(in);
+	if (prepare_data->commit_end_lsn == InvalidXLogRecPtr)
+		elog(ERROR, "commit_end_lsn is not set in commit prepared message");
+	prepare_data->prepare_time = pq_getmsgint64(in);
 	prepare_data->commit_time = pq_getmsgint64(in);
 	prepare_data->xid = pq_getmsgint(in, 4);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7378beb..5a707e2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2794,7 +2794,7 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 	txn->origin_lsn = origin_lsn;
 
 	if (is_commit)
-		rb->commit_prepared(rb, txn, commit_lsn);
+		rb->commit_prepared(rb, txn, commit_lsn, prepare_end_lsn, prepare_time);
 	else
 		rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b9a7a7f..63e19bc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -966,27 +966,39 @@ apply_handle_commit_prepared(StringInfo s)
 	/* Compute GID for two_phase transactions. */
 	TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
 						   gid, sizeof(gid));
-
-	/* There is no transaction when COMMIT PREPARED is called */
-	begin_replication_step();
-
 	/*
-	 * Update origin state so we can restart streaming from correct position
-	 * in case of crash.
+	 * It is possible that we haven't received the prepare because
+	 * the transaction did not have any changes relevant to this
+	 * subscription and so was essentially an empty prepare. In this case,
+	 * the walsender is optimized to drop the empty transaction and the
+	 * accompanying prepare. Silently ignore if we don't find the prepared
+	 * transaction.
 	 */
-	replorigin_session_origin_lsn = prepare_data.end_lsn;
-	replorigin_session_origin_timestamp = prepare_data.commit_time;
+	if (LookupGXact(gid, prepare_data.prepare_end_lsn,
+					prepare_data.prepare_time))
+	{
 
-	FinishPreparedTransaction(gid, true);
-	end_replication_step();
-	CommitTransactionCommand();
+		/* There is no transaction when COMMIT PREPARED is called */
+		begin_replication_step();
+
+		/*
+		 * Update origin state so we can restart streaming from correct position
+		 * in case of crash.
+		 */
+		replorigin_session_origin_lsn = prepare_data.commit_end_lsn;
+		replorigin_session_origin_timestamp = prepare_data.commit_time;
+
+		FinishPreparedTransaction(gid, true);
+		end_replication_step();
+		CommitTransactionCommand();
+	}
 	pgstat_report_stat(false);
 
-	store_flush_position(prepare_data.end_lsn);
+	store_flush_position(prepare_data.commit_end_lsn);
 	in_remote_transaction = false;
 
 	/* Process any tables that are being synchronized in parallel. */
-	process_syncing_tables(prepare_data.end_lsn);
+	process_syncing_tables(prepare_data.commit_end_lsn);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
 }
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e4314af..d82db45 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -56,7 +56,9 @@ static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
 								 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
 static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
-										 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+										 ReorderBufferTXN *txn, XLogRecPtr commit_lsn,
+										 XLogRecPtr prepare_end_lsn,
+										 TimestampTz prepare_time);
 static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 										   ReorderBufferTXN *txn,
 										   XLogRecPtr prepare_end_lsn,
@@ -130,6 +132,11 @@ typedef struct RelationSyncEntry
 	TupleConversionMap *map;
 } RelationSyncEntry;
 
+typedef struct PGOutputTxnData
+{
+	bool sent_begin_txn;    /* true once BEGIN / BEGIN PREPARE has been sent */
+} PGOutputTxnData;
+
 /* Map used to remember which relation schemas we sent. */
 static HTAB *RelationSyncCache = NULL;
 
@@ -406,14 +413,38 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 
 /*
  * BEGIN callback
+ * Don't send the BEGIN message here; postpone it until the first change.
+ * In logical replication, a common scenario is to replicate only a subset
+ * of tables, so transactions that touch only unpublished tables produce no
+ * output and are effectively empty. Sending BEGIN and COMMIT messages for
+ * such transactions to subscribers would use bandwidth on something with
+ * little or no value for logical replication.
  */
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+	PGOutputTxnData    *data = MemoryContextAllocZero(ctx->context,
+														sizeof(PGOutputTxnData));
+
+	data->sent_begin_txn = false;
+	txn->output_plugin_private = data;
+}
+
+/*
+ * Send BEGIN.
+ * This is where the BEGIN is actually sent. This is called
+ * while processing the first change of the transaction.
+ */
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputTxnData	*data = (PGOutputTxnData *) txn->output_plugin_private;
 
+	Assert(data);
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin(ctx->out, txn);
+	data->sent_begin_txn = true;
 
 	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
 					 send_replication_origin);
@@ -428,23 +459,66 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
+	PGOutputTxnData	*data = (PGOutputTxnData *) txn->output_plugin_private;
+	bool            skip;
+
+	Assert(data);
+
+	/*
+	 * If a BEGIN message was not yet sent, then it means there were no relevant
+	 * changes encountered, so we can skip the COMMIT message too.
+	 */
+	skip = !data->sent_begin_txn;
+	pfree(data);
+	txn->output_plugin_private = NULL;
 	OutputPluginUpdateProgress(ctx);
 
+	if (skip)
+	{
+		elog(DEBUG1, "skipping replication of an empty transaction");
+		return;
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
 }
 
 /*
- * BEGIN PREPARE callback
+ * BEGIN PREPARE callback.
+ * Don't send the BEGIN PREPARE message here; postpone it until the first
+ * change. In logical replication, a common scenario is to replicate only a
+ * subset of tables, so transactions that touch only unpublished tables
+ * produce no output and are effectively empty. Sending BEGIN PREPARE and
+ * the subsequent two-phase messages for such transactions to subscribers
+ * would use bandwidth on something with little or no value for logical
+ * replication.
  */
 static void
 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+	/*
+	 * Delegate to pgoutput_begin_txn(), which sets up the same
+	 * "BEGIN not yet sent" per-transaction state as for a plain BEGIN.
+	 */
+	pgoutput_begin_txn(ctx, txn);
+}
+
+/*
+ * Send BEGIN PREPARE.
+ * This is where the BEGIN PREPARE is actually sent. This is called while
+ * processing the first change of the prepared transaction.
+ */
+static void
+pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
 
+	Assert(data);
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin_prepare(ctx->out, txn);
+	data->sent_begin_txn = true;
 
 	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
 					 send_replication_origin);
@@ -459,8 +533,21 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
+	PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
+	Assert(data);
 	OutputPluginUpdateProgress(ctx);
 
+	/*
+	 * If the BEGIN was not yet sent, then it means there were no relevant
+	 * changes encountered, so we can skip the PREPARE message too.
+	 */
+	if (!data->sent_begin_txn)
+	{
+		elog(DEBUG1, "skipping replication of an empty prepared transaction");
+		return;
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
@@ -471,12 +558,34 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  */
 static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
-							 XLogRecPtr commit_lsn)
+							 XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn,
+							 TimestampTz prepare_time)
 {
+	PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
 	OutputPluginUpdateProgress(ctx);
 
+	/*
+	 * If per-transaction data exists and BEGIN PREPARE was never sent, no
+	 * relevant changes were encountered, so skip the COMMIT PREPARED
+	 * message too. The data is freed here in either case.
+	 */
+	if (data)
+	{
+		bool skip = !data->sent_begin_txn;
+		pfree(data);
+		txn->output_plugin_private = NULL;
+		if (skip)
+		{
+			elog(DEBUG1,
+				 "skipping replication of COMMIT PREPARED of an empty transaction");
+			return;
+		}
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
-	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
+	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn, prepare_end_lsn,
+									 prepare_time);
 	OutputPluginWrite(ctx, true);
 }
 
@@ -489,8 +598,27 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
+	PGOutputTxnData    *data = (PGOutputTxnData *) txn->output_plugin_private;
+
 	OutputPluginUpdateProgress(ctx);
 
+	/*
+	 * If the BEGIN PREPARE was not yet sent, then it means there were no
+	 * relevant changes encountered, so we can skip the ROLLBACK PREPARED
+	 * message too.
+	 */
+	if (data)
+	{
+		bool skip = !data->sent_begin_txn;
+		pfree(data);
+		txn->output_plugin_private = NULL;
+		if (skip)
+		{
+			elog(DEBUG1,
+				 "skipping replication of ROLLBACK of an empty transaction");
+			return;
+		}
+	}
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
 									   prepare_time);
@@ -639,11 +767,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				Relation relation, ReorderBufferChange *change)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 	TransactionId xid = InvalidTransactionId;
 	Relation	ancestor = NULL;
 
+	/* If not streaming, txndata should have been set up by BEGIN/BEGIN PREPARE */
+	if (!in_streaming)
+		Assert(txndata);
+
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -677,6 +810,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/*
+	 * Output BEGIN / BEGIN PREPARE if we haven't yet. While streaming,
+	 * there is no need to send BEGIN / BEGIN PREPARE.
+	 */
+	if (!in_streaming && !txndata->sent_begin_txn)
+	{
+		if (rbtxn_prepared(txn))
+			pgoutput_begin_prepare(ctx, txn);
+		else
+			pgoutput_begin(ctx, txn);
+	}
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -779,6 +924,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				  int nrelations, Relation relations[], ReorderBufferChange *change)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 	int			i;
@@ -786,6 +932,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	Oid		   *relids;
 	TransactionId xid = InvalidTransactionId;
 
+	/* If not streaming, txndata should have been set up by BEGIN/BEGIN PREPARE */
+	if (!in_streaming)
+		Assert(txndata);
+
 	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
 	if (in_streaming)
 		xid = change->txn->xid;
@@ -822,6 +972,18 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	if (nrelids > 0)
 	{
+		/*
+		 * Output BEGIN / BEGIN PREPARE if we haven't yet. While streaming,
+		 * there is no need to send BEGIN / BEGIN PREPARE.
+		 */
+		if (!in_streaming && !txndata->sent_begin_txn)
+		{
+			if (rbtxn_prepared(txn))
+				pgoutput_begin_prepare(ctx, txn);
+			else
+				pgoutput_begin(ctx, txn);
+		}
+
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
 								  xid,
@@ -854,6 +1016,24 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (in_streaming)
 		xid = txn->xid;
 
+	/*
+	 * Output BEGIN / BEGIN PREPARE if we haven't yet. This is skipped while
+	 * streaming and for non-transactional messages.
+	 */
+	if (!in_streaming && transactional)
+	{
+		PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+		Assert(txndata);
+		if (!txndata->sent_begin_txn)
+		{
+			if (rbtxn_prepared(txn))
+				pgoutput_begin_prepare(ctx, txn);
+			else
+				pgoutput_begin(ctx, txn);
+		}
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
 							 xid,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 63de90d..0be0a07 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -148,8 +148,10 @@ typedef struct LogicalRepPreparedTxnData
  */
 typedef struct LogicalRepCommitPreparedTxnData
 {
+	XLogRecPtr	prepare_end_lsn;
 	XLogRecPtr	commit_lsn;
-	XLogRecPtr	end_lsn;
+	XLogRecPtr	commit_end_lsn;
+	TimestampTz prepare_time;
 	TimestampTz commit_time;
 	TransactionId xid;
 	char		gid[GIDSIZE];
@@ -188,7 +190,9 @@ extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
 extern void logicalrep_read_prepare(StringInfo in,
 									LogicalRepPreparedTxnData *prepare_data);
 extern void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn,
-											 XLogRecPtr commit_lsn);
+											 XLogRecPtr commit_lsn,
+											 XLogRecPtr prepare_end_lsn,
+											 TimestampTz prepare_time);
 extern void logicalrep_read_commit_prepared(StringInfo in,
 											LogicalRepCommitPreparedTxnData *prepare_data);
 extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn,
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 810495e..0d28306 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -128,7 +128,9 @@ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
  */
 typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
 											   ReorderBufferTXN *txn,
-											   XLogRecPtr commit_lsn);
+											   XLogRecPtr commit_lsn,
+											   XLogRecPtr prepare_end_lsn,
+											   TimestampTz prepare_time);
 
 /*
  * Called for ROLLBACK PREPARED.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 5b40ff7..11e2e1e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -442,7 +442,9 @@ typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb,
 /* commit prepared callback signature */
 typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb,
 											   ReorderBufferTXN *txn,
-											   XLogRecPtr commit_lsn);
+											   XLogRecPtr commit_lsn,
+											   XLogRecPtr prepare_end_lsn,
+											   TimestampTz prepare_time);
 
 /* rollback  prepared callback signature */
 typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb,
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
index 0e218e0..3d246be 100644
--- a/src/test/subscription/t/020_messages.pl
+++ b/src/test/subscription/t/020_messages.pl
@@ -87,9 +87,8 @@ $result = $node_publisher->safe_psql(
 			'publication_names', 'tap_pub')
 ));
 
-# 66 67 == B C == BEGIN COMMIT
-is( $result, qq(66
-67),
+# no message and no BEGIN and COMMIT because of empty transaction optimization
+is($result, qq(),
 	'option messages defaults to false so message (M) is not available on slot'
 );
 
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index c6ada92..b954630 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 24;
+use Test::More tests => 25;
 
 ###############################
 # Setup
@@ -318,10 +318,9 @@ $node_publisher->safe_psql('postgres', "
 
 $node_publisher->wait_for_catchup($appname_copy);
 
-# Check that the transaction has been prepared on the subscriber, there will be 2
-# prepared transactions for the 2 subscriptions.
+# Check that the transaction has been prepared on the subscriber
 $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
-is($result, qq(2), 'transaction is prepared on subscriber');
+is($result, qq(1), 'transaction is prepared on subscriber');
 
 # Now commit the insert and verify that it IS replicated
 $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'mygid';");
@@ -337,6 +336,45 @@ is($result, qq(2), 'replicated data in subscriber table');
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;");
 $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;");
 
+##############################
+# Test empty prepares
+##############################
+
+# create a table that is not part of the publication
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab_nopub (a int PRIMARY KEY)");
+
+# disable the subscription so that we can peek at the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+# wait for the replication slot to become inactive in the publisher
+$node_publisher->poll_query_until('postgres',
+   "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'", 1);
+
+# create a transaction with no changes relevant to the slot
+$node_publisher->safe_psql('postgres', "
+   BEGIN;
+   INSERT INTO tab_nopub SELECT generate_series(1,10);
+   PREPARE TRANSACTION 'empty_transaction';
+   COMMIT PREPARED 'empty_transaction';");
+
+# peek at the contents of the slot
+$result = $node_publisher->safe_psql(
+   'postgres', qq(
+       SELECT get_byte(data, 0)
+       FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
+           'proto_version', '3',
+           'publication_names', 'tap_pub')
+));
+
+# the empty transaction should be skipped
+is($result, qq(),
+   'empty transaction dropped on slot'
+);
+
+# enable the subscription to test cleanup
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+
 ###############################
 # check all the cleanup
 ###############################
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 37cf4b2..75639ab 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1606,6 +1606,7 @@ PGMessageField
 PGModuleMagicFunction
 PGNoticeHooks
 PGOutputData
+PGOutputTxnData
 PGPROC
 PGP_CFB
 PGP_Context
-- 
1.8.3.1

