From 630ad065e680d950950cc656fdfab768ae5af0e8 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Fri, 19 Dec 2025 15:25:50 +0800
Subject: [PATCH v2] Fix orphaned origin in shared memory after subscription
 drop

Replication origin information is stored both in the catalog and, once the
origin is set up, in shared memory. If a replication origin is created and
set up within a transaction that subsequently aborts, only the catalog
changes are rolled back, leaving an orphaned origin in shared memory. This
is confusing because users can still see the orphaned records in
pg_replication_origin_status even after dropping the subscription, and the
orphaned entry prevents other origins from reusing its position.

This issue is more likely to occur during table synchronization, where the
replication origin is created within the same transaction that performs the
initial copy. If the COPY operation fails, the origin in shared memory is
not cleared.

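To address this, this commit moves origin creation to the end of the
transaction that updates the relation state, which commits before the
transaction executing the initial COPY begins. The catalog entry for the
origin is therefore already committed by the time the origin is set up in
shared memory, so an abort of the COPY transaction can no longer leave an
origin in shared memory without a matching catalog entry.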
---
 src/backend/commands/subscriptioncmds.c     | 10 ++--
 src/backend/replication/logical/tablesync.c | 58 ++++++++++-----------
 src/test/subscription/t/004_sync.pl         |  7 +++
 3 files changed, 40 insertions(+), 35 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index abbcaff0838..921cd9674f1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1099,10 +1099,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 					 *
 					 * It is possible that the origin is not yet created for
 					 * tablesync worker, this can happen for the states before
-					 * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
-					 * apply worker can also concurrently try to drop the
-					 * origin and by this time the origin might be already
-					 * removed. For these reasons, passing missing_ok = true.
+					 * SUBREL_STATE_DATASYNC. The tablesync worker or apply
+					 * worker can also concurrently try to drop the origin and
+					 * by this time the origin might be already removed. For
+					 * these reasons, passing missing_ok = true.
 					 */
 					ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
 													   sizeof(originname));
@@ -2174,7 +2174,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 		 *
 		 * It is possible that the origin is not yet created for tablesync
 		 * worker so passing missing_ok = true. This can happen for the states
-		 * before SUBREL_STATE_FINISHEDCOPY.
+		 * before SUBREL_STATE_DATASYNC.
 		 */
 		ReplicationOriginNameForLogicalRep(subid, relid, originname,
 										   sizeof(originname));
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6bb0cbeedad..2522e372036 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1333,13 +1333,27 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-	/* Update the state and make it visible to others. */
+	/*
+	 * Update the state, create the replication origin, and make them visible
+	 * to others.
+	 */
 	StartTransactionCommand();
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 							   MyLogicalRepWorker->relid,
 							   MyLogicalRepWorker->relstate,
 							   MyLogicalRepWorker->relstate_lsn,
 							   false);
+
+	/*
+	 * Create the replication origin here, in a transaction separate from the
+	 * one that later sets up the origin in shared memory. If that later
+	 * transaction aborts, the shared memory state is not rolled back, so the
+	 * catalog entry must already be committed to avoid an orphaned origin.
+	 */
+	originid = replorigin_by_name(originname, true);
+	if (!OidIsValid(originid))
+		originid = replorigin_create(originname);
+
 	CommitTransactionCommand();
 	pgstat_report_stat(true);
 
@@ -1379,37 +1393,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 					   CRS_USE_SNAPSHOT, origin_startpos);
 
 	/*
-	 * Setup replication origin tracking. The purpose of doing this before the
-	 * copy is to avoid doing the copy again due to any error in setting up
-	 * origin tracking.
+	 * Advance the origin to the LSN got from walrcv_create_slot and then set
+	 * up the origin. The advancement is WAL logged for the purpose of
+	 * recovery. Locks are to prevent the replication origin from vanishing
+	 * while advancing.
+	 *
+	 * The purpose of doing these before the copy is to avoid doing the copy
+	 * again due to any error in advancing or setting up origin tracking.
 	 */
-	originid = replorigin_by_name(originname, true);
-	if (!OidIsValid(originid))
-	{
-		/*
-		 * Origin tracking does not exist, so create it now.
-		 *
-		 * Then advance to the LSN got from walrcv_create_slot. This is WAL
-		 * logged for the purpose of recovery. Locks are to prevent the
-		 * replication origin from vanishing while advancing.
-		 */
-		originid = replorigin_create(originname);
-
-		LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
-		replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
-						   true /* go backward */ , true /* WAL log */ );
-		UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+	LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+	replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+					   true /* go backward */ , true /* WAL log */ );
+	UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
 
-		replorigin_session_setup(originid, 0);
-		replorigin_session_origin = originid;
-	}
-	else
-	{
-		ereport(ERROR,
-				(errcode(ERRCODE_DUPLICATE_OBJECT),
-				 errmsg("replication origin \"%s\" already exists",
-						originname)));
-	}
+	replorigin_session_setup(originid, 0);
+	replorigin_session_origin = originid;
 
 	/*
 	 * If the user did not opt to run as the owner of the subscription
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index d5eac05a3b3..5efd7e116f0 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -172,6 +172,13 @@ ok( $node_publisher->poll_query_until(
 		'postgres', 'SELECT count(*) = 0 FROM pg_replication_slots'),
 	'DROP SUBSCRIPTION during error can clean up the slots on the publisher');
 
+# After dropping the subscription, all replication origins, whether created by
+# an apply worker or table sync worker, should have been cleaned up.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_replication_origin_status");
+is($result, qq(0),
+	'all replication origins have been cleaned up');
+
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
-- 
2.31.1

