From 001bcf625a0339169cad7599ef4adb2da484b6b3 Mon Sep 17 00:00:00 2001
From: wangw <wangw.fnst@fujitsu.com>
Date: Mon, 7 Nov 2022 11:26:42 +0800
Subject: [PATCH v2] Fix the logical replication timeout during processing of
 DDL.

The problem is when there is a DDL in a transaction that generates lots of
temporary data due to rewrite rules, these temporary data will not be processed
by the pgoutput - plugin. Therefore, the previous fix (f95d53e) for DML had no
impact on this case.

To fix this, we introduced a new optional output plugin callback -
'update_progress_cb'. This callback is called to update the process after each
change has been processed during sending data of a transaction (and its
subtransactions) to the output plugin.

For pgoutput, in this callback we try to update progress and send keep-alive
messages when too many changes have been processed.
---
 doc/src/sgml/logicaldecoding.sgml             | 20 +++-
 src/backend/replication/logical/logical.c     | 52 +++++++++++
 .../replication/logical/reorderbuffer.c       |  2 +
 src/backend/replication/pgoutput/pgoutput.c   | 93 +++++++++----------
 src/include/replication/output_plugin.h       |  9 ++
 src/include/replication/reorderbuffer.h       | 11 +++
 src/tools/pgindent/typedefs.list              |  2 +
 7 files changed, 139 insertions(+), 50 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 4cf863a76f..619cba36d3 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -473,6 +473,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeStreamChangeCB stream_change_cb;
     LogicalDecodeStreamMessageCB stream_message_cb;
     LogicalDecodeStreamTruncateCB stream_truncate_cb;
+    LogicalDecodeUpdateProgressCB update_progress_cb;
 } OutputPluginCallbacks;
 
 typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
@@ -481,8 +482,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      and <function>commit_cb</function> callbacks are required,
      while <function>startup_cb</function>,
      <function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
-     and <function>shutdown_cb</function> are optional.
-     If <function>truncate_cb</function> is not set but a
+     <function>shutdown_cb</function>, and <function>update_progress_cb</function>
+     are optional. If <function>truncate_cb</function> is not set but a
      <command>TRUNCATE</command> is to be decoded, the action will be ignored.
     </para>
 
@@ -1040,6 +1041,21 @@ typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ct
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-update-progress">
+     <title>Update Progress Callback</title>
+
+     <para>
+      The optional <function>update_progress_cb</function> callback is called
+      after handling every change. This callback is to update the process
+      during sending data of a transaction (and its subtransactions) to the
+      output plugin.
+<programlisting>
+typedef void (*LogicalDecodeUpdateProgressCB) (struct LogicalDecodingContext *ctx,
+                                               ReorderBufferTXN *txn);
+</programlisting>
+     </para>
+    </sect3>
+
    </sect2>
 
    <sect2 id="logicaldecoding-output-plugin-output">
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 52d1fe6269..0e5f02c2ca 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -93,6 +93,11 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
 static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   int nrelations, Relation relations[], ReorderBufferChange *change);
 
+/* update progress callback */
+static void update_progress_cb_wrapper(ReorderBuffer *cache,
+									   ReorderBufferTXN *txn,
+									   ReorderBufferChange *change);
+
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
 
 /*
@@ -278,6 +283,11 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
 	ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
 
+	/*
+	 * Callback to support updating progress.
+	 */
+	ctx->reorder->update_progress = update_progress_cb_wrapper;
+
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
@@ -1582,6 +1592,48 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static void
+update_progress_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						   ReorderBufferChange *change)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	Assert(!ctx->fast_forward);
+
+	if (!ctx->callbacks.update_progress_cb)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "update_progress";
+	state.report_location = change->lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+	ctx->write_xid = txn->xid;
+
+	/*
+	 * Report this change's lsn so replies from clients can give an up-to-date
+	 * answer. This won't ever be enough (and shouldn't be!) to confirm
+	 * receipt of this transaction, but it might allow another transaction's
+	 * commit to be confirmed with one message.
+	 */
+	ctx->write_location = change->lsn;
+
+	ctx->end_xact = false;
+
+	ctx->callbacks.update_progress_cb(ctx, txn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 54ee824e6c..2e134bd011 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2446,6 +2446,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
 			}
+
+			rb->update_progress(rb, txn, change);
 		}
 
 		/* speculative insertion record must be freed by now */
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 19c10c028f..865384897e 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -78,6 +78,8 @@ static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 								   XLogRecPtr commit_lsn);
 static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 										ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
+static void pgoutput_update_progress(LogicalDecodingContext *ctx,
+									 ReorderBufferTXN *txn);
 
 static bool publications_valid;
 static bool in_streaming;
@@ -92,8 +94,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
 static void send_repl_origin(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id, XLogRecPtr origin_lsn,
 							 bool send_origin);
-static void update_replication_progress(LogicalDecodingContext *ctx,
-										bool skipped_xact);
 
 /*
  * Only 3 publication actions are used for row filtering ("insert", "update",
@@ -276,6 +276,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_truncate_cb = pgoutput_truncate;
 	/* transaction streaming - two-phase commit */
 	cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
+
+	cb->update_progress_cb = pgoutput_update_progress;
 }
 
 static void
@@ -586,7 +588,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * from this transaction has been sent to the downstream.
 	 */
 	sent_begin_txn = txndata->sent_begin_txn;
-	update_replication_progress(ctx, !sent_begin_txn);
+	OutputPluginUpdateProgress(ctx, !sent_begin_txn);
 	pfree(txndata);
 	txn->output_plugin_private = NULL;
 
@@ -625,7 +627,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -639,7 +641,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -655,7 +657,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1386,8 +1388,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
 
-	update_replication_progress(ctx, false);
-
 	if (!is_publishable_relation(relation))
 		return;
 
@@ -1622,8 +1622,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	Oid		   *relids;
 	TransactionId xid = InvalidTransactionId;
 
-	update_replication_progress(ctx, false);
-
 	/* Remember the xid for the change in streaming mode. See pgoutput_change. */
 	if (in_streaming)
 		xid = change->txn->xid;
@@ -1687,8 +1685,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	TransactionId xid = InvalidTransactionId;
 
-	update_replication_progress(ctx, false);
-
 	if (!data->messages)
 		return;
 
@@ -1888,7 +1884,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1909,12 +1905,47 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	update_replication_progress(ctx, false);
+	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
 }
 
+/*
+ * Update progress callback
+ *
+ * Try to update progress and send a keepalive message if too many changes were
+ * processed.
+ *
+ * For a large transaction, if we don't send any change to the downstream for a
+ * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
+ * This can happen when all or most of the changes are either not published or
+ * got filtered out.
+ */
+static void
+pgoutput_update_progress(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+{
+	static int	changes_count = 0;
+
+	/*
+	 * We don't want to try sending a keepalive message after processing each
+	 * change as that can have overhead. Tests revealed that there is no
+	 * noticeable overhead in doing it after continuously processing 100 or so
+	 * changes.
+	 */
+#define CHANGES_THRESHOLD 100
+
+	/*
+	 * After continuously processing CHANGES_THRESHOLD changes, we
+	 * try to send a keepalive message if required.
+	 */
+	if (++changes_count >= CHANGES_THRESHOLD)
+	{
+		OutputPluginUpdateProgress(ctx, false);
+		changes_count = 0;
+	}
+}
+
 /*
  * Initialize the relation schema sync cache for a decoding session.
  *
@@ -2409,37 +2440,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
 		}
 	}
 }
-
-/*
- * Try to update progress and send a keepalive message if too many changes were
- * processed.
- *
- * For a large transaction, if we don't send any change to the downstream for a
- * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
- * This can happen when all or most of the changes are either not published or
- * got filtered out.
- */
-static void
-update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
-{
-	static int	changes_count = 0;
-
-	/*
-	 * We don't want to try sending a keepalive message after processing each
-	 * change as that can have overhead. Tests revealed that there is no
-	 * noticeable overhead in doing it after continuously processing 100 or so
-	 * changes.
-	 */
-#define CHANGES_THRESHOLD 100
-
-	/*
-	 * If we are at the end of transaction LSN, update progress tracking.
-	 * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
-	 * try to send a keepalive message if required.
-	 */
-	if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
-	{
-		OutputPluginUpdateProgress(ctx, skipped_xact);
-		changes_count = 0;
-	}
-}
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2d89d26586..e877bcac93 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -210,6 +210,12 @@ typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ct
 											   Relation relations[],
 											   ReorderBufferChange *change);
 
+/*
+ * Callback for updating progress.
+ */
+typedef void (*LogicalDecodeUpdateProgressCB) (struct LogicalDecodingContext *ctx,
+											   ReorderBufferTXN *txn);
+
 /*
  * Output plugin callbacks
  */
@@ -240,6 +246,9 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeStreamChangeCB stream_change_cb;
 	LogicalDecodeStreamMessageCB stream_message_cb;
 	LogicalDecodeStreamTruncateCB stream_truncate_cb;
+
+	/* update progress */
+	LogicalDecodeUpdateProgressCB update_progress_cb;
 } OutputPluginCallbacks;
 
 /* Functions in replication/logical/logical.c */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index f6c4dd75db..5897b27b42 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -525,6 +525,12 @@ typedef void (*ReorderBufferStreamTruncateCB) (
 											   Relation relations[],
 											   ReorderBufferChange *change);
 
+/* update progress callback signature */
+typedef void (*ReorderBufferUpdateProgressCB) (
+											   ReorderBuffer *rb,
+											   ReorderBufferTXN *txn,
+											   ReorderBufferChange *change);
+
 struct ReorderBuffer
 {
 	/*
@@ -588,6 +594,11 @@ struct ReorderBuffer
 	ReorderBufferStreamMessageCB stream_message;
 	ReorderBufferStreamTruncateCB stream_truncate;
 
+	/*
+	 * Callbacks to be called when updating progress.
+	 */
+	ReorderBufferUpdateProgressCB update_progress;
+
 	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 23bafec5f7..96d940e05c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1455,6 +1455,7 @@ LogicalDecodeStreamStartCB
 LogicalDecodeStreamStopCB
 LogicalDecodeStreamTruncateCB
 LogicalDecodeTruncateCB
+LogicalDecodeUpdateProgressCB
 LogicalDecodingContext
 LogicalDecodingMode
 LogicalErrorCallbackState
@@ -2310,6 +2311,7 @@ ReorderBufferToastEnt
 ReorderBufferTupleBuf
 ReorderBufferTupleCidEnt
 ReorderBufferTupleCidKey
+ReorderBufferUpdateProgressCB
 ReorderTuple
 RepOriginId
 ReparameterizeForeignPathByChild_function
-- 
2.23.0.windows.1

