From a621a8f37d476055773717d7d8d2e5080bcfac0d Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Mon, 7 Mar 2022 10:30:38 +0800
Subject: [PATCH v1] Fix the timeout of subscriber in long transactions.

While decoding a large transaction during logical replication, the walsender
may not send any data of that transaction for a long time (say, because the
tables modified in the transaction are not published). No keepalive messages
are sent during that time either, so the subscriber eventually times out. In
this case, send a keepalive message to the subscriber.
---
 src/backend/replication/logical/logical.c     | 45 +++++++++++++++++--
 .../replication/logical/reorderbuffer.c       | 16 ++++---
 src/backend/replication/pgoutput/pgoutput.c   | 43 +++++++++++++++---
 src/backend/replication/walsender.c           | 35 +++++++++++++--
 src/include/replication/logical.h             |  4 +-
 src/include/replication/output_plugin.h       |  2 +-
 6 files changed, 125 insertions(+), 20 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 934aa13f2d..1f4e691722 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -680,15 +680,15 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
- * Update progress tracking (if supported).
+ * Update progress tracking, passing on the keepalive request (if supported).
  */
 void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive)
 {
 	if (!ctx->update_progress)
 		return;
 
-	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, send_keep_alive);
 }
 
 /*
@@ -1930,3 +1930,42 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
 }
+
+/*
+ * Try to send a keepalive message if too many changes were skipped in a row.
+ *
+ * Call this for every change processed in a transaction (see
+ * ReorderBufferProcessTXN): skipped is true if nothing was sent for the change
+ * and false otherwise. If many changes are skipped consecutively, a keepalive
+ * message is requested so that the standby does not time out.
+ */
+void
+SendKeepaliveIfNecessary(LogicalDecodingContext *ctx, bool skipped)
+{
+	static int skipped_changes_count = 0;
+
+	/*
+	 * A change that is not skipped ends the run of consecutive skips, so
+	 * reset the counter.
+	 */
+	if (!skipped)
+	{
+		skipped_changes_count = 0;
+		return;
+	}
+
+	/*
+	 * After SKIPPED_CHANGES_THRESHOLD consecutive skipped changes, try to
+	 * send a keepalive message.
+	 */
+	#define SKIPPED_CHANGES_THRESHOLD 10000
+
+	if (++skipped_changes_count >= SKIPPED_CHANGES_THRESHOLD)
+	{
+		/* Ask the update_progress callback to try to send a keepalive. */
+		OutputPluginUpdateProgress(ctx, true);
+
+		/* Start counting a new run of skipped changes. */
+		skipped_changes_count = 0;
+	}
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c2d9be81fa..e6591596c1 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2636,12 +2636,18 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					break;
 
 				case REORDER_BUFFER_CHANGE_INVALIDATION:
-					/* Execute the invalidation messages locally */
-					ReorderBufferExecuteInvalidations(
-													  change->data.inval.ninvalidations,
-													  change->data.inval.invalidations);
-					break;
+					{
+						LogicalDecodingContext *ctx = rb->private_data;
+
+						/* Try to send a keepalive message. */
+						SendKeepaliveIfNecessary(ctx, true);
 
+						/* Execute the invalidation messages locally */
+						ReorderBufferExecuteInvalidations(
+														change->data.inval.ninvalidations,
+														change->data.inval.invalidations);
+						break;
+					}
 				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 					/* get rid of the old */
 					TeardownHistoricSnapshot(false);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ea57a0477f..48d3147a8d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -475,7 +475,7 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -506,7 +506,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -520,7 +520,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -536,7 +536,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1151,7 +1151,11 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *new_slot = NULL;
 
 	if (!is_publishable_relation(relation))
+	{
+		/* Try to send a keepalive message. */
+		SendKeepaliveIfNecessary(ctx, true);
 		return;
+	}
 
 	/*
 	 * Remember the xid for the change in streaming mode. We need to send xid
@@ -1169,20 +1173,35 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
+			{
+				/* Try to send a keepalive message. */
+				SendKeepaliveIfNecessary(ctx, true);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
+			{
+				/* Try to send a keepalive message. */
+				SendKeepaliveIfNecessary(ctx, true);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
+			{
+				/* Try to send a keepalive message. */
+				SendKeepaliveIfNecessary(ctx, true);
 				return;
+			}
 			break;
 		default:
 			Assert(false);
 	}
 
+	/* Reset the counter for skipped changes. */
+	SendKeepaliveIfNecessary(ctx, false);
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -1397,6 +1416,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	if (nrelids > 0)
 	{
+		/* Reset the counter for skipped changes. */
+		SendKeepaliveIfNecessary(ctx, false);
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
 								  xid,
@@ -1406,6 +1427,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 								  change->data.truncate.restart_seqs);
 		OutputPluginWrite(ctx, true);
 	}
+	else
+		/* Try to send a keepalive message. */
+		SendKeepaliveIfNecessary(ctx, true);
 
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
@@ -1420,7 +1444,11 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TransactionId xid = InvalidTransactionId;
 
 	if (!data->messages)
+	{
+		/* Try to send a keepalive message. */
+		SendKeepaliveIfNecessary(ctx, true);
 		return;
+	}
 
 	/*
 	 * Remember the xid for the message in streaming mode. See
@@ -1429,6 +1457,9 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (in_streaming)
 		xid = txn->xid;
 
+	/* Reset the counter for skipped changes. */
+	SendKeepaliveIfNecessary(ctx, false);
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
 							 xid,
@@ -1598,7 +1629,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1619,7 +1650,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5a718b1fe9..942d2cbcd8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -249,7 +249,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
-static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1447,24 +1447,51 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
  * LogicalDecodingContext 'update_progress' callback.
  *
  * Write the current position to the lag tracker (see XLogSendPhysical).
+ * If send_keep_alive is true, only try to send a keepalive message to the
+ * standby; lag is not tracked in that case (see LagTrackerWrite).
  */
 static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive)
 {
+	static TimestampTz trackTime = 0;
 	static TimestampTz sendTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
 
+	if (send_keep_alive)
+	{
+		/*
+		 * If half of wal_sender_timeout has elapsed since sendTime (the last
+		 * keepalive sent here; zero after a normal update), send a keepalive.
+		 */
+		TimestampTz ping_time = TimestampTzPlusMilliseconds(sendTime,
+											wal_sender_timeout / 2);
+		if (now >= ping_time)
+		{
+			WalSndKeepalive(false);
+
+			/* Try to flush pending output to the client */
+			if (pq_flush_if_writable() != 0)
+				WalSndShutdown();
+			sendTime = now;
+		}
+
+		/* A keepalive-only call does not need to track lag. */
+		return;
+	}
+	else
+		sendTime = 0;
+
 	/*
 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
 	 * avoid flooding the lag tracker when we commit frequently.
 	 */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
-	if (!TimestampDifferenceExceeds(sendTime, now,
+	if (!TimestampDifferenceExceeds(trackTime, now,
 									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
 		return;
 
 	LagTrackerWrite(lsn, now);
-	sendTime = now;
+	trackTime = now;
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9799..c87399c767 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
 typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
 														 XLogRecPtr Ptr,
-														 TransactionId xid
+														 TransactionId xid,
+														 bool send_keep_alive
 );
 
 typedef struct LogicalDecodingContext
@@ -140,5 +141,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern void SendKeepaliveIfNecessary(LogicalDecodingContext *ctx, bool skipped);
 
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index a16bebf76c..ed802b58ef 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -270,6 +270,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive);
 
 #endif							/* OUTPUT_PLUGIN_H */
-- 
2.18.4

