From 78b967c2cfa2110ad1a51a582b5cf894ec67c285 Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Mon, 28 Mar 2022 01:36:55 -0400
Subject: [PATCH v8] Fix the logical replication timeout during large
 transactions.

The problem is that we don't send keep-alive messages for a long time
while processing large transactions during logical replication where we
don't send any data of such transactions. This can happen when the table
modified in the transaction is not published or because all the changes
got filtered. We do try to send the keep_alive if necessary at the end of
the transaction (via WalSndWriteData()) but by that time the
subscriber-side can timeout and exit.

To fix this we try to send the keepalive message if required after
skipping certain threshold of changes.
---
 src/backend/replication/logical/logical.c   |  6 +-
 src/backend/replication/pgoutput/pgoutput.c | 88 +++++++++++++++++++--
 src/backend/replication/walsender.c         | 46 +++++++++--
 src/include/replication/logical.h           |  3 +-
 src/include/replication/output_plugin.h     |  2 +-
 5 files changed, 127 insertions(+), 18 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 934aa13f2d..922b16c7c8 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -680,15 +680,15 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
- * Update progress tracking (if supported).
+ * Update progress tracking and try to send a keepalive message (if supported).
  */
 void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool last_write)
 {
 	if (!ctx->update_progress)
 		return;
 
-	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, last_write);
 }
 
 /*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 893833ea83..a86326e75c 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -96,6 +96,7 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
+static void update_progress(LogicalDecodingContext *ctx, bool last_write);
 
 /*
  * Only 3 publication actions are used for row filtering ("insert", "update",
@@ -511,7 +512,7 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, true);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -542,7 +543,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, true);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -556,7 +557,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, true);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -572,7 +573,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, true);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1303,9 +1304,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
+	bool		change_sent = false;
 
 	if (!is_publishable_relation(relation))
+	{
+		update_progress(ctx, false);
 		return;
+	}
 
 	/*
 	 * Remember the xid for the change in streaming mode. We need to send xid
@@ -1323,15 +1328,24 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
+			{
+				update_progress(ctx, false);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
+			{
+				update_progress(ctx, false);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
+			{
+				update_progress(ctx, false);
 				return;
+			}
 			break;
 		default:
 			Assert(false);
@@ -1380,6 +1394,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
 									data->binary, relentry->columns);
 			OutputPluginWrite(ctx, true);
+			change_sent = true;
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (change->data.tp.oldtuple)
@@ -1449,6 +1464,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			}
 
 			OutputPluginWrite(ctx, true);
+			change_sent = true;
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (change->data.tp.oldtuple)
@@ -1486,6 +1502,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				logicalrep_write_delete(ctx->out, xid, targetrel,
 										old_slot, data->binary);
 				OutputPluginWrite(ctx, true);
+				change_sent = true;
 			}
 			else
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
@@ -1494,6 +1511,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	update_progress(ctx, change_sent);
+
 	if (RelationIsValid(ancestor))
 	{
 		RelationClose(ancestor);
@@ -1561,6 +1580,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 								  change->data.truncate.cascade,
 								  change->data.truncate.restart_seqs);
 		OutputPluginWrite(ctx, true);
+		update_progress(ctx, true);
+	}
+	else
+	{
+		update_progress(ctx, false);
 	}
 
 	MemoryContextSwitchTo(old);
@@ -1576,7 +1600,10 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TransactionId xid = InvalidTransactionId;
 
 	if (!data->messages)
+	{
+		update_progress(ctx, false);
 		return;
+	}
 
 	/*
 	 * Remember the xid for the message in streaming mode. See
@@ -1594,6 +1621,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 sz,
 							 message);
 	OutputPluginWrite(ctx, true);
+	update_progress(ctx, true);
 }
 
 static void
@@ -1607,10 +1635,16 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
 	RelationSyncEntry *relentry;
 
 	if (!data->sequences)
+	{
+		update_progress(ctx, false);
 		return;
+	}
 
 	if (!is_publishable_relation(relation))
+	{
+		update_progress(ctx, false);
 		return;
+	}
 
 	/*
 	 * Remember the xid for the message in streaming mode. See
@@ -1627,7 +1661,10 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
 	 * We handle just REORDER_BUFFER_CHANGE_SEQUENCE here.
 	 */
 	if (!relentry->pubactions.pubsequence)
+	{
+		update_progress(ctx, false);
 		return;
+	}
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_sequence(ctx->out,
@@ -1639,6 +1676,7 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
 							  log_cnt,
 							  is_called);
 	OutputPluginWrite(ctx, true);
+	update_progress(ctx, true);
 }
 
 /*
@@ -1799,7 +1837,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, true);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1820,7 +1858,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, true);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
@@ -2318,3 +2356,41 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
 		}
 	}
 }
+
+/*
+ * Try to update progress and send a keepalive message if too many changes were
+ * skipped.
+ *
+ * For a large transaction, if we don't send any change to the downstream for a
+ * long time then it can timeout. This can happen when all or most of the
+ * changes are either not published or got filtered out.
+ */
+static void
+update_progress(LogicalDecodingContext *ctx, bool last_write)
+{
+	static int	skipped_changes_count = 0;
+
+	/* reset the skipped count after sending a change */
+	if (last_write)
+	{
+		skipped_changes_count = 0;
+		return;
+	}
+
+	/*
+	 * After continuously skipping SKIPPED_CHANGES_THRESHOLD changes, update
+	 * progress which will also try to send a keepalive message if required.
+	 *
+	 * We don't want to try sending a keepalive message or updating progress
+	 * after skipping each change as that can have overhead. Testing reveals
+	 * that there is no noticeable overhead in doing it after continuously
+	 * skipping 100 or so changes.
+	 */
+#define SKIPPED_CHANGES_THRESHOLD 100
+
+	if (++skipped_changes_count >= SKIPPED_CHANGES_THRESHOLD)
+	{
+		OutputPluginUpdateProgress(ctx, false);
+		skipped_changes_count = 0;
+	}
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 2d0292a092..bfd6e7ce4e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -249,7 +249,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
-static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1447,9 +1447,13 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
  * LogicalDecodingContext 'update_progress' callback.
  *
  * Write the current position to the lag tracker (see XLogSendPhysical).
+ *
+  * If the last write is skipped then try to send a keepalive message to
+  * receiver to avoid timeouts.
  */
 static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
+					 bool last_write)
 {
 	static TimestampTz sendTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
@@ -1459,12 +1463,40 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 	 * avoid flooding the lag tracker when we commit frequently.
 	 */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
-	if (!TimestampDifferenceExceeds(sendTime, now,
-									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
-		return;
+	if (TimestampDifferenceExceeds(sendTime, now,
+								   WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+	{
+		LagTrackerWrite(lsn, now);
+		sendTime = now;
+	}
 
-	LagTrackerWrite(lsn, now);
-	sendTime = now;
+	/* try to send a keepalive if required */
+	if (!last_write)
+	{
+		/*
+		 * We don't need to try sending keepalive unless we get too close to
+		 * walsender timeout.
+		 */
+		if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
+											  wal_sender_timeout / 2))
+			return;
+
+		/* Check for input from the client. */
+		ProcessRepliesIfAny();
+
+		/* die if timeout was reached */
+		WalSndCheckTimeOut();
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary();
+
+		if (!pq_is_send_pending())
+			return;
+
+		/* Try to flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+	}
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9799..2c27ed6e50 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
 typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
 														 XLogRecPtr Ptr,
-														 TransactionId xid
+														 TransactionId xid,
+														 bool last_write
 );
 
 typedef struct LogicalDecodingContext
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index a16bebf76c..3659e5a93a 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -270,6 +270,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool last_write);
 
 #endif							/* OUTPUT_PLUGIN_H */
-- 
2.27.0

