From 45b191eba494ae61fab47660a20587ef760271f7 Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Mon, 28 Feb 2022 14:20:00 +0800
Subject: [PATCH] Fix the timeout of subscriber in long transactions.

During logical replication, we may not send anything for a long time while
processing a large transaction for which no data is sent (say, because the
table modified in the transaction is not published). No keepalive messages
are sent during that time either, so the subscriber can time out. In this
case, send a keepalive message to the subscriber.
---
 src/backend/replication/logical/logical.c     | 43 ++++++++++++++++++-
 .../replication/logical/reorderbuffer.c       | 22 +++++++---
 src/backend/replication/pgoutput/pgoutput.c   | 31 ++++++++++---
 src/backend/replication/walsender.c           | 31 +++++++++++--
 src/include/replication/logical.h             |  4 +-
 src/include/replication/output_plugin.h       |  2 +-
 6 files changed, 114 insertions(+), 19 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 934aa13f2d..7ea6397c17 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -683,12 +683,12 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
  * Update progress tracking (if supported).
  */
 void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive)
 {
 	if (!ctx->update_progress)
 		return;
 
-	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, send_keep_alive);
 }
 
 /*
@@ -1930,3 +1930,42 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
 }
+
+/*
+ * Try to send a keepalive message if too many changes were skipped.
+ *
+ * Called while looping through the changes of a transaction (see
+ * ReorderBufferProcessTXN). If no message is sent to the subscriber for a
+ * long time during a large transaction, we should send a keepalive message
+ * to ensure that the subscriber will not time out.
+ */
+void
+SendKeepaliveIfNecessary(LogicalDecodingContext *ctx, bool skipped)
+{
+	static int skipped_changes_count = 0;
+
+	/*
+	 * A change that does not need to be skipped resets
+	 * skipped_changes_count, so only consecutive skips are counted.
+	 */
+	if (!skipped)
+	{
+		skipped_changes_count = 0;
+		return;
+	}
+
+	/*
+	 * After SKIPPED_CHANGES_THRESHOLD consecutive skipped changes, try to
+	 * send a keepalive message.
+	 */
+	#define SKIPPED_CHANGES_THRESHOLD 10000
+
+	if (++skipped_changes_count >= SKIPPED_CHANGES_THRESHOLD)
+	{
+		/* Try to send a keepalive message. */
+		OutputPluginUpdateProgress(ctx, true);
+
+		/* After trying to send a keepalive message, reset the counter. */
+		skipped_changes_count = 0;
+	}
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c2d9be81fa..fa6c14a7c2 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2636,12 +2636,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					break;
 
 				case REORDER_BUFFER_CHANGE_INVALIDATION:
-					/* Execute the invalidation messages locally */
-					ReorderBufferExecuteInvalidations(
-													  change->data.inval.ninvalidations,
-													  change->data.inval.invalidations);
-					break;
+					{
+						LogicalDecodingContext *ctx = rb->private_data;
+
+						Assert(!ctx->fast_forward);
+
+						/* set output state */
+						ctx->accept_writes = true;
+						ctx->write_xid = txn->xid;
+						ctx->write_location = change->lsn;
 
+						SendKeepaliveIfNecessary(ctx, true);
+
+						/* Execute the invalidation messages locally */
+						ReorderBufferExecuteInvalidations(
+														change->data.inval.ninvalidations,
+														change->data.inval.invalidations);
+						break;
+					}
 				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 					/* get rid of the old */
 					TeardownHistoricSnapshot(false);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ea57a0477f..f164838aa4 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -475,7 +475,7 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -506,7 +506,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -520,7 +520,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -536,7 +536,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1169,20 +1169,31 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
+			{
+				SendKeepaliveIfNecessary(ctx, true);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
+			{
+				SendKeepaliveIfNecessary(ctx, true);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
+			{
+				SendKeepaliveIfNecessary(ctx, true);
 				return;
+			}
 			break;
 		default:
 			Assert(false);
 	}
 
+	SendKeepaliveIfNecessary(ctx, false);
+
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -1397,6 +1408,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	if (nrelids > 0)
 	{
+		SendKeepaliveIfNecessary(ctx, false);
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
 								  xid,
@@ -1406,6 +1418,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 								  change->data.truncate.restart_seqs);
 		OutputPluginWrite(ctx, true);
 	}
+	else
+		SendKeepaliveIfNecessary(ctx, true);
 
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
@@ -1420,7 +1434,10 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TransactionId xid = InvalidTransactionId;
 
 	if (!data->messages)
+	{
+		SendKeepaliveIfNecessary(ctx, true);
 		return;
+	}
 
 	/*
 	 * Remember the xid for the message in streaming mode. See
@@ -1429,6 +1446,8 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (in_streaming)
 		xid = txn->xid;
 
+	SendKeepaliveIfNecessary(ctx, false);
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
 							 xid,
@@ -1598,7 +1617,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1619,7 +1638,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5a718b1fe9..b6599292bb 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -249,7 +249,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
-static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1447,24 +1447,47 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
  * LogicalDecodingContext 'update_progress' callback.
  *
  * Write the current position to the lag tracker (see XLogSendPhysical).
+ * If send_keep_alive is true, try to send a keepalive message to the standby.
  */
 static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive)
 {
+	static TimestampTz trackTime = 0;
 	static TimestampTz sendTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
 
+	if (send_keep_alive)
+	{
+		/*
+		 * If at least half of wal_sender_timeout has elapsed since sendTime,
+		 * send a keepalive message to the standby.
+		 */
+		TimestampTz ping_time = TimestampTzPlusMilliseconds(sendTime,
+											wal_sender_timeout / 2);
+		if (now >= ping_time)
+		{
+			WalSndKeepalive(false);
+
+			/* Try to flush pending output to the client */
+			if (pq_flush_if_writable() != 0)
+				WalSndShutdown();
+			sendTime = now;
+		}
+	}
+	else
+		sendTime = 0;
+
 	/*
 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
 	 * avoid flooding the lag tracker when we commit frequently.
 	 */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
-	if (!TimestampDifferenceExceeds(sendTime, now,
+	if (!TimestampDifferenceExceeds(trackTime, now,
 									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
 		return;
 
 	LagTrackerWrite(lsn, now);
-	sendTime = now;
+	trackTime = now;
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9799..c87399c767 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
 typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
 														 XLogRecPtr Ptr,
-														 TransactionId xid
+														 TransactionId xid,
+														 bool send_keep_alive
 );
 
 typedef struct LogicalDecodingContext
@@ -140,5 +141,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern void SendKeepaliveIfNecessary(LogicalDecodingContext *ctx, bool skipped);
 
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index a16bebf76c..ed802b58ef 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -270,6 +270,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive);
 
 #endif							/* OUTPUT_PLUGIN_H */
-- 
2.18.4

