From aaa79d6c340c146fb02c4afbebd31f729da406be Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Thu, 21 Apr 2022 15:53:29 +0800
Subject: [PATCH v1] Fix the logical replication timeout during large
 transactions.

The problem is that, during logical replication, we don't send keepalive
messages for a long time while processing a large transaction for which
we don't send any data. This can happen when the tables modified in the
transaction are not published. We do try to send a keepalive if necessary
at the end of the transaction (via WalSndWriteData()), but by that time
the subscriber may already have timed out and exited.

To fix this, we try to send a keepalive message, if required, after
processing a certain threshold of changes.
---
 src/backend/replication/logical/logical.c   | 48 ++++++++++++++++++++-
 src/backend/replication/pgoutput/pgoutput.c |  2 +
 src/backend/replication/walsender.c         | 39 +++++++++++++++--
 src/include/replication/logical.h           |  1 +
 4 files changed, 84 insertions(+), 6 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1851a626f9..7cc3b5ac60 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -515,15 +515,38 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
- * Update progress tracking (if supported).
+ * Update progress tracking and try to send a keepalive message (if supported).
+ *
+ * For a large transaction, if we don't send any change to the downstream for
+ * longer than its wal_receiver_timeout, the downstream can time out. This can
+ * happen when all or most of the changes are not published.
  */
 void
 OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
 {
+	static int	changes_count = 0;
+
 	if (!ctx->update_progress)
 		return;
 
-	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+	/*
+	 * We don't want to try sending a keepalive message after processing each
+	 * change as that can have overhead. Tests revealed that there is no
+	 * noticeable overhead in doing it after continuously processing 100 or so
+	 * changes.
+	 */
+#define CHANGES_THRESHOLD 100
+
+	/*
+	 * If we are at an end-of-transaction LSN, update progress tracking.
+	 * Otherwise, once every CHANGES_THRESHOLD calls, invoke the callback so
+	 * that it can try to send a keepalive message if required.
+	 */
+	if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
+	{
+		ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+		changes_count = 0;
+	}
 }
 
 /*
@@ -590,6 +613,9 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
 	/* set output state */
 	ctx->accept_writes = false;
 
+	/* we are not at an end-of-transaction LSN */
+	ctx->end_xact = false;
+
 	/* do the actual work: call callback */
 	ctx->callbacks.startup_cb(ctx, opt, is_init);
 
@@ -615,6 +641,9 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
 	/* set output state */
 	ctx->accept_writes = false;
 
+	/* we are not at an end-of-transaction LSN */
+	ctx->end_xact = false;
+
 	/* do the actual work: call callback */
 	ctx->callbacks.shutdown_cb(ctx);
 
@@ -648,6 +677,9 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->first_lsn;
 
+	/* we are not at an end-of-transaction LSN */
+	ctx->end_xact = false;
+
 	/* do the actual work: call callback */
 	ctx->callbacks.begin_cb(ctx, txn);
 
@@ -677,6 +709,9 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
 
+	/* we are at the end-of-transaction LSN */
+	ctx->end_xact = true;
+
 	/* do the actual work: call callback */
 	ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
 
@@ -713,6 +748,9 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
+	/* we are not at an end-of-transaction LSN */
+	ctx->end_xact = false;
+
 	ctx->callbacks.change_cb(ctx, txn, relation, change);
 
 	/* Pop the error context stack */
@@ -738,6 +776,9 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	/* set output state */
 	ctx->accept_writes = false;
 
+	/* we are not at an end-of-transaction LSN */
+	ctx->end_xact = false;
+
 	/* do the actual work: call callback */
 	ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
 
@@ -773,6 +814,9 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
 	ctx->write_location = message_lsn;
 
+	/* we are not at an end-of-transaction LSN */
+	ctx->end_xact = false;
+
 	/* do the actual work: call callback */
 	ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
 							  message_size, message);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 529a5c0b48..12db2ad4fd 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -264,6 +264,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContext old;
 	RelationSyncEntry *relentry;
 
+	OutputPluginUpdateProgress(ctx);
+
 	if (!is_publishable_relation(relation))
 		return;
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3c9b9f5376..2e4e86f409 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -243,6 +243,7 @@ static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
 static void ProcessRepliesIfAny(void);
+static void ProcessPendingWrites(void);
 static void WalSndKeepalive(bool requestReply);
 static void WalSndKeepaliveIfNecessary(void);
 static void WalSndCheckTimeOut(void);
@@ -1190,6 +1191,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	}
 
 	/* If we have pending write here, go to slow path */
+	ProcessPendingWrites();
+}
+
+/*
+ * Wait until there is no pending write. While waiting, also process replies
+ * from the other side and check timeouts.
+ */
+static void
+ProcessPendingWrites(void)
+{
 	for (;;)
 	{
 		int			wakeEvents;
@@ -1250,24 +1261,44 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
  * LogicalDecodingContext 'progress_update' callback.
  *
  * Write the current position to the log tracker (see XLogSendPhysical).
+ *
+ * In logical replication, if many changes have been processed, try to send a
+ * keepalive message. This may prevent a timeout on the subscriber.
  */
 static void
 WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
 {
 	static TimestampTz sendTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
+	bool end_xact = ctx->end_xact;
 
 	/*
 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
 	 * avoid flooding the lag tracker when we commit frequently.
+	 *
+	 * The downstream acknowledges only end-of-transaction LSNs; we have no
+	 * mechanism to get an ack for any other LSN. So, we track lag only for
+	 * end-of-transaction LSNs.
 	 */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
-	if (!TimestampDifferenceExceeds(sendTime, now,
+	if (end_xact && TimestampDifferenceExceeds(sendTime, now,
 									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
-		return;
+	{
+		LagTrackerWrite(lsn, now);
+		sendTime = now;
+	}
 
-	LagTrackerWrite(lsn, now);
-	sendTime = now;
+	/*
+	 * Try to send a keepalive if required. We don't need to try sending
+	 * keepalive messages at the transaction end, as that will be done at a
+	 * later point in time. This is required only for large transactions
+	 * where we don't send any changes to the downstream, which can cause the
+	 * receiver to time out.
+	 */
+	if (!end_xact &&
+		now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
+										   wal_sender_timeout / 2))
+		ProcessPendingWrites();
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 7f0e0fa881..b25a935bc0 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -80,6 +80,7 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		accept_writes;
 	bool		prepared_write;
+	bool		end_xact;
 	XLogRecPtr	write_location;
 	TransactionId write_xid;
 } LogicalDecodingContext;
-- 
2.23.0.windows.1

