On Wed, Jul 23, 2025 at 4:32 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Mon, Jul 21, 2025 at 6:59 PM Ajin Cherian <itsa...@gmail.com> wrote:
> >
> > Yes, this is correct. I have also verified this in my testing that
> > when there is a second subscription, a deadlock can still happen with
> > my previous patch. I have now modified the patch in tablesync worker
> > to take locks on both SubscriptionRelationId and
> > SubscriptionRelRelationId prior to taking lock on
> > ReplicationOriginRelationId. I have also found that there is a similar
> > problem in DropSubscription() where lock on SubscriptionRelRelationId
> > is not taken before dropping the tracking origin. I've also modified
> > the signature of UpdateSubscriptionRelState to take a bool
> > "lock_needed"; if it is false, SubscriptionRelationId is not
> > locked. I've made a new version of the patch for PG_15.
> >
>
> Review comments:
> ================
> 1.
> + if (!rel)
> + rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
>
> Why did you acquire AccessExclusiveLock here when the current code has
> RowExclusiveLock? It should be RowExclusiveLock.
>

Yes, you are correct. I have replaced it with RowExclusiveLock.

> 2.
> + *
> + * Lock SubscriptionRelationId with AccessShareLock and
> + * take AccessExclusiveLock on SubscriptionRelRelationId to
> + * prevent any deadlocks with user concurrently performing
> + * refresh on the subscription.
>   */
>
> Try to explain in the comments that we want to keep the locking order
> the same as in DDL commands, to prevent deadlocks.
>

Modified.

> 3.
> + * Also close any tables prior to the commit.
>   */
> + if (rel)
> + {
> + table_close(rel, AccessExclusiveLock);
> + rel = NULL;
>
> We don't need to explicitly release lock on table_close, it will be
> done at transaction end, so use NoLock here as we do in current HEAD
> code.
>

Done.
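To illustrate why `table_close(rel, NoLock)` is sufficient, here is a toy model. It is not PostgreSQL code: `TxnState`, `open_rel`, `close_rel_nolock` and `commit_txn` are hypothetical names. It assumes only what the review comment states: a lock taken during a transaction is held until the transaction ends, and closing the relation descriptor merely drops the reference.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Toy model (hypothetical, not PostgreSQL's API) of one catalog relation
 * used inside a single transaction.
 */
typedef struct
{
	bool		rel_open;		/* relation descriptor currently open */
	bool		rel_locked;		/* lock held by this transaction */
} TxnState;

/* Analogous to table_open(rel, RowExclusiveLock): open and lock. */
static void
open_rel(TxnState *tx)
{
	tx->rel_open = true;
	tx->rel_locked = true;
}

/* Analogous to table_close(rel, NoLock): drop the reference, keep the lock. */
static void
close_rel_nolock(TxnState *tx)
{
	assert(tx->rel_open);
	tx->rel_open = false;
}

/* Analogous to transaction commit: every lock taken so far is released. */
static void
commit_txn(TxnState *tx)
{
	/* the patch closes rel before calling CommitTransactionCommand() */
	assert(!tx->rel_open);
	tx->rel_locked = false;
}
```

Closing with NoLock therefore still protects pg_subscription_rel until commit, which matches what the current HEAD code does.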

> 4.
> DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
>  {
> - Relation rel;
> + Relation rel, sub_rel;
>   ObjectAddress myself;
>   HeapTuple tup;
>   Oid subid;
> @@ -1460,7 +1460,13 @@ DropSubscription(DropSubscriptionStmt *stmt,
> bool isTopLevel)
>   * Note that the state can't change because we have already stopped both
>   * the apply and tablesync workers and they can't restart because of
>   * exclusive lock on the subscription.
> + *
> + * Lock pg_subscription_rel with AccessExclusiveLock to prevent any
> + * deadlock with apply workers of other subscriptions trying
> + * to drop tracking origin.
>   */
> + sub_rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
>
> I don't think we need AccessExclusiveLock on SubscriptionRelRelationId
> in DropSubscription. Kindly test again after fixing the first comment
> above.
> --

Yes, it was failing because I was taking AccessExclusiveLock in the
apply worker, which caused the deadlock in my testing. I have tested
that it now works without that lock.



On Wed, Jul 23, 2025 at 7:12 PM Hayato Kuroda (Fujitsu)
<kuroda.hay...@fujitsu.com> wrote:
>
> Dear Ajin,
>
> Thanks for the patch. First, let me confirm my understanding. While altering the
> subscription, locks are acquired in the following order:
>
> Order   Target                          Level
> 1       pg_subscription                 row exclusive
> 2       pg_subscription, my tuple       access exclusive
> 3       pg_subscription_rel             access exclusive
> 4       pg_replication_origin           exclusive
>
> In contrast, the apply worker works like this:
>
> Order   Target                          Level
> 1       pg_replication_origin           exclusive
> 2       pg_subscription, my tuple       access share
> 3       pg_subscription_rel             row exclusive
>
> Thus a backend may wait at step 4, and the apply worker can get stuck at step 2 or 3.
>

Yes, that is correct.
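The ordering argument can be checked mechanically. Below is a minimal sketch, not PostgreSQL code: `LockTarget`, `lock_position` and `has_order_inversion` are hypothetical names. It collapses the pg_subscription relation lock and tuple lock into one target, and it ignores lock modes, assuming every pair of acquisitions on the same target conflicts (true here because the DDL path takes access exclusive locks). Under those assumptions, two backends can deadlock only if some pair of targets that both acquire is taken in opposite orders.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical identifiers for the catalogs in the tables above. */
typedef enum
{
	LK_SUBSCRIPTION,			/* pg_subscription (relation or tuple) */
	LK_SUBSCRIPTION_REL,		/* pg_subscription_rel */
	LK_REPLICATION_ORIGIN		/* pg_replication_origin */
} LockTarget;

/* Index of the first acquisition of target in seq, or -1 if never taken. */
static int
lock_position(const LockTarget *seq, size_t n, LockTarget target)
{
	assert(seq != NULL || n == 0);
	for (size_t i = 0; i < n; i++)
		if (seq[i] == target)
			return (int) i;
	return -1;
}

/*
 * Return true if sequence a takes some target X before Y while sequence b
 * takes Y before X.  This is a necessary condition for a two-backend
 * deadlock, given the simplifications stated above.
 */
static bool
has_order_inversion(const LockTarget *a, size_t na,
					const LockTarget *b, size_t nb)
{
	for (size_t i = 0; i < na; i++)
	{
		for (size_t j = i + 1; j < na; j++)
		{
			int			pi = lock_position(b, nb, a[i]);
			int			pj = lock_position(b, nb, a[j]);

			/* a takes a[i] first; does b take a[j] first? */
			if (pi >= 0 && pj >= 0 && pj < pi)
				return true;
		}
	}
	return false;
}
```

With the sequences from the tables, the DDL order and the original apply-worker order (origin first) report an inversion, while the patched apply-worker order (subscription, pg_subscription_rel, origin) does not. DropSubscription()'s order (pg_subscription, then pg_replication_origin) also shows no inversion against the patched apply worker.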


> Below are my comments:
>
> ```
> @@ -340,7 +341,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
>                  * is dropped. So passing missing_ok = false.
>                  */
>                 ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
> -
> ```
> This change is not needed.

Removed.

>
> ```
> +                               if (!rel)
> +                                       rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
> +
> ```
>
> I feel this is too strong. Isn't row exclusive, as initially used, enough?
>

Yes, agree. Fixed.

> ```
>  UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
> -                                                  XLogRecPtr sublsn)
> +                                                  XLogRecPtr sublsn, bool lock_needed)
> ```
>
> I feel the name `lock_needed` is a bit misleading, because the function still opens
> the pg_subscription_rel catalog with row exclusive. How about "lock_shared_object"?
>

I have modified it so that it also does not take a lock in table_open(),
and renamed the parameter to already_locked.
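The `already_locked` contract makes the caller, not `UpdateSubscriptionRelState()`, responsible for having taken both locks in the agreed order. A minimal sketch of that contract, using a hypothetical per-backend lock counter instead of PostgreSQL's lock manager (`HeldLocks`, `take_subscription_locks` and `update_rel_state` are illustrative names only):

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical count of locks this backend holds; not PostgreSQL's API. */
typedef struct
{
	int			subscription_locks;		/* shared-object lock on the subscription */
	int			subscription_rel_locks; /* relation lock on pg_subscription_rel */
} HeldLocks;

/* Acquire in the agreed order: subscription first, then pg_subscription_rel. */
static void
take_subscription_locks(HeldLocks *held)
{
	held->subscription_locks++;
	held->subscription_rel_locks++;
}

/*
 * Mirrors the shape of the already_locked parameter: when the caller says it
 * already holds the locks, take nothing (the patch opens the catalog with
 * NoLock); otherwise acquire them here in the usual order.
 */
static void
update_rel_state(HeldLocks *held, bool already_locked)
{
	if (already_locked)
	{
		/* the caller's claim must be true */
		assert(held->subscription_locks > 0 &&
			   held->subscription_rel_locks > 0);
	}
	else
		take_subscription_locks(held);

	/* ... the catalog tuple update would happen here ... */
}
```

In PostgreSQL itself, relation_open() called with NoLock already asserts, in assert-enabled builds, that the current backend holds at least AccessShareLock on the relation (via CheckRelationLockedByMe()). The patched function does not similarly verify the shared-object lock on the subscription.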

> ```
> @@ -1460,7 +1460,13 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
>          * Note that the state can't change because we have already stopped both
>          * the apply and tablesync workers and they can't restart because of
>          * exclusive lock on the subscription.
> +        *
> +        * Lock pg_subscription_rel with AccessExclusiveLock to prevent any
> +        * deadlock with apply workers of other subscriptions trying
> +        * to drop tracking origin.
>          */
> +       sub_rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock);
> ```
>
> Hmm. Per my understanding, DropSubscription acquires the following locks before it
> reaches replorigin_drop_by_name().
> replorigin_drop_by_name().
>
> Order   Target                          Level
> 1       pg_subscription                 access exclusive
> 2       pg_subscription, my tuple       access exclusive
> 3       pg_replication_origin           exclusive
>
> IIUC we must preserve the ordering, not the target of locks.

I have removed this change, as DropSubscription() no longer conflicts with the apply worker.

Two patches attached. One for HEAD and the other for PG_15.

regards,
Ajin Cherian
Fujitsu Australia
From 74d9751501b0f7de40636bfe2beeb6ba65b19df5 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Wed, 23 Jul 2025 07:06:36 -0400
Subject: [PATCH v4] Fix a possible deadlock during ALTER SUBSCRIPTION ... DROP
 PUBLICATION

In most situations the tablesync worker drops the corresponding origin before it
finishes executing. However, if an error causes the tablesync worker to fail just
before dropping the origin, or if the worker is delayed or does not get CPU time,
the apply worker could find the origin and attempt to drop it. If the user
simultaneously issues ALTER SUBSCRIPTION ... DROP PUBLICATION during this time,
the apply worker and the user process can deadlock because of the order in which
they take locks.

The fix is to get locks on SubscriptionRelationId, SubscriptionRelRelationId and
ReplicationOriginRelationId in that order when dropping tracking origins.
---
 src/backend/catalog/pg_subscription.c       | 12 ++++++----
 src/backend/replication/logical/tablesync.c | 35 +++++++++++++++++++++++++----
 src/include/catalog/pg_subscription_rel.h   |  2 +-
 3 files changed, 40 insertions(+), 9 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 63c2992..18ffbe7 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -320,7 +320,7 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
  */
 void
 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
-						   XLogRecPtr sublsn)
+						   XLogRecPtr sublsn, bool already_locked)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -328,9 +328,13 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	Datum		values[Natts_pg_subscription_rel];
 	bool		replaces[Natts_pg_subscription_rel];
 
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
-
-	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+	if (already_locked)
+		rel = table_open(SubscriptionRelRelationId, NoLock);
+	else
+	{
+		LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+		rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+	}
 
 	/* Try finding existing mapping. */
 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3fea0a0..80d29d2 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -316,7 +316,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
-								   MyLogicalRepWorker->relstate_lsn);
+								   MyLogicalRepWorker->relstate_lsn,
+								   false);
 
 		/*
 		 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
@@ -425,6 +426,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	ListCell   *lc;
 	bool		started_tx = false;
 	bool		should_exit = false;
+	Relation	rel = NULL;
 
 	Assert(!IsTransactionState());
 
@@ -492,7 +494,18 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * worker to remove the origin tracking as if there is any
 				 * error while dropping we won't restart it to drop the
 				 * origin. So passing missing_ok = true.
+				 *
+				 * Lock SubscriptionRelationId with AccessShareLock and
+				 * take RowExclusiveLock on SubscriptionRelRelationId to
+				 * keep the same locking order as a refresh triggered
+				 * by a user-issued DDL command, to prevent deadlocks.
 				 */
+				LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
+								 0, AccessShareLock);
+
+				if (!rel)
+					rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
 				ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
 												   rstate->relid,
 												   originname,
@@ -504,7 +517,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 */
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
-										   rstate->lsn);
+										   rstate->lsn, true);
 			}
 		}
 		else
@@ -555,7 +568,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						 * This is required to avoid any undetected deadlocks
 						 * due to any existing lock as deadlock detector won't
 						 * be able to detect the waits on the latch.
+						 *
+						 * Also close any tables prior to the commit.
 						 */
+						if (rel)
+						{
+							table_close(rel, NoLock);
+							rel = NULL;
+						}
 						CommitTransactionCommand();
 						pgstat_report_stat(false);
 					}
@@ -623,6 +643,11 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		}
 	}
 
+	/* Close table if opened */
+	if (rel)
+		table_close(rel, NoLock);
+
+
 	if (started_tx)
 	{
 		/*
@@ -1414,7 +1439,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 							   MyLogicalRepWorker->relid,
 							   MyLogicalRepWorker->relstate,
-							   MyLogicalRepWorker->relstate_lsn);
+							   MyLogicalRepWorker->relstate_lsn,
+							   false);
 	CommitTransactionCommand();
 	pgstat_report_stat(true);
 
@@ -1547,7 +1573,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 							   MyLogicalRepWorker->relid,
 							   SUBREL_STATE_FINISHEDCOPY,
-							   MyLogicalRepWorker->relstate_lsn);
+							   MyLogicalRepWorker->relstate_lsn,
+							   false);
 
 	CommitTransactionCommand();
 
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index c91797c..f458447 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -85,7 +85,7 @@ typedef struct SubscriptionRelState
 extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
 									XLogRecPtr sublsn, bool retain_lock);
 extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
-									   XLogRecPtr sublsn);
+									   XLogRecPtr sublsn, bool already_locked);
 extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
-- 
1.8.3.1

From 3b6e923614a93040e1487bd2b19f5523d1ff81ed Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Mon, 21 Jul 2025 09:16:59 -0400
Subject: [PATCH v4] Fix a deadlock during ALTER SUBSCRIPTION... DROP
 PUBLICATION

When a user drops a publication from a subscription, this results in a
publication refresh on the subscriber, which tries to drop any pending origins.
Meanwhile, the apply worker could also be trying to clean up origins. A deadlock
can occur if the order in which SubscriptionRelationId, SubscriptionRelRelationId
and ReplicationOriginRelationId are locked differs between
process_syncing_tables_for_apply() and AlterSubscription_refresh().

The fix is to get locks on SubscriptionRelationId, SubscriptionRelRelationId and
ReplicationOriginRelationId in that order when dropping tracking origins.
---
 src/backend/catalog/pg_subscription.c       | 12 ++++++----
 src/backend/replication/logical/tablesync.c | 36 +++++++++++++++++++++++++----
 src/include/catalog/pg_subscription_rel.h   |  2 +-
 3 files changed, 40 insertions(+), 10 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index add51ca..d48ed20 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -324,7 +324,7 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
  */
 void
 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
-						   XLogRecPtr sublsn)
+						   XLogRecPtr sublsn, bool already_locked)
 {
 	Relation	rel;
 	HeapTuple	tup;
@@ -332,9 +332,13 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
 	Datum		values[Natts_pg_subscription_rel];
 	bool		replaces[Natts_pg_subscription_rel];
 
-	LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
-
-	rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+	if (already_locked)
+		rel = table_open(SubscriptionRelRelationId, NoLock);
+	else
+	{
+		LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+		rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+	}
 
 	/* Try finding existing mapping. */
 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e6159ac..1015b2b 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -314,7 +314,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 								   MyLogicalRepWorker->relid,
 								   MyLogicalRepWorker->relstate,
-								   MyLogicalRepWorker->relstate_lsn);
+								   MyLogicalRepWorker->relstate_lsn,
+								   false);
 
 		/*
 		 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
@@ -340,7 +341,6 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 		 * is dropped. So passing missing_ok = false.
 		 */
 		ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
-
 		finish_sync_worker();
 	}
 	else
@@ -379,6 +379,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 	static HTAB *last_start_times = NULL;
 	ListCell   *lc;
 	bool		started_tx = false;
+	Relation	rel = NULL;
 
 	Assert(!IsTransactionState());
 
@@ -470,7 +471,17 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * refresh for the subscription where we remove the table
 				 * state and its origin and by this time the origin might be
 				 * already removed. So passing missing_ok = true.
+				 *
+				 * Lock SubscriptionRelationId with AccessShareLock and
+				 * take RowExclusiveLock on SubscriptionRelRelationId to
+				 * keep the same locking order as a refresh triggered
+				 * by a user-issued DDL command, to prevent deadlocks.
 				 */
+				LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
+								 0, AccessShareLock);
+				if (!rel)
+					rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
+
 				ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
 												  rstate->relid,
 												  originname,
@@ -482,7 +493,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 */
 				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 										   rstate->relid, rstate->state,
-										   rstate->lsn);
+										   rstate->lsn, true);
 			}
 		}
 		else
@@ -533,7 +544,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 						 * This is required to avoid any undetected deadlocks
 						 * due to any existing lock as deadlock detector won't
 						 * be able to detect the waits on the latch.
+						 *
+						 * Also close any tables prior to the commit.
 						 */
+						if (rel)
+						{
+							table_close(rel, NoLock);
+							rel = NULL;
+						}
 						CommitTransactionCommand();
 						pgstat_report_stat(false);
 					}
@@ -593,6 +611,12 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		}
 	}
 
+	/* Close table if opened */
+	if (rel)
+	{
+		table_close(rel, NoLock);
+	}
+
 	if (started_tx)
 	{
 		CommitTransactionCommand();
@@ -1310,7 +1334,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 							   MyLogicalRepWorker->relid,
 							   MyLogicalRepWorker->relstate,
-							   MyLogicalRepWorker->relstate_lsn);
+							   MyLogicalRepWorker->relstate_lsn,
+							   false);
 	CommitTransactionCommand();
 	pgstat_report_stat(true);
 
@@ -1431,7 +1456,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
 							   MyLogicalRepWorker->relid,
 							   SUBREL_STATE_FINISHEDCOPY,
-							   MyLogicalRepWorker->relstate_lsn);
+							   MyLogicalRepWorker->relstate_lsn,
+							   false);
 
 	CommitTransactionCommand();
 
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 9df99c3..0fcff92 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -83,7 +83,7 @@ typedef struct SubscriptionRelState
 extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
 									XLogRecPtr sublsn);
 extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
-									   XLogRecPtr sublsn);
+									   XLogRecPtr sublsn, bool already_locked);
 extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
-- 
1.8.3.1
