From ff9b213d4a605c8237363802899747de9ea38d9f Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Mon, 7 Nov 2022 11:26:42 +0800
Subject: [PATCH v1] Fix the logical replication timeout during processing of
 DDL.

The problem is that when a transaction contains a DDL statement that generates
a lot of temporary data due to rewrite rules, that temporary data is not
processed by the pgoutput plugin. Therefore, the previous fix (f95d53e) for
DML has no effect in this case.

To fix this, we try to send keepalive messages after each change is processed
by the walsender, rather than from within the pgoutput plugin.
---
 .../replication/logical/reorderbuffer.c       | 42 +++++++++++++++
 src/backend/replication/pgoutput/pgoutput.c   | 54 +++----------------
 2 files changed, 48 insertions(+), 48 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 22a15a482a..ee97a36842 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2031,6 +2031,46 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	}
 }
 
+/*
+ * Helper for ReorderBufferProcessTXN: call update_progress every so often.
+ */
+static inline void
+ReorderBufferUpdateProgress(ReorderBuffer *rb, ReorderBufferTXN *txn,
+							ReorderBufferChange *change)
+{
+	LogicalDecodingContext *ctx = rb->private_data;
+	static int	changes_count = 0;
+
+	if (!ctx->update_progress)
+		return;
+
+	Assert(!ctx->fast_forward);
+
+	/* Point the output state at this change; no writes are accepted here. */
+	ctx->accept_writes = false;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = change->lsn;
+	ctx->end_xact = false;
+
+	/*
+	 * We don't want to try sending a keepalive message after processing each
+	 * change as that can have overhead. Tests revealed that there is no
+	 * noticeable overhead in doing it after continuously processing 100 or so
+	 * changes.
+	 */
+#define CHANGES_THRESHOLD 100
+
+	/*
+	 * changes_count is static and so persists across calls. Once it reaches
+	 * CHANGES_THRESHOLD, call update_progress to try sending a keepalive.
+	 */
+	if (++changes_count >= CHANGES_THRESHOLD)
+	{
+		ctx->update_progress(ctx, ctx->write_location, ctx->write_xid, false);
+		changes_count = 0;
+	}
+}
+
 /*
  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
  *
@@ -2419,6 +2459,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
 			}
+
+			ReorderBufferUpdateProgress(rb, txn, change);
 		}
 
 		/* speculative insertion record must be freed by now */
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 2ecaa5b907..6aa12a39e5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -91,8 +91,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
-static void update_replication_progress(LogicalDecodingContext *ctx,
-										bool skipped_xact);
 
 /*
  * Only 3 publication actions are used for row filtering ("insert", "update",
@@ -578,7 +576,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * from this transaction has been sent to the downstream.
 	 */
 	sent_begin_txn = txndata->sent_begin_txn;
-	update_replication_progress(ctx, !sent_begin_txn);
+	OutputPluginUpdateProgress(ctx, !sent_begin_txn);
 	pfree(txndata);
 	txn->output_plugin_private = NULL;
 
@@ -617,7 +615,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -631,7 +629,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -647,7 +645,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1378,8 +1376,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
 
-	update_replication_progress(ctx, false);
-
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -1612,8 +1608,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	Oid		   *relids;
 	TransactionId xid = InvalidTransactionId;
 
-	update_replication_progress(ctx, false);
-
 	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
 	if (in_streaming)
 		xid = change->txn->xid;
@@ -1677,8 +1671,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	TransactionId xid = InvalidTransactionId;
 
-	update_replication_progress(ctx, false);
-
 	if (!data->messages)
 		return;
 
@@ -1874,7 +1866,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1895,7 +1887,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
@@ -2380,37 +2372,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
 		}
 	}
 }
-
-/*
- * Try to update progress and send a keepalive message if too many changes were
- * processed.
- *
- * For a large transaction, if we don't send any change to the downstream for a
- * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
- * This can happen when all or most of the changes are either not published or
- * got filtered out.
- */
-static void
-update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
-{
-	static int	changes_count = 0;
-
-	/*
-	 * We don't want to try sending a keepalive message after processing each
-	 * change as that can have overhead. Tests revealed that there is no
-	 * noticeable overhead in doing it after continuously processing 100 or so
-	 * changes.
-	 */
-#define CHANGES_THRESHOLD 100
-
-	/*
-	 * If we are at the end of transaction LSN, update progress tracking.
-	 * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
-	 * try to send a keepalive message if required.
-	 */
-	if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
-	{
-		OutputPluginUpdateProgress(ctx, skipped_xact);
-		changes_count = 0;
-	}
-}
-- 
2.23.0.windows.1

