From af5076813556b6d96530050b073048891655dde8 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Wed, 13 Apr 2022 12:18:56 +0530
Subject: [PATCH v15] Fix the logical replication timeout during large
 transactions.

The problem is that we don't send keep-alive messages for a long time
while processing large transactions during logical replication where we
don't send any data of such transactions. This can happen when the table
modified in the transaction is not published or because all the changes
got filtered. We do try to send the keep_alive if necessary at the end of
the transaction (via WalSndWriteData()) but by that time the
subscriber-side can timeout and exit.

To fix this we try to send the keepalive message if required after
processing certain threshold of changes.

As this requires an exposed API change, we didn't backpatched it.
---
 src/backend/replication/logical/logical.c   |  7 +--
 src/backend/replication/pgoutput/pgoutput.c | 57 ++++++++++++++++++---
 src/backend/replication/walsender.c         | 29 +++++++++--
 src/include/replication/logical.h           |  3 +-
 src/include/replication/output_plugin.h     |  3 +-
 5 files changed, 83 insertions(+), 16 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 788769dd73..d3fe045c84 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -669,17 +669,18 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
- * Update progress tracking (if supported).
+ * Update progress tracking and try to send a keepalive message (if supported).
  */
 void
 OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
-						   bool skipped_xact)
+						   bool skipped_xact,
+						   bool end_xact)
 {
 	if (!ctx->update_progress)
 		return;
 
 	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
-						 skipped_xact);
+						 skipped_xact, end_xact);
 }
 
 /*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index fe5accca57..53c0a5b109 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -91,6 +91,7 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
+static void update_progress(LogicalDecodingContext *ctx, bool skipped_xact, bool end_xact);
 
 /*
  * Only 3 publication actions are used for row filtering ("insert", "update",
@@ -558,7 +559,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * from this transaction has been sent to the downstream.
 	 */
 	sent_begin_txn = txndata->sent_begin_txn;
-	OutputPluginUpdateProgress(ctx, !sent_begin_txn);
+	update_progress(ctx, !sent_begin_txn, true);
 	pfree(txndata);
 	txn->output_plugin_private = NULL;
 
@@ -597,7 +598,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	OutputPluginUpdateProgress(ctx, false);
+	update_progress(ctx, false, true);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -611,7 +612,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx, false);
+	update_progress(ctx, false, true);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -627,7 +628,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	OutputPluginUpdateProgress(ctx, false);
+	update_progress(ctx, false, true);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1361,6 +1362,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
 
+	update_progress(ctx, false, false);
+
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -1593,6 +1596,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	Oid		   *relids;
 	TransactionId xid = InvalidTransactionId;
 
+	update_progress(ctx, false, false);
+
 	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
 	if (in_streaming)
 		xid = change->txn->xid;
@@ -1656,6 +1661,8 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	TransactionId xid = InvalidTransactionId;
 
+	update_progress(ctx, false, false);
+
 	if (!data->messages)
 		return;
 
@@ -1848,7 +1855,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx, false);
+	update_progress(ctx, false, true);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1869,7 +1876,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx, false);
+	update_progress(ctx, false, true);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
@@ -2362,3 +2369,41 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
 		}
 	}
 }
+
+/*
+ * Try to update progress and send a keepalive message if too many changes were
+ * processed.
+ *
+ * For a large transaction, if we don't send any change to the downstream for a
+ * long time then it can timeout. This can happen when all or most of the
+ * changes are either not published or got filtered out.
+ */
+static void
+update_progress(LogicalDecodingContext *ctx, bool skipped_xact, bool end_xact)
+{
+	static int	changes_count = 0;
+
+	if (end_xact)
+	{
+		/* Update progress tracking at xact end. */
+		OutputPluginUpdateProgress(ctx, skipped_xact, end_xact);
+		changes_count = 0;
+		return;
+	}
+
+	/*
+	 * After continuously processing CHANGES_THRESHOLD changes, we try to send
+	 * a keepalive message if required.
+	 *
+	 * We don't want to try sending a keepalive message after processing each
+	 * change as that can have overhead. Testing reveals that there is no
+	 * noticeable overhead in doing it after continuously processing 100 or so
+	 * changes.
+	 */
+#define CHANGES_THRESHOLD 100
+	if (++changes_count >= CHANGES_THRESHOLD)
+	{
+		OutputPluginUpdateProgress(ctx, skipped_xact, end_xact);
+		changes_count = 0;
+	}
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 75400a53f2..2469082766 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -251,7 +251,7 @@ static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
-								 bool skipped_xact);
+								 bool skipped_xact, bool end_xact);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1461,21 +1461,28 @@ ProcessPendingWrites(void)
  * Write the current position to the lag tracker (see XLogSendPhysical).
  *
  * When skipping empty transactions, send a keepalive message if necessary.
+ *
+ * If too many changes are processed then try to send a keepalive message to
+ * receiver to avoid timeouts.
  */
 static void
 WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
-					 bool skipped_xact)
+					 bool skipped_xact, bool end_xact)
 {
 	static TimestampTz sendTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
+	bool		pending_writes = false;
 
 	/*
 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
 	 * avoid flooding the lag tracker when we commit frequently.
+	 *
+	 * We don't have a mechanism to get the ack for any LSN other than end xact
+	 * lsn from the downstream. So, we track lag only for end xact lsn's.
 	 */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
-	if (TimestampDifferenceExceeds(sendTime, now,
-								   WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+	if (end_xact && TimestampDifferenceExceeds(sendTime, now,
+												 WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
 	{
 		LagTrackerWrite(lsn, now);
 		sendTime = now;
@@ -1501,8 +1508,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 
 		/* If we have pending write here, make sure it's actually flushed */
 		if (pq_is_send_pending())
-			ProcessPendingWrites();
+			pending_writes = true;
 	}
+
+	/*
+	 * Process pending writes if any or try to send a keepalive if required. We
+	 * don't need to try sending keep alive messages at the transaction end as
+	 * that will be done at a later point in time. This is required only for
+	 * large transactions where we don't send any changes to the downstream and
+	 * the receiver can timeout due to that.
+	 */
+	if (pending_writes || (!end_xact &&
+						   now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
+															  wal_sender_timeout / 2)))
+		ProcessPendingWrites();
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index a6ef16ad5b..e4c50a193e 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -27,7 +27,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
 														 XLogRecPtr Ptr,
 														 TransactionId xid,
-														 bool skipped_xact
+														 bool skipped_xact,
+														 bool end_xact
 );
 
 typedef struct LogicalDecodingContext
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 539dc8e697..82929db313 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -243,6 +243,7 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact,
+									   bool end_xact);
 
 #endif							/* OUTPUT_PLUGIN_H */
-- 
2.28.0.windows.1

