From 0d626e462fbf25f4973de5b4771835f57ed5e56d Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Mon, 22 Dec 2025 14:01:08 +0800
Subject: [PATCH v1] Fix unexpected origin advancement during parallel apply
 failure

The logical replication parallel apply worker may erroneously advance the origin
progress during an error or unsuccessful apply. This can lead to transaction
loss, as these transactions will not be resent by the server.

Commit 3f28b2fc addressed a similar issue in both the apply worker and table
sync worker, by registering a before_shmem_exit callback to reset the origin
information, preventing the worker from advancing it during transaction abortion
on shutdown. This commit registers the same callback for the parallel apply
worker, ensuring consistent behavior across all workers.
---
 src/backend/replication/logical/worker.c      | 30 +++++++++-------
 .../subscription/t/023_twophase_stream.pl     | 34 +++++++++++++++++++
 2 files changed, 51 insertions(+), 13 deletions(-)

diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fc64476a9ef..e009c0c9d48 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -5849,6 +5849,23 @@ InitializeLogRepWorker(void)
 					   MySubscription->name));
 
 	CommitTransactionCommand();
+
+	/*
+	 * Register a callback to reset the origin state before aborting any
+	 * pending transaction during shutdown (see ShutdownPostgres()). This will
+	 * avoid origin advancement for an in-complete transaction which could
+	 * otherwise lead to its loss as such a transaction won't be sent by the
+	 * server again.
+	 *
+	 * Note that even a LOG or DEBUG statement placed after setting the origin
+	 * state may process a shutdown signal before committing the current apply
+	 * operation. So, it is important to register such a callback here.
+	 *
+	 * Register this callback here to ensure that all types of logical
+	 * replication workers that set up origins and apply remote transactions
+	 * are protected.
+	 */
+	before_shmem_exit(replorigin_reset, (Datum) 0);
 }
 
 /*
@@ -5892,19 +5909,6 @@ SetupApplyOrSyncWorker(int worker_slot)
 
 	InitializeLogRepWorker();
 
-	/*
-	 * Register a callback to reset the origin state before aborting any
-	 * pending transaction during shutdown (see ShutdownPostgres()). This will
-	 * avoid origin advancement for an in-complete transaction which could
-	 * otherwise lead to its loss as such a transaction won't be sent by the
-	 * server again.
-	 *
-	 * Note that even a LOG or DEBUG statement placed after setting the origin
-	 * state may process a shutdown signal before committing the current apply
-	 * operation. So, it is important to register such a callback here.
-	 */
-	before_shmem_exit(replorigin_reset, (Datum) 0);
-
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
index e01347ca699..9b9f189308e 100644
--- a/src/test/subscription/t/023_twophase_stream.pl
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -429,6 +429,40 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
 is($result, qq(1), 'transaction is committed on subscriber');
 
+# Test the ability to re-apply a transaction when a parallel apply worker fails
+# to prepare the transaction due to insufficient max_prepared_transactions
+# setting.
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 0));
+$node_subscriber->restart;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab_2 values(2);
+	PREPARE TRANSACTION 'xact';
+	COMMIT PREPARED 'xact';
+	});
+
+$offset = -s $node_subscriber->logfile;
+
+# Confirm the ERROR is reported because max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/,
+	$offset);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that transaction is committed on subscriber
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
+is($result, qq(2), 'transaction is committed on subscriber after retrying');
+
 ###############################
 # check all the cleanup
 ###############################
-- 
2.51.1.windows.1

