From 445e9fb4892dc5d67b94ab41ab1929f83e23492c Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 8 Feb 2023 11:57:37 -0800
Subject: [PATCH v7 5/6] Rework
 LogicalOutputPluginWriterUpdateProgressAndKeepalive

Currently whenever we need to add a callback to the output plugin, the
output plugin needs to confirm whether it needs to call
WalSndUpdateProgressAndKeepalive or not. As the number of callbacks
increases, this framework becomes less maintainable. So we decided to
improve this by calling WalSndUpdateProgressAndKeepalive from the
common code, rather than in the output plugins.

We introduced LogicalDecodingContext->did_write to record whether the
output plugin actually sent data or not. We use this flag to check
if we should send a keepalive message to the subscriber to prevent delays
when an empty transaction is skipped in synchronous logical replication.

Since the common code can tell whether a change ends a transaction, we
removed the LogicalDecodingContext->end_xact flag introduced in commit
f95d53e.

In commit 8c58624, in order to fix logical replication timeouts during
large DDLs, we tried to send a keepalive message to the subscriber for
all change types in the function ReorderBufferProcessTXN. Now, we only
handle timeout-related issues for the common change types (see the
function is_keepalive_threshold_exceeded).
---
 src/backend/replication/logical/decode.c      |   3 +
 src/backend/replication/logical/logical.c     | 240 +++++++++++-------
 .../replication/logical/reorderbuffer.c       |  33 +--
 src/backend/replication/pgoutput/pgoutput.c   |  10 -
 src/backend/replication/walsender.c           |  27 +-
 src/include/replication/logical.h             |  10 +-
 src/include/replication/output_plugin.h       |   1 -
 src/include/replication/reorderbuffer.h       |  12 -
 src/tools/pgindent/typedefs.list              |   1 -
 9 files changed, 194 insertions(+), 143 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1..53cfc13a93 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -666,6 +666,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 		}
 		ReorderBufferForget(ctx->reorder, xid, buf->origptr);
 
+		UpdateDecodingProgressAndKeepalive(ctx, xid, buf->endptr, true);
 		return;
 	}
 
@@ -763,6 +764,8 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	{
 		ReorderBufferSkipPrepare(ctx->reorder, xid);
 		ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
+
+		UpdateDecodingProgressAndKeepalive(ctx, xid, buf->endptr, true);
 		return;
 	}
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index a33b14792b..187c03eb5d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -93,10 +93,10 @@ static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *tx
 static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 									   int nrelations, Relation relations[], ReorderBufferChange *change);
 
-/* callback to update txn's progress */
-static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
-										   ReorderBufferTXN *txn,
-										   XLogRecPtr lsn);
+static void UpdateProgressAndKeepalive(LogicalDecodingContext *ctx,
+									   bool finished_xact);
+
+static bool is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx);
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
 
@@ -283,12 +283,6 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
 	ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
 
-	/*
-	 * Callback to support updating progress during sending data of a
-	 * transaction (and its subtransactions) to the output plugin.
-	 */
-	ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
-
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
@@ -662,7 +656,8 @@ void
 OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
 {
 	if (!ctx->accept_writes)
-		elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
+		elog(ERROR, "writes are only accepted in output plugin callbacks, "
+			 "except startup, shutdown, filter_by_origin, and filter_prepare");
 
 	ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
 	ctx->prepared_write = true;
@@ -679,20 +674,7 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
 
 	ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
 	ctx->prepared_write = false;
-}
-
-/*
- * Update progress tracking (if supported).
- */
-void
-OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
-						   bool skipped_xact)
-{
-	if (!ctx->update_progress_and_keepalive)
-		return;
-
-	ctx->update_progress_and_keepalive(ctx, ctx->write_location, ctx->write_xid,
-									   skipped_xact);
+	ctx->did_write = true;
 }
 
 /*
@@ -759,13 +741,14 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
 
 	/* set output state */
 	ctx->accept_writes = false;
-	ctx->end_xact = false;
 
 	/* do the actual work: call callback */
 	ctx->callbacks.startup_cb(ctx, opt, is_init);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	/* No progress has been made, so don't call UpdateProgressAndKeepalive */
 }
 
 static void
@@ -787,13 +770,14 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
 
 	/* set output state */
 	ctx->accept_writes = false;
-	ctx->end_xact = false;
 
 	/* do the actual work: call callback */
 	ctx->callbacks.shutdown_cb(ctx);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	/* No progress has been made, so don't call UpdateProgressAndKeepalive */
 }
 
 
@@ -823,7 +807,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->first_lsn;
-	ctx->end_xact = false;
+	ctx->did_write = false;
 
 	/* do the actual work: call callback */
 	ctx->callbacks.begin_cb(ctx, txn);
@@ -855,13 +839,15 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/* do the actual work: call callback */
 	ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, true);
 }
 
 /*
@@ -896,7 +882,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->first_lsn;
-	ctx->end_xact = false;
+	ctx->did_write = false;
 
 	/*
 	 * If the plugin supports two-phase commits then begin prepare callback is
@@ -941,7 +927,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/*
 	 * If the plugin supports two-phase commits then prepare callback is
@@ -958,6 +944,8 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, true);
 }
 
 static void
@@ -986,7 +974,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/*
 	 * If the plugin support two-phase commits then commit prepared callback
@@ -1003,6 +991,8 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, true);
 }
 
 static void
@@ -1032,7 +1022,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn; /* points to the end of the record */
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/*
 	 * If the plugin support two-phase commits then rollback prepared callback
@@ -1050,6 +1040,8 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, true);
 }
 
 static void
@@ -1074,6 +1066,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	/* set output state */
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
+	ctx->did_write = false;
 
 	/*
 	 * Report this change's lsn so replies from clients can give an up-to-date
@@ -1083,12 +1076,13 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
-	ctx->end_xact = false;
-
 	ctx->callbacks.change_cb(ctx, txn, relation, change);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	if (is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, false);
 }
 
 static void
@@ -1102,7 +1096,7 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	Assert(!ctx->fast_forward);
 
 	if (!ctx->callbacks.truncate_cb)
-		return;
+		goto out;
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -1116,6 +1110,7 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	/* set output state */
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
+	ctx->did_write = false;
 
 	/*
 	 * Report this change's lsn so replies from clients can give an up-to-date
@@ -1125,12 +1120,14 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
-	ctx->end_xact = false;
-
 	ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+out:
+	if (is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, false);
 }
 
 bool
@@ -1154,7 +1151,6 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
 
 	/* set output state */
 	ctx->accept_writes = false;
-	ctx->end_xact = false;
 
 	/* do the actual work: call callback */
 	ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
@@ -1162,6 +1158,8 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
 
+	/* No progress has been made, so don't call UpdateProgressAndKeepalive */
+
 	return ret;
 }
 
@@ -1185,7 +1183,6 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 
 	/* set output state */
 	ctx->accept_writes = false;
-	ctx->end_xact = false;
 
 	/* do the actual work: call callback */
 	ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -1193,6 +1190,8 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
 
+	/* No progress has been made, so don't call UpdateProgressAndKeepalive */
+
 	return ret;
 }
 
@@ -1208,7 +1207,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	Assert(!ctx->fast_forward);
 
 	if (ctx->callbacks.message_cb == NULL)
-		return;
+		goto out;
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -1223,7 +1222,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
 	ctx->write_location = message_lsn;
-	ctx->end_xact = false;
+	ctx->did_write = false;
 
 	/* do the actual work: call callback */
 	ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1231,6 +1230,10 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+out:
+	if (is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, false);
 }
 
 static void
@@ -1258,6 +1261,7 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	/* set output state */
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
+	ctx->did_write = false;
 
 	/*
 	 * Report this message's lsn so replies from clients can give an
@@ -1267,8 +1271,6 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = first_lsn;
 
-	ctx->end_xact = false;
-
 	/* in streaming mode, stream_start_cb is required */
 	if (ctx->callbacks.stream_start_cb == NULL)
 		ereport(ERROR,
@@ -1280,6 +1282,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	/* No progress has been made, so don't call UpdateProgressAndKeepalive */
 }
 
 static void
@@ -1307,6 +1311,7 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	/* set output state */
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
+	ctx->did_write = false;
 
 	/*
 	 * Report this message's lsn so replies from clients can give an
@@ -1316,8 +1321,6 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = last_lsn;
 
-	ctx->end_xact = false;
-
 	/* in streaming mode, stream_stop_cb is required */
 	if (ctx->callbacks.stream_stop_cb == NULL)
 		ereport(ERROR,
@@ -1329,6 +1332,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	/* No progress has been made, so don't call UpdateProgressAndKeepalive */
 }
 
 static void
@@ -1357,7 +1362,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = abort_lsn;
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/* in streaming mode, stream_abort_cb is required */
 	if (ctx->callbacks.stream_abort_cb == NULL)
@@ -1370,6 +1375,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, (txn->toptxn == NULL));
 }
 
 static void
@@ -1402,7 +1409,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn;
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/* in streaming mode with two-phase commits, stream_prepare_cb is required */
 	if (ctx->callbacks.stream_prepare_cb == NULL)
@@ -1415,6 +1422,8 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, true);
 }
 
 static void
@@ -1443,7 +1452,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
 	ctx->write_location = txn->end_lsn;
-	ctx->end_xact = true;
+	ctx->did_write = false;
 
 	/* in streaming mode, stream_commit_cb is required */
 	if (ctx->callbacks.stream_commit_cb == NULL)
@@ -1456,6 +1465,8 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	UpdateProgressAndKeepalive(ctx, true);
 }
 
 static void
@@ -1483,6 +1494,7 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	/* set output state */
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
+	ctx->did_write = false;
 
 	/*
 	 * Report this change's lsn so replies from clients can give an up-to-date
@@ -1492,8 +1504,6 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
-	ctx->end_xact = false;
-
 	/* in streaming mode, stream_change_cb is required */
 	if (ctx->callbacks.stream_change_cb == NULL)
 		ereport(ERROR,
@@ -1505,6 +1515,9 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+	if (is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, false);
 }
 
 static void
@@ -1523,7 +1536,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* this callback is optional */
 	if (ctx->callbacks.stream_message_cb == NULL)
-		return;
+		goto out;
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -1538,7 +1551,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	ctx->accept_writes = true;
 	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
 	ctx->write_location = message_lsn;
-	ctx->end_xact = false;
+	ctx->did_write = false;
 
 	/* do the actual work: call callback */
 	ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1546,6 +1559,10 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+out:
+	if (is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, false);
 }
 
 static void
@@ -1564,7 +1581,7 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 
 	/* this callback is optional */
 	if (!ctx->callbacks.stream_truncate_cb)
-		return;
+		goto out;
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
@@ -1578,6 +1595,7 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	/* set output state */
 	ctx->accept_writes = true;
 	ctx->write_xid = txn->xid;
+	ctx->did_write = false;
 
 	/*
 	 * Report this change's lsn so replies from clients can give an up-to-date
@@ -1587,51 +1605,80 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	 */
 	ctx->write_location = change->lsn;
 
-	ctx->end_xact = false;
-
 	ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
 
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;
+
+out:
+	if (is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, false);
 }
 
-static void
-update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
-							   XLogRecPtr lsn)
+/*
+ * Helper function to check for continuous skipping of many changes without
+ * sending them to the output plugin.
+ */
+static bool
+is_keepalive_threshold_exceeded(LogicalDecodingContext *ctx)
 {
-	LogicalDecodingContext *ctx = cache->private_data;
-	LogicalErrorCallbackState state;
-	ErrorContextCallback errcallback;
-
-	Assert(!ctx->fast_forward);
-
-	/* Push callback + info on the error context stack */
-	state.ctx = ctx;
-	state.callback_name = "update_progress_txn";
-	state.report_location = lsn;
-	errcallback.callback = output_plugin_error_callback;
-	errcallback.arg = (void *) &state;
-	errcallback.previous = error_context_stack;
-	error_context_stack = &errcallback;
+	/*
+	 * The counter for accumulating the number of consecutively skipped
+	 * changes.
+	 *
+	 * XXX This counter is not reset at the end of a transaction. The worst
+	 * case this leads to is a delay in sending the keepalive message for the
+	 * next transaction. But testing shows that using CHANGES_THRESHOLD (see
+	 * below) is safe enough.
+	 */
+	static int	changes_count = 0;
 
-	/* set output state */
-	ctx->accept_writes = false;
-	ctx->write_xid = txn->xid;
+	/* If the change was published, reset the counter and return false */
+	if (ctx->did_write)
+	{
+		changes_count = 0;
+		return false;
+	}
 
 	/*
-	 * Report this change's lsn so replies from clients can give an up-to-date
-	 * answer. This won't ever be enough (and shouldn't be!) to confirm
-	 * receipt of this transaction, but it might allow another transaction's
-	 * commit to be confirmed with one message.
+	 * It is possible that the data is not sent to downstream for a long time
+	 * either because the output plugin filtered it or there is a DDL that
+	 * generates a lot of data that is not processed by the plugin. So, in
+	 * such cases, the downstream can timeout. To avoid that we try to send a
+	 * keepalive message if required.  Trying to send a keepalive message
+	 * after every change has some overhead, but testing showed there is no
+	 * noticeable overhead if we do it after every ~100 changes.
 	 */
-	ctx->write_location = lsn;
+#define CHANGES_THRESHOLD 100
+	if (++changes_count >= CHANGES_THRESHOLD)
+	{
+		changes_count = 0;
+		return true;
+	}
 
-	ctx->end_xact = false;
+	return false;
+}
 
-	OutputPluginUpdateProgress(ctx, false);
+/*
+ * Update progress tracking and send keepalive (if required).
+ *
+ * We should update progress in time when the transaction is completed, and
+ * send keepalive messages if required to prevent the subscriber from timing
+ * out during the decoding process.
+ *
+ * Note: Timeout handling is deliberately skipped on some code paths, because
+ * the overhead it would add there outweighs the benefit of sending a
+ * keepalive.
+ */
+static void
+UpdateProgressAndKeepalive(LogicalDecodingContext *ctx, bool finished_xact)
+{
+	if (!ctx->update_progress_and_keepalive)
+		return;
 
-	/* Pop the error context stack */
-	error_context_stack = errcallback.previous;
+	ctx->update_progress_and_keepalive(ctx, ctx->write_location,
+									   ctx->write_xid, ctx->did_write,
+									   finished_xact);
 }
 
 /*
@@ -1922,3 +1969,26 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
 }
+
+/*
+ * UpdateDecodingProgressAndKeepalive
+ *
+ * During the logical decoding process, when no data is sent to the subscriber
+ * due to skipping the entire transaction or processing unlogged table data
+ * and temporary data, try to update progress and send a keepalive message.
+ */
+void
+UpdateDecodingProgressAndKeepalive(LogicalDecodingContext *ctx,
+								   TransactionId xid,
+								   XLogRecPtr lsn,
+								   bool finished_xact)
+{
+	/* set output state */
+	ctx->accept_writes = false;
+	ctx->write_xid = xid;
+	ctx->write_location = lsn;
+	ctx->did_write = false;
+
+	if (finished_xact || is_keepalive_threshold_exceeded(ctx))
+		UpdateProgressAndKeepalive(ctx, finished_xact);
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 2d17c551a8..161531db0e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2110,8 +2110,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	PG_TRY();
 	{
 		ReorderBufferChange *change;
-		int			changes_count = 0;	/* used to accumulate the number of
-										 * changes */
 
 		if (using_subtxn)
 			BeginInternalSubTransaction(streaming ? "stream" : "replay");
@@ -2226,14 +2224,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 										 MAIN_FORKNUM));
 
 					if (!RelationIsLogicallyLogged(relation))
+					{
+						UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *) rb->private_data,
+														   txn->xid,
+														   change->lsn, false);
 						goto change_done;
+					}
 
 					/*
 					 * Ignore temporary heaps created during DDL unless the
 					 * plugin has asked for them.
 					 */
 					if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+					{
+						UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *) rb->private_data,
+														   txn->xid,
+														   change->lsn, false);
 						goto change_done;
+					}
 
 					/*
 					 * For now ignore sequence changes entirely. Most of the
@@ -2452,24 +2460,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
 			}
-
-			/*
-			 * It is possible that the data is not sent to downstream for a
-			 * long time either because the output plugin filtered it or there
-			 * is a DDL that generates a lot of data that is not processed by
-			 * the plugin. So, in such cases, the downstream can timeout. To
-			 * avoid that we try to send a keepalive message if required.
-			 * Trying to send a keepalive message after every change has some
-			 * overhead, but testing showed there is no noticeable overhead if
-			 * we do it after every ~100 changes.
-			 */
-#define CHANGES_THRESHOLD 100
-
-			if (++changes_count >= CHANGES_THRESHOLD)
-			{
-				rb->update_progress_txn(rb, txn, change->lsn);
-				changes_count = 0;
-			}
 		}
 
 		/* speculative insertion record must be freed by now */
@@ -2926,6 +2916,9 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 			ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
 											   txn->invalidations);
 	}
+	else
+		UpdateDecodingProgressAndKeepalive((LogicalDecodingContext *)rb->private_data,
+										   xid, lsn, (txn->toptxn == NULL));
 
 	/* cosmetic... */
 	txn->final_lsn = lsn;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 00a2d73dab..4791107837 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -593,7 +593,6 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	 * from this transaction has been sent to the downstream.
 	 */
 	sent_begin_txn = txndata->sent_begin_txn;
-	OutputPluginUpdateProgress(ctx, !sent_begin_txn);
 	pfree(txndata);
 	txn->output_plugin_private = NULL;
 
@@ -632,8 +631,6 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					 XLogRecPtr prepare_lsn)
 {
-	OutputPluginUpdateProgress(ctx, false);
-
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
@@ -646,8 +643,6 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 							 XLogRecPtr commit_lsn)
 {
-	OutputPluginUpdateProgress(ctx, false);
-
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
@@ -662,8 +657,6 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
 							   XLogRecPtr prepare_end_lsn,
 							   TimestampTz prepare_time)
 {
-	OutputPluginUpdateProgress(ctx, false);
-
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
 									   prepare_time);
@@ -1908,8 +1901,6 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
 	Assert(!in_streaming);
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx, false);
-
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
 	OutputPluginWrite(ctx, true);
@@ -1929,7 +1920,6 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
 	Assert(rbtxn_is_streamed(txn));
 
-	OutputPluginUpdateProgress(ctx, false);
 	OutputPluginPrepareWrite(ctx, true);
 	logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
 	OutputPluginWrite(ctx, true);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 7ac1d7430e..d72c8f7bb8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -250,8 +250,11 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
-static void WalSndUpdateProgressAndKeepalive(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
-											 bool skipped_xact);
+static void WalSndUpdateProgressAndKeepalive(LogicalDecodingContext *ctx,
+											 XLogRecPtr lsn,
+											 TransactionId xid,
+											 bool did_write,
+											 bool finished_xact);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1462,16 +1465,17 @@ WalSndSendPending(void)
  *
  * Write the current position to the lag tracker (see XLogSendPhysical).
  *
- * When skipping empty transactions, send a keepalive message if necessary.
+ * When a transaction is skipped or no data is sent to the subscriber for a
+ * long time, send a keepalive message if necessary.
  */
 static void
-WalSndUpdateProgressAndKeepalive(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
-								 bool skipped_xact)
+WalSndUpdateProgressAndKeepalive(LogicalDecodingContext *ctx, XLogRecPtr lsn,
+								 TransactionId xid, bool did_write,
+								 bool finished_xact)
 {
 	static TimestampTz sendTime = 0;
 	TimestampTz now = GetCurrentTimestamp();
 	bool		pending_writes = false;
-	bool		end_xact = ctx->end_xact;
 
 	/*
 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
@@ -1482,8 +1486,9 @@ WalSndUpdateProgressAndKeepalive(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tr
 	 * transaction LSN.
 	 */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
-	if (end_xact && TimestampDifferenceExceeds(sendTime, now,
-											   WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+	if (finished_xact &&
+		TimestampDifferenceExceeds(sendTime, now,
+								   WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
 	{
 		LagTrackerWrite(lsn, now);
 		sendTime = now;
@@ -1497,7 +1502,7 @@ WalSndUpdateProgressAndKeepalive(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tr
 	 * the worst case we will just send an extra keepalive message when it is
 	 * really not required.
 	 */
-	if (skipped_xact &&
+	if (finished_xact && !did_write &&
 		((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
 	{
 		WalSndKeepalive(false, lsn);
@@ -1513,12 +1518,12 @@ WalSndUpdateProgressAndKeepalive(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tr
 
 	/*
 	 * Process pending writes if any or try to send a keepalive if required.
-	 * We don't need to try sending keep alive messages at the transaction end
+	 * We don't need to try sending keepalive messages at the transaction end
 	 * as that will be done at a later point in time. This is required only
 	 * for large transactions where we don't send any changes to the
 	 * downstream and the receiver can timeout due to that.
 	 */
-	if (pending_writes || (!end_xact && wal_sender_timeout > 0 &&
+	if (pending_writes || (!finished_xact && wal_sender_timeout > 0 &&
 						   now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
 															  wal_sender_timeout / 2)))
 		WalSndSendPending();
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c39a619b98..dc7fee6728 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -27,7 +27,8 @@ typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
 typedef void (*LogicalOutputPluginWriterUpdateProgressAndKeepalive) (struct LogicalDecodingContext *lr,
 																	 XLogRecPtr Ptr,
 																	 TransactionId xid,
-																	 bool skipped_xact
+																	 bool did_write,
+																	 bool finished_xact
 );
 
 typedef struct LogicalDecodingContext
@@ -105,10 +106,9 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		accept_writes;
 	bool		prepared_write;
+	bool		did_write;
 	XLogRecPtr	write_location;
 	TransactionId write_xid;
-	/* Are we processing the end LSN of a transaction? */
-	bool		end_xact;
 } LogicalDecodingContext;
 
 
@@ -144,5 +144,9 @@ extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
+extern void UpdateDecodingProgressAndKeepalive(LogicalDecodingContext *ctx,
+											   TransactionId xid,
+											   XLogRecPtr lsn,
+											   bool finished_xact);
 
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2d89d26586..b9358e1544 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -245,6 +245,5 @@ typedef struct OutputPluginCallbacks
 /* Functions in replication/logical/logical.c */
 extern void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
 extern void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write);
-extern void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact);
 
 #endif							/* OUTPUT_PLUGIN_H */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 215d1494e9..e5db041df1 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -526,12 +526,6 @@ typedef void (*ReorderBufferStreamTruncateCB) (
 											   Relation relations[],
 											   ReorderBufferChange *change);
 
-/* update progress txn callback signature */
-typedef void (*ReorderBufferUpdateProgressTxnCB) (
-												  ReorderBuffer *rb,
-												  ReorderBufferTXN *txn,
-												  XLogRecPtr lsn);
-
 struct ReorderBuffer
 {
 	/*
@@ -595,12 +589,6 @@ struct ReorderBuffer
 	ReorderBufferStreamMessageCB stream_message;
 	ReorderBufferStreamTruncateCB stream_truncate;
 
-	/*
-	 * Callback to be called when updating progress during sending data of a
-	 * transaction (and its subtransactions) to the output plugin.
-	 */
-	ReorderBufferUpdateProgressTxnCB update_progress_txn;
-
 	/*
 	 * Pointer that will be passed untouched to the callbacks.
 	 */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 53fd0fe621..adc943e470 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2322,7 +2322,6 @@ ReorderBufferToastEnt
 ReorderBufferTupleBuf
 ReorderBufferTupleCidEnt
 ReorderBufferTupleCidKey
-ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 RepOriginId
 ReparameterizeForeignPathByChild_function
-- 
2.39.1.windows.1

