From 7ec18bfbee7c841fb237d49ff823045cd755e51c Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 23 Jul 2024 08:40:29 +0000
Subject: [PATCH v4] Prevent origin progress advancement if failed to apply a
 transaction

The origin progress is advanced when aborting a transaction, intended to
signify the successful streaming and application of the ROLLBACK from the
publisher to the subscriber in streaming parallel mode. But when an error
occurred during the commit or prepare after setting
replorigin_session_origin_lsn, the origin progress was advanced as well which
is unexpected. This led to skipped transactions that were not replicated again.

Fix it by resetting replorigin_session_origin_lsn in case of error.

Originally reported by Hou Zhijie
---
 src/backend/replication/logical/worker.c | 46 ++++++++++++++++++++++++
 src/backend/utils/error/elog.c           | 17 +++++++++
 src/include/utils/elog.h                 |  1 +
 src/test/subscription/t/021_twophase.pl  | 14 +++++++-
 4 files changed, 77 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 6dc54c7283..837e63ea45 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4619,6 +4619,19 @@ InitializeLogRepWorker(void)
 	CommitTransactionCommand();
 }
 
+/*
+ * Reset the origin state so that a transaction abort cannot advance origin
+ * progress after a failed apply or a shutdown that precedes the commit. Both
+ * arguments are unused; the signature matches before_shmem_exit() callbacks.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+	replorigin_session_origin = InvalidRepOriginId;
+	replorigin_session_origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_origin_timestamp = 0;
+}
+
 /* Common function to setup the leader apply or tablesync worker. */
 void
 SetupApplyOrSyncWorker(int worker_slot)
@@ -4647,6 +4660,25 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
+	/*
+	 * Register a callback to reset the origin state before aborting the
+	 * transaction in ShutdownPostgres(). This is necessary to handle
+	 * situations where a user shuts down a subscriber while the worker is
+	 * applying changes. In such cases, the worker may shut down before
+	 * committing the transaction. If the worker does not reset the origin
+	 * state information before calling AbortOutOfAnyTransaction() during
+	 * shutdown, the origin LSN will advance incorrectly, resulting in the
+	 * loss of that transaction, as it will not be replayed when the
+	 * subscriber is restarted.
+	 *
+	 * Since errfinish includes CHECK_FOR_INTERRUPTS(), any LOG or DEBUG
+	 * statements placed in the code after the origin state is set may process
+	 * a shutdown signal before committing the current apply operation, which
+	 * could lead to the situation explained above.
+	 *
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
+
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
@@ -4873,12 +4905,26 @@ void
 apply_error_callback(void *arg)
 {
 	ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
+	int			elevel;
 
 	if (apply_error_callback_arg.command == 0)
 		return;
 
 	Assert(errarg->origin_name);
 
+	elevel = geterrlevel();
+
+	/*
+	 * Reset the origin state to prevent the advancement of origin progress if
+	 * the transaction fails to apply. It is crucial to reset this information
+	 * before calling AbortOutOfAnyTransaction(). Otherwise, the abort will
+	 * incorrectly advance the origin based on the current origin state for
+	 * which the worker could not perform the commit. This will result in
+	 * transaction loss, as that transaction will never be replayed.
+	 */
+	if (elevel >= ERROR)
+		replorigin_reset(0, (Datum) 0);
+
 	if (errarg->rel == NULL)
 	{
 		if (!TransactionIdIsValid(errarg->remote_xid))
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 943d8588f3..e26b9f3a66 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1568,6 +1568,23 @@ geterrcode(void)
 	return edata->sqlerrcode;
 }
 
+/*
+ * geterrlevel --- return the currently set error level (elevel)
+ *
+ * This is only intended for use in error callback subroutines, since there
+ * is no other place outside elog.c where the concept is meaningful.
+ */
+int
+geterrlevel(void)
+{
+	ErrorData  *edata = &errordata[errordata_stack_depth];
+
+	/* we don't bother incrementing recursion_depth */
+	CHECK_STACK_DEPTH();
+
+	return edata->elevel;
+}
+
 /*
  * geterrposition --- return the currently set error position (0 if none)
  *
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 054dd2bf62..e54eca5b48 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -226,6 +226,7 @@ extern int	internalerrquery(const char *query);
 extern int	err_generic_string(int field, const char *str);
 
 extern int	geterrcode(void);
+extern int	geterrlevel(void);
 extern int	geterrposition(void);
 extern int	getinternalerrposition(void);
 
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 5e50f1af33..19147f31e2 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -23,7 +23,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_prepared_transactions = 10));
+	qq(max_prepared_transactions = 0));
 $node_subscriber->start;
 
 # Create some pre-existing content on publisher
@@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query)
 # then COMMIT PREPARED
 ###############################
 
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
 $node_publisher->safe_psql(
 	'postgres', "
 	BEGIN;
 	INSERT INTO tab_full VALUES (11);
 	PREPARE TRANSACTION 'test_prepared_tab_full';");
 
+# Confirm the ERROR is newly logged because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/, $log_location);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
 $node_publisher->wait_for_catchup($appname);
 
 # check that transaction is in prepared state on subscriber
-- 
2.34.1

