From e2ebbc83c09c11b2751e2dd3b03b57e7bb8aeae0 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Fri, 23 Apr 2021 00:39:07 -0400
Subject: [PATCH v4] Skip empty transactions for logical replication.

The current logical replication behavior is to send every transaction to
subscriber even though the transaction is empty (because it does not
contain changes from the selected publications). It is a waste of CPU
cycles and network bandwidth to build/transmit these empty transactions.

This patch addresses the above problem by postponing the BEGIN message
until the first change. While processing a COMMIT message,
if there is no other change for that transaction,
do not send COMMIT message. It means that pgoutput will
skip BEGIN / COMMIT messages for transactions that are empty.

Discussion:
https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 43 +++++++++++++++++++++++++++++
 src/include/replication/pgoutput.h          |  3 ++
 src/test/subscription/t/020_messages.pl     |  5 ++--
 3 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f68348d..f4a3576 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -345,10 +345,29 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+	PGOutputData	*data = (PGOutputData *) ctx->output_plugin_private;
+
+	(void)txn; /* keep compiler quiet */
+	/*
+	 * Don't send BEGIN message here. Instead, postpone it until the first
+	 * change. In logical replication, a common scenario is to replicate a set
+	 * of tables (instead of all tables) and transactions whose changes were on
+	 * table(s) that are not published will produce empty transactions. These
+	 * empty transactions will send BEGIN and COMMIT messages to subscribers,
+	 * using bandwidth on something with little/no use for logical replication.
+	 */
+	data->sent_begin_txn = false;
+}
+
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
+	PGOutputData    *data = (PGOutputData *) ctx->output_plugin_private;
 
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
 	logicalrep_write_begin(ctx->out, txn);
+	data->sent_begin_txn = true;
 
 	if (send_replication_origin)
 	{
@@ -384,8 +403,14 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
+	PGOutputData	*data = (PGOutputData *) ctx->output_plugin_private;
+
 	OutputPluginUpdateProgress(ctx);
 
+	/* skip COMMIT message if nothing was sent */
+	if (!data->sent_begin_txn)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
@@ -551,6 +576,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/* output BEGIN if we haven't yet */
+	if (!data->sent_begin_txn && !in_streaming)
+	{
+		pgoutput_begin(ctx, txn);
+	}
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -693,6 +724,12 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	if (nrelids > 0)
 	{
+		/* output BEGIN if we haven't yet */
+		if (!data->sent_begin_txn && !in_streaming)
+		{
+			pgoutput_begin(ctx, txn);
+		}
+
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
 								  xid,
@@ -725,6 +762,12 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (in_streaming)
 		xid = txn->xid;
 
+	/* output BEGIN if we haven't yet, avoid for streaming and non-transactional messages */
+	if (!data->sent_begin_txn && !in_streaming && transactional)
+	{
+		pgoutput_begin(ctx, txn);
+	}
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
 							 xid,
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 51e7c03..abd92bd 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -20,6 +20,9 @@ typedef struct PGOutputData
 	MemoryContext context;		/* private memory context for transient
 								 * allocations */
 
+	bool        sent_begin_txn; 	/* flag indicating whether begin
+									 * has already been sent */
+
 	/* client-supplied info: */
 	uint32		protocol_version;
 	List	   *publication_names;
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
index c8be26b..2ea790f 100644
--- a/src/test/subscription/t/020_messages.pl
+++ b/src/test/subscription/t/020_messages.pl
@@ -78,9 +78,8 @@ $result = $node_publisher->safe_psql(
 			'publication_names', 'tap_pub')
 ));
 
-# 66 67 == B C == BEGIN COMMIT
-is($result, qq(66
-67),
+# no message and no BEGIN and COMMIT because of empty transaction optimization
+is($result, qq(),
 	'option messages defaults to false so message (M) is not available on slot');
 
 $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
-- 
1.8.3.1

