From 858fc8976c5099f4833aebc54f8337e78fdb6222 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Tue, 11 Jan 2022 04:35:09 -0500
Subject: [PATCH v16] Skip empty transactions for logical replication.

The current logical replication behavior is to send every transaction to
subscriber even though the transaction is empty (because it does not
contain changes from the selected publications). It is a waste of CPU
cycles and network bandwidth to build/transmit these empty transactions.

This patch addresses the above problem by postponing the BEGIN message
until the first change. While processing a COMMIT message,
if there is no other change for that transaction,
do not send COMMIT message. It means that pgoutput will
skip BEGIN / COMMIT messages for transactions that are empty. The patch
also makes sure that in synchronous replication mode, when skipping empty
transactions, keepalive messages are sent to keep the LSN locations updated
on the standby.
This patch does not skip empty transactions that are "streaming" or "two-phase".

Discussion:
https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com
---
 src/backend/replication/logical/logical.c   |   4 +-
 src/backend/replication/pgoutput/pgoutput.c | 105 ++++++++++++++++++++++++++--
 src/backend/replication/syncrep.c           |  12 +++-
 src/backend/replication/walsender.c         |  22 ++++--
 src/include/replication/logical.h           |   3 +-
 src/include/replication/output_plugin.h     |   2 +-
 src/include/replication/syncrep.h           |   1 +
 src/test/subscription/t/020_messages.pl     |   5 +-
 src/tools/pgindent/typedefs.list            |   1 +
 9 files changed, 135 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 9bc3a2d..fb1c26a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -672,12 +672,12 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
  * Update progress tracking (if supported).
  */
 void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keepalive)
 {
 	if (!ctx->update_progress)
 		return;
 
-	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, send_keepalive);
 }
 
 /*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index af8d51a..75296a8 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -132,6 +132,17 @@ typedef struct RelationSyncEntry
 	TupleConversionMap *map;
 } RelationSyncEntry;
 
+/*
+ * Maintain a per-transaction level variable to track whether the
+ * transaction has sent BEGIN. BEGIN is only sent when the first
+ * change in a transaction is processed. This makes it possible
+ * to skip transactions that are empty.
+ */
+typedef struct PGOutputTxnData
+{
+	bool sent_begin_txn;    /* flag indicating whether begin has been sent */
+} PGOutputTxnData;
+
 /* Map used to remember which relation schemas we sent. */
 static HTAB *RelationSyncCache = NULL;
 
@@ -396,15 +407,40 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 }
 
 /*
- * BEGIN callback
+ * BEGIN callback.
+ *
+ * Don't send BEGIN message here. Instead, postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set
+ * of tables (instead of all tables) and transactions whose changes were on
+ * table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
  */
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+	PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+														 sizeof(PGOutputTxnData));
+
+	txndata->sent_begin_txn = false;
+	txn->output_plugin_private = txndata;
+}
+
+/*
+ * Send BEGIN.
+ * This is where the BEGIN is actually sent. This is called
+ * while processing the first change of the transaction.
+ */
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputTxnData	*txndata = (PGOutputTxnData *) txn->output_plugin_private;
 
+	Assert(txndata);
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin(ctx->out, txn);
+	txndata->sent_begin_txn = true;
 
 	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
 					 send_replication_origin);
@@ -419,7 +455,25 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	PGOutputTxnData	*txndata = (PGOutputTxnData *) txn->output_plugin_private;
+	bool            skip;
+
+	Assert(txndata);
+
+	/*
+	 * If a BEGIN message was not yet sent, then it means there were no relevant
+	 * changes encountered, so we can skip the COMMIT message too.
+	 */
+	skip = !txndata->sent_begin_txn;
+	pfree(txndata);
+	txn->output_plugin_private = NULL;
+	OutputPluginUpdateProgress(ctx, skip);
+
+	if (skip)
+	{
+		elog(DEBUG1, "skipping replication of an empty transaction");
+		return;
+	}
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -433,6 +487,8 @@ static void
 pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputTxnData    *txndata = MemoryContextAllocZero(ctx->context,
+														 sizeof(PGOutputTxnData));
 
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin_prepare(ctx->out, txn);
@@ -441,6 +497,8 @@ pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 					 send_replication_origin);
 
 	OutputPluginWrite(ctx, true);
+	txndata->sent_begin_txn = true;
+	txn->output_plugin_private = txndata;
 }
 
 /*
@@ -450,7 +508,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -464,7 +522,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -480,7 +538,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -630,11 +688,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				Relation relation, ReorderBufferChange *change)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 	TransactionId xid = InvalidTransactionId;
 	Relation	ancestor = NULL;
 
+	/* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
+	Assert(in_streaming || txndata);
+
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -668,6 +730,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/*
+	 * Output BEGIN if we haven't yet, unless streaming.
+	 */
+	if (!in_streaming && !txndata->sent_begin_txn)
+		pgoutput_begin(ctx, txn);
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -770,6 +838,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				  int nrelations, Relation relations[], ReorderBufferChange *change)
 {
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 	int			i;
@@ -777,6 +846,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	Oid		   *relids;
 	TransactionId xid = InvalidTransactionId;
 
+	/* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */
+	Assert(in_streaming || txndata);
+
 	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
 	if (in_streaming)
 		xid = change->txn->xid;
@@ -813,6 +885,12 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	if (nrelids > 0)
 	{
+		/*
+		 * output BEGIN if we haven't yet, unless streaming.
+		 */
+		if (!in_streaming && !txndata->sent_begin_txn)
+			pgoutput_begin(ctx, txn);
+
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
 								  xid,
@@ -845,6 +923,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (in_streaming)
 		xid = txn->xid;
 
+	/*
+	 * Output BEGIN if we haven't yet.
+	 * Avoid for streaming and non-transactional messages
+	 */
+	if (!in_streaming && transactional)
+	{
+		PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+		Assert(txndata);
+		if (!txndata->sent_begin_txn)
+			pgoutput_begin(ctx, txn);
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
 							 xid,
@@ -1011,7 +1102,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1032,7 +1123,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index ce163b9..b945165 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -171,8 +171,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 	 * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
 	 * it's false, the lock is not necessary because we don't touch the queue.
 	 */
-	if (!SyncRepRequested() ||
-		!((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+	if (!SyncRepEnabled())
 		return;
 
 	/* Cap the level for anything other than commit to remote flush only. */
@@ -330,6 +329,15 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 }
 
 /*
+ * Check if Synchronous Replication is enabled
+ */
+bool
+SyncRepEnabled(void)
+{
+	return SyncRepRequested() && ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined;
+}
+
+/*
  * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
  *
  * Usually we will go at tail of queue, though it's possible that we arrive
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4cf95ce..43a480f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -170,6 +170,9 @@ static TimestampTz last_reply_timestamp = 0;
 /* Have we sent a heartbeat message asking for reply, since last reply? */
 static bool waiting_for_ping_response = false;
 
+/* Force keep alive when skipping transactions in synchronous replication mode */
+static bool force_keepalive_syncrep = false;
+
 /*
  * While streaming WAL in Copy mode, streamingDoneSending is set to true
  * after we have sent CopyDone. We should not send any more CopyData messages
@@ -248,7 +251,8 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
-static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
+								 bool send_keepalive);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1448,12 +1452,19 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
  * Write the current position to the lag tracker (see XLogSendPhysical).
  */
 static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
+					 bool send_keepalive)
 {
 	static TimestampTz sendTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
 
 	/*
+	 * When skipping empty transactions in synchronous replication, we need
+	 * to send a keep alive to keep the MyWalSnd locations updated.
+	 */
+	force_keepalive_syncrep = send_keepalive && SyncRepEnabled();
+
+	/*
 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
 	 * avoid flooding the lag tracker when we commit frequently.
 	 */
@@ -1546,10 +1557,13 @@ WalSndWaitForWal(XLogRecPtr loc)
 		 * otherwise idle, this keepalive will trigger a reply. Processing the
 		 * reply will update these MyWalSnd locations.
 		 */
-		if (MyWalSnd->flush < sentPtr &&
+		if (force_keepalive_syncrep ||
+			(MyWalSnd->flush < sentPtr &&
 			MyWalSnd->write < sentPtr &&
-			!waiting_for_ping_response)
+			!waiting_for_ping_response))
+		{
 			WalSndKeepalive(false);
+		}
 
 		/* check whether we're done */
 		if (loc <= RecentFlushPtr)
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9..9f59855 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
 typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
 														 XLogRecPtr Ptr,
-														 TransactionId xid
+														 TransactionId xid,
+														 bool send_keepalive
 );
 
 typedef struct LogicalDecodingContext
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 41157fd..6f84614 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -243,6 +243,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keepalive);
 
 #endif							/* OUTPUT_PLUGIN_H */
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 27be230..7086532 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -90,6 +90,7 @@ extern void SyncRepCleanupAtProcExit(void);
 /* called by wal sender */
 extern void SyncRepInitConfig(void);
 extern void SyncRepReleaseWaiters(void);
+extern bool SyncRepEnabled(void);
 
 /* called by wal sender and user backend */
 extern int	SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
index 9bb31ce..1d52e47 100644
--- a/src/test/subscription/t/020_messages.pl
+++ b/src/test/subscription/t/020_messages.pl
@@ -87,9 +87,8 @@ $result = $node_publisher->safe_psql(
 			'publication_names', 'tap_pub')
 ));
 
-# 66 67 == B C == BEGIN COMMIT
-is( $result, qq(66
-67),
+# no message and no BEGIN and COMMIT because of empty transaction optimization
+is($result, qq(),
 	'option messages defaults to false so message (M) is not available on slot'
 );
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5015fa7..c407d54 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1607,6 +1607,7 @@ PGMessageField
 PGModuleMagicFunction
 PGNoticeHooks
 PGOutputData
+PGOutputTxnData
 PGPROC
 PGP_CFB
 PGP_Context
-- 
1.8.3.1

