From ec22f424ede4cc828d79363502be38c04d43ac58 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 23 Jul 2024 08:40:29 +0000
Subject: [PATCH v3] Prevent origin progress advancement if failed to apply a
 transaction

The origin progress is advanced when aborting a transaction, intended to
signify the successful streaming and application of the ROLLBACK from the
publisher to the subscriber in streaming parallel mode. But when an error
occurred during the commit or prepare after setting
replorigin_session_origin_lsn, the origin progress was advanced as well which
is unexpected. This led to skipped transactions that were not replicated again.

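Fix it by resetting the session replication origin state
(replorigin_session_origin, replorigin_session_origin_lsn and
replorigin_session_origin_timestamp) in case of error.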

Originally reported by Hou Zhijie
---
 src/backend/replication/logical/worker.c | 27 ++++++++++++++++++++++++
 src/test/subscription/t/021_twophase.pl  | 14 +++++++++++-
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 245e9be6f2..c2849c8bfa 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -413,6 +413,8 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													 ParallelApplyWorkerInfo **winfo);
 
+static void replorigin_reset(int code, Datum arg);
+
 /*
  * Form the origin name for the subscription.
  *
@@ -4432,6 +4434,12 @@ start_apply(XLogRecPtr origin_startpos)
 	}
 	PG_CATCH();
 	{
+		/*
+		 * Reset the origin data to prevent the advancement of origin progress
+		 * if the transaction failed to apply.
+		 */
+		replorigin_reset(0, (Datum) 0);
+
 		if (MySubscription->disableonerr)
 			DisableSubscriptionAndExit();
 		else
@@ -4642,6 +4650,18 @@ InitializeLogRepWorker(void)
 	CommitTransactionCommand();
 }
 
+/*
+ * Reset the session origin state so that origin progress is not advanced when
+ * a transaction fails to apply. 'code' and 'arg' are unused.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+	replorigin_session_origin = InvalidRepOriginId;
+	replorigin_session_origin_lsn = InvalidXLogRecPtr;
+	replorigin_session_origin_timestamp = 0;
+}
+
 /* Common function to setup the leader apply or tablesync worker. */
 void
 SetupApplyOrSyncWorker(int worker_slot)
@@ -4670,6 +4690,13 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
+	/*
+	 * Register a callback to reset the origin state before aborting the
+	 * transaction in ShutdownPostgres(). This is to prevent the advancement
+	 * of origin progress if the transaction failed to apply.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
+
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 5e50f1af33..19147f31e2 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -23,7 +23,7 @@ $node_publisher->start;
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 $node_subscriber->init;
 $node_subscriber->append_conf('postgresql.conf',
-	qq(max_prepared_transactions = 10));
+	qq(max_prepared_transactions = 0));
 $node_subscriber->start;
 
 # Create some pre-existing content on publisher
@@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query)
 # then COMMIT PREPARED
 ###############################
 
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
 $node_publisher->safe_psql(
 	'postgres', "
 	BEGIN;
 	INSERT INTO tab_full VALUES (11);
 	PREPARE TRANSACTION 'test_prepared_tab_full';");
 
+# Confirm a new ERROR is logged because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/, $log_location);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
 $node_publisher->wait_for_catchup($appname);
 
 # check that transaction is in prepared state on subscriber
-- 
2.43.0

