From 615f51df20cb5c56ad7b2894d3e4e8b0d899abe8 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Mon, 21 Jul 2025 09:16:59 -0400
Subject: [PATCH v3] Fix a deadlock during ALTER SUBSCRIPTION... DROP
 PUBLICATION

When a user drops a publication from a subscription, this results in a
publication refresh on the subscriber, which will try to drop any pending
origins. Meanwhile, the apply worker could also be trying to clean up
origins. A deadlock can occur if the order in which locks are taken on
SubscriptionRelationId, SubscriptionRelRelationId and
ReplicationOriginRelationId is not the same in
process_syncing_tables_for_apply() and AlterSubscription_refresh().

The fix is to acquire the locks on SubscriptionRelationId,
SubscriptionRelRelationId and ReplicationOriginRelationId, in that order,
when dropping tracking origins.
---
 src/backend/catalog/pg_subscription.c       |  5 ++--
 src/backend/commands/subscriptioncmds.c     | 11 ++++++++-
 src/backend/replication/logical/tablesync.c | 37 +++++++++++++++++++++++++----
 src/include/catalog/pg_subscription_rel.h   |  2 +-
 4 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index add51ca..518a25e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -324,7 +324,7 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
  */
 void
 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
-						   XLogRecPtr sublsn)
+						   XLogRecPtr sublsn, bool lock_needed)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -332,7 +332,8 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	Datum		values[Natts_pg_subscription_rel];
 	bool		replaces[Natts_pg_subscription_rel];
 
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+	if (lock_needed)
+		LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
 
 	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 334717c..c0df1e5 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1323,7 +1323,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 void
 DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 {
-	Relation	rel;
+	Relation	rel, sub_rel;
 	ObjectAddress myself;
 	HeapTuple	tup;
 	Oid			subid;
@@ -1460,7 +1460,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * Note that the state can't change because we have already stopped both
 	 * the apply and tablesync workers and they can't restart because of
 	 * exclusive lock on the subscription.
+	 *
+	 * Lock pg_subscription_rel with AccessExclusiveLock to prevent any
+	 * deadlock with apply workers of other subscriptions that are trying
+	 * to drop tracking origins.
 	 */
+	sub_rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
 	rstates = GetSubscriptionNotReadyRelations(subid);
 	foreach(lc, rstates)
 	{
@@ -1493,6 +1499,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	snprintf(originname, sizeof(originname), "pg_%u", subid);
 	replorigin_drop_by_name(originname, true, false);
 
+	/* Once the origin tracking has been dropped, we can release the lock */
+	table_close(sub_rel, AccessExclusiveLock);
+
 	/*
 	 * Tell the cumulative stats system that the subscription is getting
 	 * dropped.
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e6159ac..9aee210 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -314,7 +314,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
-								   MyLogicalRepWorker->relstate_lsn);
+								   MyLogicalRepWorker->relstate_lsn,
+								   true);
 
 		/*
 		 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
@@ -340,7 +341,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 * is dropped. So passing missing_ok = false.
 		 */
 		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
-
 		finish_sync_worker();
 	}
 	else
@@ -379,6 +379,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	static HTAB *last_start_times = NULL;
 	ListCell   *lc;
 	bool		started_tx = false;
+	Relation	rel = NULL;
 
 	Assert(!IsTransactionState());
 
@@ -470,7 +471,17 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * refresh for the subscription where we remove the table
 				 * state and its origin and by this time the origin might be
 				 * already removed. So passing missing_ok = true.
+				 *
+				 * Lock SubscriptionRelationId with AccessShareLock and
+				 * take AccessExclusiveLock on SubscriptionRelRelationId to
+				 * prevent any deadlocks with a user concurrently performing
+				 * a refresh on the subscription.
 				 */
+				LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
+								 0, AccessShareLock);
+				if (!rel)
+					rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
+
 				ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
 												  rstate->relid,
 												  originname,
@@ -482,7 +493,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 */
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
-										   rstate->lsn);
+										   rstate->lsn, false);
+
 			}
 		}
 		else
@@ -533,7 +545,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						 * This is required to avoid any undetected deadlocks
 						 * due to any existing lock as deadlock detector won't
 						 * be able to detect the waits on the latch.
+						 *
+						 * Also close any tables prior to the commit.
 						 */
+						if (rel)
+						{
+							table_close(rel, AccessExclusiveLock);
+							rel = NULL;
+						}
 						CommitTransactionCommand();
 						pgstat_report_stat(false);
 					}
@@ -593,6 +612,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		}
 	}
 
+	/* Close and unlock the table if it was opened */
+	if (rel)
+	{
+		table_close(rel, AccessExclusiveLock);
+	}
+
 	if (started_tx)
 	{
 		CommitTransactionCommand();
@@ -1310,7 +1335,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 							   MyLogicalRepWorker->relid,
 							   MyLogicalRepWorker->relstate,
-							   MyLogicalRepWorker->relstate_lsn);
+							   MyLogicalRepWorker->relstate_lsn,
+							   true);
 	CommitTransactionCommand();
 	pgstat_report_stat(true);
 
@@ -1431,7 +1457,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 							   MyLogicalRepWorker->relid,
 							   SUBREL_STATE_FINISHEDCOPY,
-							   MyLogicalRepWorker->relstate_lsn);
+							   MyLogicalRepWorker->relstate_lsn,
+							   true);
 
 	CommitTransactionCommand();
 
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 9df99c3..a1c5bb1 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -83,7 +83,7 @@ typedef struct SubscriptionRelState
 extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
 									XLogRecPtr sublsn);
 extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
-									   XLogRecPtr sublsn);
+									   XLogRecPtr sublsn, bool lock_needed);
 extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
-- 
1.8.3.1

