From 089abd1ddd9c9e7ccf2520fd9d9638f99715b6cc Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 23 Jul 2024 08:40:29 +0000
Subject: [PATCH v1] Don't advance origin during apply failure.

We advance origin progress during abort on successful streaming and
application of ROLLBACK in parallel streaming mode. But the origin
shouldn't be advanced during an error or unsuccessful apply due to
shutdown. Otherwise, it will result in a transaction loss as such a
transaction won't be sent again by the server.

Reported-by: Hou Zhijie
Author: Hayato Kuroda and Shveta Malik
Reviewed-by: Amit Kapila
Backpatch-through: 16
Discussion: https://postgr.es/m/TYAPR01MB5692FAC23BE40C69DA8ED4AFF5B92@TYAPR01MB5692.jpnprd01.prod.outlook.com
---
 src/backend/replication/logical/worker.c | 35 ++++++++++++++++++++++++
 src/backend/utils/error/elog.c           | 17 ++++++++++++
 src/include/utils/elog.h                 |  1 +
 src/test/subscription/t/021_twophase.pl  | 14 +++++++++-
 4 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe3e8..d091a1dd27 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4639,6 +4639,17 @@ InitializeLogRepWorker(void)
 	CommitTransactionCommand();
 }
 
+/*
+ * Reset the origin state.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+	replorigin_session_origin = InvalidRepOriginId;
+	replorigin_session_origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_origin_timestamp = 0;
+}
+
 /* Common function to setup the leader apply or tablesync worker. */
 void
 SetupApplyOrSyncWorker(int worker_slot)
@@ -4667,6 +4678,19 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
+	/*
+	 * Register a callback to reset the origin state before aborting any
+	 * pending transaction during shutdown (see ShutdownPostgres()). This will
+	 * avoid origin advancement for an in-complete transaction which could
+	 * otherwise lead to its loss as such a transaction won't be sent by the
+	 * server again.
+	 *
+	 * Note that even a LOG or DEBUG statement placed after setting the origin
+	 * state may process a shutdown signal before committing the current apply
+	 * operation. So, it is important to register such a callback here.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
+
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
@@ -4893,12 +4917,23 @@ void
 apply_error_callback(void *arg)
 {
 	ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
+	int			elevel;
 
 	if (apply_error_callback_arg.command == 0)
 		return;
 
 	Assert(errarg->origin_name);
 
+	elevel = geterrlevel();
+
+	/*
+	 * Reset the origin state to prevent the advancement of origin progress if
+	 * we fail to apply. Otherwise, this will result in transaction loss as
+	 * that transaction won't be sent again by the server.
+	 */
+	if (elevel >= ERROR)
+		replorigin_reset(0, (Datum) 0);
+
 	if (errarg->rel == NULL)
 	{
 		if (!TransactionIdIsValid(errarg->remote_xid))
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index d1d1632bdd..b924b524d0 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1568,6 +1568,23 @@ geterrcode(void)
 	return edata->sqlerrcode;
 }
 
+/*
+ * geterrlevel --- return the currently set error level
+ *
+ * This is only intended for use in error callback subroutines, since there
+ * is no other place outside elog.c where the concept is meaningful.
+ */
+int
+geterrlevel(void)
+{
+	ErrorData  *edata = &errordata[errordata_stack_depth];
+
+	/* we don't bother incrementing recursion_depth */
+	CHECK_STACK_DEPTH();
+
+	return edata->elevel;
+}
+
 /*
  * geterrposition --- return the currently set error position (0 if none)
  *
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 054dd2bf62..e54eca5b48 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -226,6 +226,7 @@ extern int	internalerrquery(const char *query);
 extern int	err_generic_string(int field, const char *str);
 
 extern int	geterrcode(void);
+extern int	geterrlevel(void);
 extern int	geterrposition(void);
 extern int	getinternalerrposition(void);
 
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 9437cd4c3b..e635be74c6 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -23,7 +23,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_prepared_transactions = 10));
+	qq(max_prepared_transactions = 0));
 $node_subscriber->start;
 
 # Create some pre-existing content on publisher
@@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query)
 # then COMMIT PREPARED
 ###############################
 
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
 $node_publisher->safe_psql(
 	'postgres', "
 	BEGIN;
 	INSERT INTO tab_full VALUES (11);
 	PREPARE TRANSACTION 'test_prepared_tab_full';");
 
+# Confirm the ERROR is reported becasue max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
 $node_publisher->wait_for_catchup($appname);
 
 # check that transaction is in prepared state on subscriber
-- 
2.34.1

