On Mon, Oct 21, 2019 at 21:20, Jeff Janes <jeff.ja...@gmail.com> wrote:
>
> After setting up logical replication of a slowly changing table using the
> built in pub/sub facility, I noticed way more network traffic than made
> sense. Looking into it, I see that every transaction in that database on the
> master gets sent to the replica. 99.999+% of them are empty transactions
> ('B' message and 'C' message with nothing in between) because the
> transactions don't touch any tables in the publication, only non-replicated
> tables. Is doing it this way necessary for some reason? Couldn't we hold
> the transmission of 'B' until something else comes along, and then if that
> next thing is 'C' drop both of them?
>
That is not optimal. Those empty transactions are a waste of bandwidth. We
can suppress them when no changes will be sent. test_decoding already
implements "skip empty transactions" the way you describe, and I did
something similar for pgoutput. A patch is attached.

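To make the idea concrete outside of the walsender, here is a minimal standalone sketch of the "hold 'B' until the first change, drop 'B'/'C' if nothing came in between" logic. It is not the pgoutput code: ToyOutput, toy_send and the other toy_* names are hypothetical stand-ins, and the 'I' message type just represents "some published change".

```c
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in for the plugin's private state. */
typedef struct ToyOutput
{
	bool		xact_wrote_changes; /* was BEGIN already emitted for this xact? */
	int			messages_sent;		/* number of messages "sent" downstream */
} ToyOutput;

/* Pretend to send one protocol message of the given type. */
static void
toy_send(ToyOutput *out, char type)
{
	out->messages_sent++;
	printf("send '%c'\n", type);
}

/* Begin callback: send nothing yet, only reset the per-transaction flag. */
static void
toy_begin_txn(ToyOutput *out)
{
	out->xact_wrote_changes = false;
}

/* Change callback: emit the deferred BEGIN before the first published change. */
static void
toy_change(ToyOutput *out, bool published)
{
	if (!published)
		return;					/* table is not in the publication */

	if (!out->xact_wrote_changes)
		toy_send(out, 'B');
	out->xact_wrote_changes = true;

	toy_send(out, 'I');
}

/* Commit callback: skip COMMIT when no BEGIN was ever sent. */
static void
toy_commit_txn(ToyOutput *out)
{
	if (!out->xact_wrote_changes)
		return;
	toy_send(out, 'C');
}

/* Run one transaction and return how many messages it produced. */
static int
toy_run_txn(ToyOutput *out, const bool *published, int nchanges)
{
	int			before = out->messages_sent;

	toy_begin_txn(out);
	for (int i = 0; i < nchanges; i++)
		toy_change(out, published[i]);
	toy_commit_txn(out);

	return out->messages_sent - before;
}
```

With this toy, a transaction that touches only unpublished tables produces zero messages, while a transaction with one published change among several produces exactly three ('B', 'I', 'C').
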
-- 
Euler Taveira
Timbira - http://www.timbira.com.br/
PostgreSQL: Consulting, Development, 24x7 Support and Training

From 433ea40a02ab823f3aa70c18928b9862f0eb004b Mon Sep 17 00:00:00 2001
From: Euler Taveira <eu...@timbira.com.br>
Date: Fri, 8 Nov 2019 12:48:03 -0300
Subject: [PATCH] Skip empty transactions for logical replication

The current logical replication behavior is to send every transaction to the
subscriber even if the transaction is empty (because it does not contain
changes from the selected publications). It is a waste of CPU cycles and
network bandwidth to build/transmit those empty transactions. Postpone the
BEGIN message until the first change. While processing a COMMIT message, if
no change was written for that transaction, do not send the COMMIT message.
This means that pgoutput will skip BEGIN / COMMIT messages for transactions
that did not write any changes.

Discussion: https://postgr.es/m/CAMkU=1yohp9-dv48flosprmqyeyys5zwkazgd41rjr10xin...@mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 34 +++++++++++++++++++++++++++++
 src/include/replication/pgoutput.h          |  3 +++
 2 files changed, 37 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9c08757..eed1093 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -212,6 +212,22 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 static void
 pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 {
+	PGOutputData *data = ctx->output_plugin_private;
+
+	/*
+	 * Don't send BEGIN message here. Instead, postpone it until the first
+	 * change. In logical replication, a common scenario is to replicate a set
+	 * of tables (instead of all tables) and transactions whose changes were to
+	 * table(s) that are not published will produce empty transactions. These
+	 * empty transactions will send BEGIN and COMMIT messages to subscribers,
+	 * using bandwidth on something with little/no use for logical replication.
+	 */
+	data->xact_wrote_changes = false;
+}
+
+static void
+pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
 	bool		send_replication_origin = txn->origin_id != InvalidRepOriginId;
 
 	OutputPluginPrepareWrite(ctx, !send_replication_origin);
@@ -249,8 +265,14 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
+	PGOutputData *data = ctx->output_plugin_private;
+
 	OutputPluginUpdateProgress(ctx);
 
+	/* skip COMMIT message if nothing was sent */
+	if (!data->xact_wrote_changes)
+		return;
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
@@ -335,6 +357,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/* output BEGIN if we haven't yet */
+	if (!data->xact_wrote_changes)
+		pgoutput_begin(ctx, txn);
+
+	data->xact_wrote_changes = true;
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -415,6 +443,12 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	if (nrelids > 0)
 	{
+		/* output BEGIN if we haven't yet */
+		if (!data->xact_wrote_changes)
+			pgoutput_begin(ctx, txn);
+
+		data->xact_wrote_changes = true;
+
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
 								  nrelids,
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index 8870721..cb57e76 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -20,6 +20,9 @@ typedef struct PGOutputData
 	MemoryContext context;		/* private memory context for transient
 								 * allocations */
 
+	/* control whether messages can already be sent */
+	bool		xact_wrote_changes;
+
 	/* client info */
 	uint32		protocol_version;
 
-- 
2.7.4
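
For a rough sense of the bandwidth involved, the sketch below estimates the bytes spent on one empty transaction. The field sizes are my reading of the documented streaming replication and logical replication message formats (CopyData and XLogData headers, Begin and Commit bodies), so treat them as assumptions; the estimate ignores TCP/TLS framing and any Origin message. The names (empty_xact_bytes and the enum constants) are made up for the example.

```c
/* Assumed wire sizes in bytes, taken from the protocol documentation. */
enum
{
	COPYDATA_HDR = 1 + 4,			/* 'd' + Int32 length */
	XLOGDATA_HDR = 1 + 8 + 8 + 8,	/* 'w' + start LSN + end LSN + send time */
	BEGIN_BODY = 1 + 8 + 8 + 4,		/* 'B' + final LSN + commit time + xid */
	COMMIT_BODY = 1 + 1 + 8 + 8 + 8 /* 'C' + flags + commit LSN + end LSN +
									 * commit time */
};

/*
 * Estimated bytes sent for n_empty empty transactions: each one costs a
 * BEGIN and a COMMIT, and each message carries its own CopyData/XLogData
 * headers.
 */
static unsigned long long
empty_xact_bytes(unsigned long long n_empty)
{
	unsigned long long per_xact =
		2ULL * (COPYDATA_HDR + XLOGDATA_HDR) + BEGIN_BODY + COMMIT_BODY;

	return n_empty * per_xact;
}
```

Under these assumptions one empty transaction costs 107 bytes of payload, so a primary running a million unrelated transactions would push roughly 107 MB to a subscriber that needs none of it.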