From 0ba8ae921ed4807c0eb5e8c7909e160547c5e678 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Fri, 19 Dec 2025 15:25:50 +0800
Subject: [PATCH v1] Fix orphaned origin in shared memory after subscription
 drop

Currently, we store replication origin information in both the catalog and
shared memory (once the origin is set up). During a transaction, if a
replication origin is created and then set up, but the transaction subsequently
aborts, only the catalog changes are rolled back, leaving an orphaned origin in
shared memory. This orphaned entry prevents other sessions from reusing the
origin and its tracked replication position.

This issue is more likely to happen during table synchronization, where the
replication origin is created within the same transaction that performs the
initial copy. If the COPY operation fails, the origin in shared memory remains
uncleared.

To address this, the commit moves origin creation to the end of a separate
transaction preceding the one executing the initial COPY. This change minimizes
the chances of a transaction abort occurring immediately after origin creation.
---
 src/backend/commands/subscriptioncmds.c     | 10 ++---
 src/backend/replication/logical/tablesync.c | 50 ++++++++++-----------
 src/test/subscription/t/004_sync.pl         |  7 +++
 3 files changed, 35 insertions(+), 32 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index abbcaff0838..921cd9674f1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1099,10 +1099,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 					 *
 					 * It is possible that the origin is not yet created for
 					 * tablesync worker, this can happen for the states before
-					 * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
-					 * apply worker can also concurrently try to drop the
-					 * origin and by this time the origin might be already
-					 * removed. For these reasons, passing missing_ok = true.
+					 * SUBREL_STATE_DATASYNC. The tablesync worker or apply
+					 * worker can also concurrently try to drop the origin and
+					 * by this time the origin might be already removed. For
+					 * these reasons, passing missing_ok = true.
 					 */
 					ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
 													   sizeof(originname));
@@ -2174,7 +2174,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 		 *
 		 * It is possible that the origin is not yet created for tablesync
 		 * worker so passing missing_ok = true. This can happen for the states
-		 * before SUBREL_STATE_FINISHEDCOPY.
+		 * before SUBREL_STATE_DATASYNC.
 		 */
 		ReplicationOriginNameForLogicalRep(subid, relid, originname,
 										   sizeof(originname));
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6bb0cbeedad..79d2758d02c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1340,6 +1340,17 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 							   MyLogicalRepWorker->relstate,
 							   MyLogicalRepWorker->relstate_lsn,
 							   false);
+
+	/*
+	 * Create the replication origin in a separate transaction from the one
+	 * that sets up the origin in shared memory. This prevents the risk that
+	 * changes to the origin in shared memory cannot be rolled back if the
+	 * transaction aborts.
+	 */
+	originid = replorigin_by_name(originname, true);
+	if (!OidIsValid(originid))
+		originid = replorigin_create(originname);
+
 	CommitTransactionCommand();
 	pgstat_report_stat(true);
 
@@ -1378,38 +1389,23 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 					   MySubscription->failover,
 					   CRS_USE_SNAPSHOT, origin_startpos);
 
+	/*
+	 * Advance the origin to the LSN got from walrcv_create_slot. This is WAL
+	 * logged for the purpose of recovery. Locks are to prevent the
+	 * replication origin from vanishing while advancing.
+	 */
+	LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+	replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+					   true /* go backward */ , true /* WAL log */ );
+	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+
 	/*
 	 * Setup replication origin tracking. The purpose of doing this before the
 	 * copy is to avoid doing the copy again due to any error in setting up
 	 * origin tracking.
 	 */
-	originid = replorigin_by_name(originname, true);
-	if (!OidIsValid(originid))
-	{
-		/*
-		 * Origin tracking does not exist, so create it now.
-		 *
-		 * Then advance to the LSN got from walrcv_create_slot. This is WAL
-		 * logged for the purpose of recovery. Locks are to prevent the
-		 * replication origin from vanishing while advancing.
-		 */
-		originid = replorigin_create(originname);
-
-		LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
-		replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
-						   true /* go backward */ , true /* WAL log */ );
-		UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
-
-		replorigin_session_setup(originid, 0);
-		replorigin_session_origin = originid;
-	}
-	else
-	{
-		ereport(ERROR,
-				(errcode(ERRCODE_DUPLICATE_OBJECT),
-				 errmsg("replication origin \"%s\" already exists",
-						originname)));
-	}
+	replorigin_session_setup(originid, 0);
+	replorigin_session_origin = originid;
 
 	/*
 	 * If the user did not opt to run as the owner of the subscription
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index d5eac05a3b3..5efd7e116f0 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -172,6 +172,13 @@ ok( $node_publisher->poll_query_until(
 		'postgres', 'SELECT count(*) = 0 FROM pg_replication_slots'),
 	'DROP SUBSCRIPTION during error can clean up the slots on the publisher');
 
+# After dropping the subscription, all replication origins, whether created by
+# an apply worker or table sync worker, should have been cleaned up.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_origin_status");
+is($result, qq(0),
+	'all replication origins have been cleaned up');
+
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
-- 
2.31.1

