From f1493845fcc91a355697ae2b10d2061a4eda2c19 Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Fri, 18 Mar 2022 13:11:44 +0800
Subject: [PATCH v3] Fix the timeout of subscriber in long transactions.

We don't send keep-alive messages for a long time while processing large
transactions during logical replication where we don't send any data of such
transactions (say because the table modified in the transaction is not
published) and then subscriber will timeout. So in this case, send keepalive
message to the subscriber.
---
 src/backend/replication/logical/logical.c     | 45 +++++++++++++++--
 .../replication/logical/reorderbuffer.c       | 16 ++++--
 src/backend/replication/pgoutput/pgoutput.c   | 50 ++++++++++++++++---
 src/backend/replication/walsender.c           | 35 +++++++++++--
 src/include/replication/logical.h             |  4 +-
 src/include/replication/output_plugin.h       |  2 +-
 6 files changed, 131 insertions(+), 21 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 934aa13f2d..ae7ff14c95 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -680,15 +680,15 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 }
 
 /*
- * Update progress tracking (if supported).
+ * Update progress tracking and try to send a keepalive message (if supported).
  */
 void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
+OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive)
 {
 	if (!ctx->update_progress)
 		return;
 
-	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
+	ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, send_keep_alive);
 }
 
 /*
@@ -1930,3 +1930,42 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
 }
+
+/*
+ * Try to send a keepalive message if too many changes were skipped.
+ *
+ * When we loop through changes in a transaction(see ReorderBufferProcessTXN),
+ * if no message is sent to standby for a long time during a large transaction,
+ * we should send a keepalive message to ensure that the standby will not
+ * timeout.
+ */
+void
+UpdateProgress(LogicalDecodingContext *ctx, bool skipped)
+{
+	static int skipped_changes_count = 0;
+
+	/*
+	 * skipped_changes_count is reset when processing changes that do not
+	 * need to be skipped.
+	 */
+	if (!skipped)
+	{
+		skipped_changes_count = 0;
+		return;
+	}
+
+	/*
+	 * After continuously skipping SKIPPED_CHANGES_THRESHOLD changes, try to send a
+	 * keepalive message.
+	 */
+	#define SKIPPED_CHANGES_THRESHOLD 100
+
+	if (++skipped_changes_count >= SKIPPED_CHANGES_THRESHOLD)
+	{
+		/* Try to send a keepalive message. */
+		OutputPluginUpdateProgress(ctx, true);
+
+		/* After trying to send a keepalive message, reset the flag. */
+		skipped_changes_count = 0;
+	}
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c2d9be81fa..d173234879 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2636,12 +2636,18 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					break;
 
 				case REORDER_BUFFER_CHANGE_INVALIDATION:
-					/* Execute the invalidation messages locally */
-					ReorderBufferExecuteInvalidations(
-													  change->data.inval.ninvalidations,
-													  change->data.inval.invalidations);
-					break;
+					{
+						LogicalDecodingContext *ctx = rb->private_data;
+
+						/* Try to send a keepalive message. */
+						UpdateProgress(ctx, true);
 
+						/* Execute the invalidation messages locally */
+						ReorderBufferExecuteInvalidations(
+														change->data.inval.ninvalidations,
+														change->data.inval.invalidations);
+						break;
+					}
 				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 					/* get rid of the old */
 					TeardownHistoricSnapshot(false);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index d869f3e93e..b3a96203ae 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -475,7 +475,7 @@ static void
 pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -506,7 +506,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -520,7 +520,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -536,7 +536,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1149,9 +1149,14 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
+	bool is_send = false;
 
 	if (!is_publishable_relation(relation))
+	{
+		/* Try to send a keepalive message. */
+		UpdateProgress(ctx, true);
 		return;
+	}
 
 	/*
 	 * Remember the xid for the change in streaming mode. We need to send xid
@@ -1169,15 +1174,27 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
 			if (!relentry->pubactions.pubinsert)
+			{
+				/* Try to send a keepalive message. */
+				UpdateProgress(ctx, true);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (!relentry->pubactions.pubupdate)
+			{
+				/* Try to send a keepalive message. */
+				UpdateProgress(ctx, true);
 				return;
+			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (!relentry->pubactions.pubdelete)
+			{
+				/* Try to send a keepalive message. */
+				UpdateProgress(ctx, true);
 				return;
+			}
 			break;
 		default:
 			Assert(false);
@@ -1226,6 +1243,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
 									data->binary);
 			OutputPluginWrite(ctx, true);
+			is_send = true;
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
 			if (change->data.tp.oldtuple)
@@ -1293,6 +1311,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			}
 
 			OutputPluginWrite(ctx, true);
+			is_send = true;
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (change->data.tp.oldtuple)
@@ -1330,6 +1349,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				logicalrep_write_delete(ctx->out, xid, targetrel,
 										old_slot, data->binary);
 				OutputPluginWrite(ctx, true);
+				is_send = true;
 			}
 			else
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
@@ -1338,6 +1358,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	/*
+	 * Reset the counter for skipped changes if is_send is true, otherwise try to
+	 * send a keepalive message.
+	 */
+	UpdateProgress(ctx, !is_send);
+
 	if (RelationIsValid(ancestor))
 	{
 		RelationClose(ancestor);
@@ -1397,6 +1423,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	if (nrelids > 0)
 	{
+		/* Reset the counter for skipped changes. */
+		UpdateProgress(ctx, false);
 		OutputPluginPrepareWrite(ctx, true);
 		logicalrep_write_truncate(ctx->out,
 								  xid,
@@ -1406,6 +1434,9 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 								  change->data.truncate.restart_seqs);
 		OutputPluginWrite(ctx, true);
 	}
+	else
+		/* Try to send a keepalive message. */
+		UpdateProgress(ctx, true);
 
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
@@ -1420,7 +1451,11 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TransactionId xid = InvalidTransactionId;
 
 	if (!data->messages)
+	{
+		/* Try to send a keepalive message. */
+		UpdateProgress(ctx, true);
 		return;
+	}
 
 	/*
 	 * Remember the xid for the message in streaming mode. See
@@ -1429,6 +1464,9 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (in_streaming)
 		xid = txn->xid;
 
+	/* Reset the counter for skipped changes. */
+	UpdateProgress(ctx, false);
+
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_message(ctx->out,
 							 xid,
@@ -1598,7 +1636,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1619,7 +1657,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx);
+	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 2d0292a092..3ca3270e8a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -249,7 +249,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
-static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1447,24 +1447,49 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
  * LogicalDecodingContext 'update_progress' callback.
  *
  * Write the current position to the lag tracker (see XLogSendPhysical).
+ * If send_keep_alive is true, try to send a keepalive message to standby, and
+ * there is no need to track lag (Refer to the purpose of LagTrackerWrite).
  */
 static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive)
 {
-	static TimestampTz sendTime = 0;
+	static TimestampTz trackTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
 
+	if (send_keep_alive)
+	{
+		/*
+		 * Check for input from the client.
+		 *
+		 * If the standby does not receive any message from the primary for more than
+		 * (wal_receiver_timeout / 2), the standby will send a message requesting a
+		 * reply to the primary. If receive this message, reply immediately to avoid
+		 * timeout.
+		 */
+		ProcessRepliesIfAny();
+
+		/* die if timeout was reached */
+		WalSndCheckTimeOut();
+
+		/* Send keepalive if the time has come */
+		WalSndKeepaliveIfNecessary();
+
+		/* Try to flush pending output to the client */
+		if (pq_flush_if_writable() != 0)
+			WalSndShutdown();
+	}
+
 	/*
 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
 	 * avoid flooding the lag tracker when we commit frequently.
 	 */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
-	if (!TimestampDifferenceExceeds(sendTime, now,
+	if (!TimestampDifferenceExceeds(trackTime, now,
 									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
 		return;
 
 	LagTrackerWrite(lsn, now);
-	sendTime = now;
+	trackTime = now;
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9799..1d8ab2a56b 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -26,7 +26,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 
 typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
 														 XLogRecPtr Ptr,
-														 TransactionId xid
+														 TransactionId xid,
+														 bool send_keep_alive
 );
 
 typedef struct LogicalDecodingContext
@@ -140,5 +141,6 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern void UpdateProgress(LogicalDecodingContext *ctx, bool skipped);
 
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index a16bebf76c..ed802b58ef 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -270,6 +270,6 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx);
+extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool send_keep_alive);
 
 #endif							/* OUTPUT_PLUGIN_H */
-- 
2.18.4

