On Mon, Oct 21, 2019 at 21:20, Jeff Janes
<jeff.ja...@gmail.com> wrote:
>
> After setting up logical replication of a slowly changing table using the 
> built in pub/sub facility, I noticed way more network traffic than made 
> sense.  Looking into it, I see that every transaction in that database on the 
> master gets sent to the replica.  99.999+% of them are empty transactions 
> ('B' message and 'C' message with nothing in between) because the 
> transactions don't touch any tables in the publication, only non-replicated 
> tables.  Is doing it this way necessary for some reason?  Couldn't we hold 
> the transmission of 'B' until something else comes along, and then if that 
> next thing is 'C' drop both of them?
>
That is not optimal. Those empty transactions are a waste of bandwidth.
We can suppress them when no changes will be sent. test_decoding
already implements "skip empty transactions" the way you describe above,
and I did something similar for pgoutput. Patch is attached.
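
To make the idea concrete outside of the backend, here is a minimal
standalone sketch of the deferred-BEGIN technique. It is not the patch
itself: Stream, Decoder, emit() and the on_*() callbacks are hypothetical
names made up for this illustration, and the 'I' byte merely stands in for
any published change.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the wire: records one type byte per message. */
typedef struct Stream
{
	char		msgs[64];
	size_t		len;
} Stream;

/* Hypothetical decoder state; the flag plays the role of xact_wrote_changes. */
typedef struct Decoder
{
	Stream		out;
	bool		xact_wrote_changes;
} Decoder;

/* Append one message type byte, keeping msgs NUL-terminated. */
static void
emit(Stream *s, char type)
{
	assert(s->len + 1 < sizeof(s->msgs));
	s->msgs[s->len++] = type;
	s->msgs[s->len] = '\0';
}

static void
decoder_init(Decoder *d)
{
	memset(d, 0, sizeof(*d));
}

/* BEGIN callback: send nothing yet, only reset the per-transaction flag. */
static void
on_begin(Decoder *d)
{
	d->xact_wrote_changes = false;
}

/* Change callback: 'published' says whether the change passes the filter. */
static void
on_change(Decoder *d, bool published)
{
	if (!published)
		return;

	/* First published change of the transaction: emit the postponed 'B'. */
	if (!d->xact_wrote_changes)
	{
		emit(&d->out, 'B');
		d->xact_wrote_changes = true;
	}
	emit(&d->out, 'I');
}

/* COMMIT callback: skip 'C' when no 'B' was ever sent. */
static void
on_commit(Decoder *d)
{
	if (!d->xact_wrote_changes)
		return;
	emit(&d->out, 'C');
}
```

In this sketch, a transaction whose changes are all filtered out leaves the
stream untouched, while a transaction with two published changes produces
'B', 'I', 'I', 'C'.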


-- 
   Euler Taveira                                   Timbira -
http://www.timbira.com.br/
   PostgreSQL: Consulting, Development, 24x7 Support and Training
From 433ea40a02ab823f3aa70c18928b9862f0eb004b Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Fri, 8 Nov 2019 12:48:03 -0300
Subject: [PATCH] Skip empty transactions for logical replication

The current logical replication behavior is to send every transaction to
the subscriber even if the transaction is empty (because it does not
contain changes from the selected publications). It is a waste of CPU
cycles and network bandwidth to build and transmit those empty transactions.
Postpone the BEGIN message until the first change. While processing a
COMMIT message, if no change was previously written for that
transaction, do not send the COMMIT message. This means that pgoutput will
skip BEGIN / COMMIT messages for transactions that did not write changes.

Discussion:
https://postgr.es/m/CAMkU=1yohp9-dv48flosprmqyeyys5zwkazgd41rjr10xin...@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 34 +++++++++++++++++++++++++++++
 src/include/replication/pgoutput.h          |  3 +++
 2 files changed, 37 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9c08757..eed1093 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -212,6 +212,22 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+	PGOutputData	*data = ctx->output_plugin_private;
+
+	/*
+	 * Don't send the BEGIN message here. Instead, postpone it until the first
+	 * change. In logical replication, a common scenario is to replicate a set
+	 * of tables (instead of all tables), and transactions whose changes were to
+	 * table(s) that are not published produce empty transactions. Sending
+	 * BEGIN and COMMIT messages for these empty transactions to subscribers
+	 * uses bandwidth on something with little/no use for logical replication.
+	 */
+	data->xact_wrote_changes = false;
+}
+
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
 
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
@@ -249,8 +265,14 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
+	PGOutputData	*data = ctx->output_plugin_private;
+
 	OutputPluginUpdateProgress(ctx);
 
+	/* skip COMMIT message if nothing was sent */
+	if (!data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
@@ -335,6 +357,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/* output BEGIN if we haven't yet */
+	if (!data->xact_wrote_changes)
+		pgoutput_begin(ctx, txn);
+
+	data->xact_wrote_changes = true;
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -415,6 +443,12 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	if (nrelids > 0)
 	{
+		/* output BEGIN if we haven't yet */
+		if (!data->xact_wrote_changes)
+			pgoutput_begin(ctx, txn);
+
+		data->xact_wrote_changes = true;
+
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
 								  nrelids,
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 8870721..cb57e76 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -20,6 +20,9 @@ typedef struct PGOutputData
 	MemoryContext context;		/* private memory context for transient
 								 * allocations */
 
+	/* control whether messages can already be sent */
+	bool		xact_wrote_changes;
+
 	/* client info */
 	uint32		protocol_version;
 
-- 
2.7.4
