From 0717db080b079d0242fc0c3826f1284d793911fd Mon Sep 17 00:00:00 2001
From: Mikhail Nikalayeu <mihailnikalayeu@gmail.com>
Date: Sun, 26 Apr 2026 11:50:34 +0200
Subject: [PATCH v2] Protect concurrent repack lock upgrades

REPACK CONCURRENTLY loses significant work when its final ShareUpdateExclusiveLock to AccessExclusiveLock upgrade is canceled by a deadlock. This commit introduces "protected lock upgrades", allowing a backend to atomically record a future upgrade intention (upgradeIntent) on its PROCLOCK via LockAcquireExtended. When a deadlock cycle involves a protected backend's announced upgrade, the detector preempts a blocking waiter rather than the protected upgrader.

Preemption triggers via two paths. JoinWaitQueue fast-fails incoming requests if a holder's upgradeIntent and current modes guarantee a future cycle. Alternatively, if DeadLockCheck evaluates a cycle where the waiter is a protected upgrader, it selects a blocking waiter as the cancellation victim.
---
 src/backend/catalog/namespace.c               |  32 +++-
 src/backend/commands/repack.c                 |  29 ++-
 src/backend/storage/lmgr/README               |  26 +++
 src/backend/storage/lmgr/deadlock.c           |  90 ++++++++-
 src/backend/storage/lmgr/lmgr.c               |  49 +++--
 src/backend/storage/lmgr/lock.c               | 154 +++++++++++++--
 src/backend/storage/lmgr/proc.c               | 134 ++++++++++---
 src/include/catalog/namespace.h               |   7 +
 src/include/storage/lmgr.h                    |   3 +
 src/include/storage/lock.h                    |  19 +-
 src/include/storage/proc.h                    |   7 -
 .../injection_points/expected/repack.out      | 181 +++++++++++++++++-
 .../injection_points/specs/repack.spec        | 108 +++++++++++
 13 files changed, 757 insertions(+), 82 deletions(-)

diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 56b87d878e8..31a958198e0 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -415,8 +415,18 @@ spcache_insert(const char *searchPath, Oid roleid)
 	}
 }
 
+/* Wrapper preserving the historical signature (no upgrade intent). */
+Oid
+RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode,
+						 uint32 flags,
+						 RangeVarGetRelidCallback callback, void *callback_arg)
+{
+	return RangeVarGetRelidWithUpgradeIntent(relation, lockmode, NoLock,
+											 flags, callback, callback_arg);
+}
+
 /*
- * RangeVarGetRelidExtended
+ * RangeVarGetRelidWithUpgradeIntent
  *		Given a RangeVar describing an existing relation,
  *		select the proper namespace and look up the relation OID.
  *
@@ -435,13 +445,17 @@ spcache_insert(const char *searchPath, Oid roleid)
  * return value of InvalidOid could either mean the relation is missing or it
  * could not be locked.
  *
+ * If upgradeMode is not NoLock, an upgrade-intent announcement is installed
+ * atomically with the grant; see LockAcquireExtended.
+ *
  * Callback allows caller to check permissions or acquire additional locks
  * prior to grabbing the relation lock.
  */
 Oid
-RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode,
-						 uint32 flags,
-						 RangeVarGetRelidCallback callback, void *callback_arg)
+RangeVarGetRelidWithUpgradeIntent(const RangeVar *relation, LOCKMODE lockmode,
+								  LOCKMODE upgradeMode, uint32 flags,
+								  RangeVarGetRelidCallback callback,
+								  void *callback_arg)
 {
 	uint64		inval_count;
 	Oid			relId;
@@ -451,6 +465,9 @@ RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode,
 
 	/* verify that flags do no conflict */
 	Assert(!((flags & RVR_NOWAIT) && (flags & RVR_SKIP_LOCKED)));
+	/* Upgrade intent requires the blocking acquisition path. */
+	Assert(upgradeMode == NoLock ||
+		   !(flags & (RVR_NOWAIT | RVR_SKIP_LOCKED)));
 
 	/*
 	 * We check the catalog name and then ignore it.
@@ -590,7 +607,12 @@ RangeVarGetRelidExtended(const RangeVar *relation, LOCKMODE lockmode,
 		if (!OidIsValid(relId))
 			AcceptInvalidationMessages();
 		else if (!(flags & (RVR_NOWAIT | RVR_SKIP_LOCKED)))
-			LockRelationOid(relId, lockmode);
+		{
+			if (upgradeMode != NoLock)
+				LockRelationOidWithUpgradeIntent(relId, lockmode, upgradeMode);
+			else
+				LockRelationOid(relId, lockmode);
+		}
 		else if (!ConditionalLockRelationOid(relId, lockmode))
 		{
 			int			elevel = (flags & RVR_SKIP_LOCKED) ? DEBUG1 : ERROR;
diff --git a/src/backend/commands/repack.c b/src/backend/commands/repack.c
index bafdca80810..c63d2107ef2 100644
--- a/src/backend/commands/repack.c
+++ b/src/backend/commands/repack.c
@@ -2322,12 +2322,25 @@ process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel,
 				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				errmsg("ANALYZE option must be specified when a column list is provided"));
 
-	/* Find, lock, and check permissions on the table. */
-	tableOid = RangeVarGetRelidExtended(stmt->relation->relation,
-										lockmode,
-										0,
-										RangeVarCallbackMaintainsTable,
-										NULL);
+	/*
+	 * Find, lock, and check permissions on the table.
+	 *
+	 * For CONCURRENTLY, announce the future AEL upgrade so a conflicting
+	 * holder is preempted instead of stalling us through the copy phase.
+	 */
+	if ((params->options & CLUOPT_CONCURRENT) != 0)
+		tableOid = RangeVarGetRelidWithUpgradeIntent(stmt->relation->relation,
+													 lockmode,
+													 AccessExclusiveLock,
+													 0,
+													 RangeVarCallbackMaintainsTable,
+													 NULL);
+	else
+		tableOid = RangeVarGetRelidExtended(stmt->relation->relation,
+											lockmode,
+											0,
+											RangeVarCallbackMaintainsTable,
+											NULL);
 	rel = table_open(tableOid, NoLock);
 
 	/*
@@ -3055,6 +3068,10 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	/*
 	 * Acquire AccessExclusiveLock on the table, its TOAST relation (if there
 	 * is one), all its indexes, so that we can swap the files.
+	 *
+	 * The upgrade intent announced at initial SUEL acquisition makes the
+	 * detector preempt a blocker on a deadlock here, preserving the work
+	 * already done.
 	 */
 	LockRelationOid(old_table_oid, AccessExclusiveLock);
 
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
index 45de0fd2bd6..ced7d285279 100644
--- a/src/backend/storage/lmgr/README
+++ b/src/backend/storage/lmgr/README
@@ -586,6 +586,32 @@ The caller can then send a cancellation signal.  This implements the
 principle that autovacuum has a low locking priority (eg it must not block
 DDL on the table).
 
+Protected Lock Upgrades
+-----------------------
+
+A backend can announce at initial-lock acquisition that it intends to upgrade
+to a stronger mode later (PROCLOCK->upgradeIntent), installed atomically with
+the grant via LockAcquireExtended.  REPACK CONCURRENTLY uses this so that a
+deadlock on its final SUEL->AEL swap cancels a blocker rather than throwing
+away the copy/rebuild phase.
+
+When a cycle involving the protected backend is detected, a blocking waiter is
+canceled instead of the protected backend.  This can happen either in
+JoinWaitQueue's fast-fail check or in the full DeadLockCheck() during the
+announced upgrade wait.  The fast-fail check is only a cheap early test for
+cycles already visible in the main lock table. It does not force transfer of
+unrelated fast-path relation locks, so it may miss cycles whose edges are still
+hidden in backend-local fast-path state. Such cycles become visible later when a
+conflicting strong lock request transfers the relevant fast-path locks.
+The full deadlock detector handles that case.
+
+Other protected upgraders and anti-wraparound autovacuums are skipped as
+victims. If no acceptable victim exists, the protected backend is canceled as
+usual.
+
+The victim observes preemption via PROC_WAIT_STATUS_PREEMPTED on its
+waitStatus, which LockAcquireExtended turns into a deadlock ereport.
+
 Group Locking
 -------------
 
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
index b8962d875b6..35143f0ab1e 100644
--- a/src/backend/storage/lmgr/deadlock.c
+++ b/src/backend/storage/lmgr/deadlock.c
@@ -125,6 +125,13 @@ static int	maxPossibleConstraints;
 static DEADLOCK_INFO *deadlockDetails;
 static int	nDeadlockDetails;
 
+/*
+ * Parallel to deadlockDetails[]: live PGPROCs forming each edge of the
+ * detected cycle.  Valid only while all partition locks are held.  Used by
+ * DeadLockCheck to pick a victim under the protected-upgrade rule.
+ */
+static PGPROC **deadlockProcs;
+
 /* PGPROC pointer of any blocking autovacuum worker found */
 static PGPROC *blocking_autovacuum_proc = NULL;
 
@@ -148,11 +155,12 @@ InitDeadLockChecking(void)
 	oldcxt = MemoryContextSwitchTo(TopMemoryContext);
 
 	/*
-	 * FindLockCycle needs at most MaxBackends entries in visitedProcs[] and
-	 * deadlockDetails[].
+	 * FindLockCycle needs at most MaxBackends entries in visitedProcs[],
+	 * deadlockDetails[], and deadlockProcs[].
 	 */
 	visitedProcs = (PGPROC **) palloc(MaxBackends * sizeof(PGPROC *));
 	deadlockDetails = (DEADLOCK_INFO *) palloc(MaxBackends * sizeof(DEADLOCK_INFO));
+	deadlockProcs = (PGPROC **) palloc(MaxBackends * sizeof(PGPROC *));
 
 	/*
 	 * TopoSort needs to consider at most MaxBackends wait-queue entries, and
@@ -219,10 +227,14 @@ InitDeadLockChecking(void)
 DeadLockState
 DeadLockCheck(PGPROC *proc)
 {
+	bool		victims_canceled = false;
+
+retry:
 	/* Initialize to "no constraints" */
 	nCurConstraints = 0;
 	nPossibleConstraints = 0;
 	nWaitOrders = 0;
+	nDeadlockDetails = 0;
 
 	/* Initialize to not blocked by an autovacuum worker */
 	blocking_autovacuum_proc = NULL;
@@ -242,6 +254,72 @@ DeadLockCheck(PGPROC *proc)
 		if (!FindLockCycle(proc, possibleConstraints, &nSoftEdges))
 			elog(FATAL, "deadlock seems to have disappeared");
 
+		/*
+		 * Protected-upgrade resolution: if our wait is the announced upgrade
+		 * for our proclock, cancel a blocker instead of MyProc and re-run the
+		 * full check (not just FindLockCycle) so soft cycles still get a
+		 * chance at queue rearrangement.  Partition locks are held throughout,
+		 * so the loop is bounded by MaxBackends.
+		 *
+		 * deadlockProcs[0] is always MyProc; start at the direct blocker.
+		 * Skip other protected upgraders (don't preempt them) and
+		 * anti-wraparound autovacuums (canceling them is worse than losing
+		 * our work).  If no acceptable victim exists, fall through to
+		 * DS_HARD_DEADLOCK.
+		 */
+		if (MyProc->waitProcLock != NULL &&
+			MyProc->waitProcLock->upgradeIntent == MyProc->waitLockMode)
+		{
+			PGPROC	   *victim = NULL;
+
+			for (int i = 1; i < nDeadlockDetails; i++)
+			{
+				PGPROC	   *w = deadlockProcs[i];
+
+				if (w == NULL || w == MyProc)
+					continue;
+
+				/* Don't preempt another protected upgrader. */
+				if (w->waitProcLock != NULL &&
+					w->waitProcLock->upgradeIntent == w->waitLockMode)
+					continue;
+
+				/*
+				 * PROC_IS_AUTOVACUUM is set once at startup, safe lockless.
+				 * Read PROC_VACUUM_FOR_WRAPAROUND only if we can grab
+				 * ProcArrayLock conditionally; otherwise skip this candidate.
+				 */
+				if (w->statusFlags & PROC_IS_AUTOVACUUM)
+				{
+					uint8		statusFlags;
+
+					if (!LWLockConditionalAcquire(ProcArrayLock, LW_SHARED))
+						continue;
+
+					statusFlags = ProcGlobal->statusFlags[w->pgxactoff];
+					LWLockRelease(ProcArrayLock);
+
+					if (statusFlags & PROC_VACUUM_FOR_WRAPAROUND)
+						continue;
+				}
+
+				Assert(w->waitLock != NULL);
+				Assert(!dlist_node_is_detached(&w->waitLink));
+				victim = w;
+				break;
+			}
+
+			if (victim != NULL)
+			{
+				RemoveFromWaitQueue(victim,
+									LockTagHashCode(&(victim->waitLock->tag)),
+									PROC_WAIT_STATUS_PREEMPTED);
+				SetLatch(&victim->procLatch);
+				victims_canceled = true;
+				goto retry;
+			}
+		}
+
 		return DS_HARD_DEADLOCK;	/* cannot find a non-deadlocked state */
 	}
 
@@ -273,7 +351,9 @@ DeadLockCheck(PGPROC *proc)
 	}
 
 	/* Return code tells caller if we had to escape a deadlock or not */
-	if (nWaitOrders > 0)
+	if (victims_canceled)
+		return DS_PREEMPT_DEADLOCK;
+	else if (nWaitOrders > 0)
 		return DS_SOFT_DEADLOCK;
 	else if (blocking_autovacuum_proc != NULL)
 		return DS_BLOCKED_BY_AUTOVACUUM;
@@ -590,6 +670,7 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
 						info->locktag = lock->tag;
 						info->lockmode = checkProc->waitLockMode;
 						info->pid = checkProc->pid;
+						deadlockProcs[depth] = checkProc;
 
 						return true;
 					}
@@ -679,6 +760,7 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
 					info->locktag = lock->tag;
 					info->lockmode = checkProc->waitLockMode;
 					info->pid = checkProc->pid;
+					deadlockProcs[depth] = checkProc;
 
 					/*
 					 * Add this edge to the list of soft edges in the cycle
@@ -753,6 +835,7 @@ FindLockCycleRecurseMember(PGPROC *checkProc,
 					info->locktag = lock->tag;
 					info->lockmode = checkProc->waitLockMode;
 					info->pid = checkProc->pid;
+					deadlockProcs[depth] = checkProc;
 
 					/*
 					 * Add this edge to the list of soft edges in the cycle
@@ -1159,4 +1242,5 @@ RememberSimpleDeadLock(PGPROC *proc1,
 	info->lockmode = proc2->waitLockMode;
 	info->pid = proc2->pid;
 	nDeadlockDetails = 2;
+	deadlockProcs[0] = deadlockProcs[1] = NULL;
 }
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 2ccf7237fee..10b0e0a5b9a 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -105,6 +105,19 @@ SetLocktagRelationOid(LOCKTAG *tag, Oid relid)
  */
 void
 LockRelationOid(Oid relid, LOCKMODE lockmode)
+{
+	LockRelationOidWithUpgradeIntent(relid, lockmode, NoLock);
+}
+
+/*
+ *		LockRelationOidWithUpgradeIntent
+ *
+ * Lock and atomically announce a future upgrade to `upgradeMode`.  See
+ * LockAcquireExtended for semantics.
+ */
+void
+LockRelationOidWithUpgradeIntent(Oid relid, LOCKMODE lockmode,
+								 LOCKMODE upgradeMode)
 {
 	LOCKTAG		tag;
 	LOCALLOCK  *locallock;
@@ -112,8 +125,8 @@ LockRelationOid(Oid relid, LOCKMODE lockmode)
 
 	SetLocktagRelationOid(&tag, relid);
 
-	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock,
-							  false);
+	res = LockAcquireExtended(&tag, lockmode, upgradeMode, false, false, true,
+							  &locallock, false);
 
 	/*
 	 * Now that we have the lock, check for invalidation messages, so that we
@@ -156,8 +169,8 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
 
 	SetLocktagRelationOid(&tag, relid);
 
-	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock,
-							  false);
+	res = LockAcquireExtended(&tag, lockmode, NoLock, false, true, true,
+							  &locallock, false);
 
 	if (res == LOCKACQUIRE_NOT_AVAIL)
 		return false;
@@ -190,8 +203,8 @@ LockRelationId(LockRelId *relid, LOCKMODE lockmode)
 
 	SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
 
-	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock,
-							  false);
+	res = LockAcquireExtended(&tag, lockmode, NoLock, false, false, true,
+							  &locallock, false);
 
 	/*
 	 * Now that we have the lock, check for invalidation messages; see notes
@@ -253,8 +266,8 @@ LockRelation(Relation relation, LOCKMODE lockmode)
 						 relation->rd_lockInfo.lockRelId.dbId,
 						 relation->rd_lockInfo.lockRelId.relId);
 
-	res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock,
-							  false);
+	res = LockAcquireExtended(&tag, lockmode, NoLock, false, false, true,
+							  &locallock, false);
 
 	/*
 	 * Now that we have the lock, check for invalidation messages; see notes
@@ -285,8 +298,8 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
 						 relation->rd_lockInfo.lockRelId.dbId,
 						 relation->rd_lockInfo.lockRelId.relId);
 
-	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock,
-							  false);
+	res = LockAcquireExtended(&tag, lockmode, NoLock, false, true, true,
+							  &locallock, false);
 
 	if (res == LOCKACQUIRE_NOT_AVAIL)
 		return false;
@@ -590,8 +603,8 @@ ConditionalLockTuple(Relation relation, const ItemPointerData *tid, LOCKMODE loc
 					  ItemPointerGetBlockNumber(tid),
 					  ItemPointerGetOffsetNumber(tid));
 
-	return (LockAcquireExtended(&tag, lockmode, false, true, true, NULL,
-								logLockFailure) != LOCKACQUIRE_NOT_AVAIL);
+	return (LockAcquireExtended(&tag, lockmode, NoLock, false, true, true,
+								NULL, logLockFailure) != LOCKACQUIRE_NOT_AVAIL);
 }
 
 /*
@@ -748,8 +761,8 @@ ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
 
 		SET_LOCKTAG_TRANSACTION(tag, xid);
 
-		if (LockAcquireExtended(&tag, ShareLock, false, true, true, NULL,
-								logLockFailure)
+		if (LockAcquireExtended(&tag, ShareLock, NoLock, false, true, true,
+								NULL, logLockFailure)
 			== LOCKACQUIRE_NOT_AVAIL)
 			return false;
 
@@ -1042,8 +1055,8 @@ ConditionalLockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
 					   objid,
 					   objsubid);
 
-	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock,
-							  false);
+	res = LockAcquireExtended(&tag, lockmode, NoLock, false, true, true,
+							  &locallock, false);
 
 	if (res == LOCKACQUIRE_NOT_AVAIL)
 		return false;
@@ -1122,8 +1135,8 @@ ConditionalLockSharedObject(Oid classid, Oid objid, uint16 objsubid,
 					   objid,
 					   objsubid);
 
-	res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock,
-							  false);
+	res = LockAcquireExtended(&tag, lockmode, NoLock, false, true, true,
+							  &locallock, false);
 
 	if (res == LOCKACQUIRE_NOT_AVAIL)
 		return false;
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 8d246ed5a4e..0e666a025ef 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -59,6 +59,9 @@ bool		log_lock_failures = false;
 #define NLOCKENTS() \
 	mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
 
+#define LOCKMODE_SELF_CONFLICTS(conflictTab, lockmode) \
+	(((conflictTab)[(lockmode)] & LOCKBIT_ON(lockmode)) != 0)
+
 
 /*
  * Data structures defining the semantics of the standard lock methods.
@@ -384,7 +387,7 @@ LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
 		elog(LOG,
 			 "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
 			 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
-			 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
+			 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) upgradeIntent(%d) type(%s)",
 			 where, lock,
 			 lock->tag.locktag_field1, lock->tag.locktag_field2,
 			 lock->tag.locktag_field3, lock->tag.locktag_field4,
@@ -397,6 +400,7 @@ LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
 			 lock->granted[4], lock->granted[5], lock->granted[6],
 			 lock->granted[7], lock->nGranted,
 			 dclist_count(&lock->waitProcs),
+			 lock->nUpgradeIntent,
 			 LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
 }
 
@@ -406,10 +410,11 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
 {
 	if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
 		elog(LOG,
-			 "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
+			 "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x) upgradeIntent(%d)",
 			 where, proclockP, proclockP->tag.myLock,
 			 PROCLOCK_LOCKMETHOD(*(proclockP)),
-			 proclockP->tag.myProc, (int) proclockP->holdMask);
+			 proclockP->tag.myProc, (int) proclockP->holdMask,
+			 proclockP->upgradeIntent);
 }
 #else							/* not LOCK_DEBUG */
 
@@ -427,6 +432,7 @@ static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
 static void FinishStrongLockAcquire(void);
 static ProcWaitStatus WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
 static void waitonlock_error_callback(void *arg);
+pg_noreturn static void ReportPreemptDeadlock(LOCALLOCK *locallock);
 static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
 static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
 static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
@@ -808,13 +814,17 @@ LockAcquire(const LOCKTAG *locktag,
 			bool sessionLock,
 			bool dontWait)
 {
-	return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
-							   true, NULL, false);
+	return LockAcquireExtended(locktag, lockmode, NoLock, sessionLock,
+							   dontWait, true, NULL, false);
 }
 
 /*
  * LockAcquireExtended - allows us to specify additional options
  *
+ * upgradeIntent is NoLock or a stronger mode the caller will try to acquire
+ * later; when set, it is installed on the PROCLOCK atomically with the grant.
+ * See README "Protected Lock Upgrades".
+ *
  * reportMemoryError specifies whether a lock request that fills the lock
  * table should generate an ERROR or not.  Passing "false" allows the caller
  * to attempt to recover from lock-table-full situations, perhaps by forcibly
@@ -832,6 +842,7 @@ LockAcquire(const LOCKTAG *locktag,
 LockAcquireResult
 LockAcquireExtended(const LOCKTAG *locktag,
 					LOCKMODE lockmode,
+					LOCKMODE upgradeIntent,
 					bool sessionLock,
 					bool dontWait,
 					bool reportMemoryError,
@@ -851,6 +862,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	bool		found_conflict;
 	ProcWaitStatus waitResult;
 	bool		log_lock = false;
+	bool		upgradeIntentSet = false;
 
 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
@@ -858,6 +870,16 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
 		elog(ERROR, "unrecognized lock mode: %d", lockmode);
 
+	/*
+	 * Announcement requires a self-conflicting non-fast-path lockmode (the
+	 * fast path can't carry one) and a strictly stronger target.
+	 */
+	Assert(upgradeIntent == NoLock ||
+		   (!EligibleForRelationFastPath(locktag, lockmode) &&
+			LOCKMODE_SELF_CONFLICTS(lockMethodTable->conflictTab, lockmode) &&
+			upgradeIntent > lockmode &&
+			upgradeIntent <= lockMethodTable->numLockModes));
+
 	if (RecoveryInProgress() && !InRecovery &&
 		(locktag->locktag_type == LOCKTAG_OBJECT ||
 		 locktag->locktag_type == LOCKTAG_RELATION) &&
@@ -933,9 +955,13 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	 *
 	 * If lockCleared is already set, caller need not worry about absorbing
 	 * sinval messages related to the lock's object.
+	 *
+	 * Upgrade intent must be installed at the first grant, not on re-entry.
 	 */
 	if (locallock->nLocks > 0)
 	{
+		Assert(upgradeIntent == NoLock);
+
 		GrantLockLocal(locallock, owner);
 		if (locallock->lockCleared)
 			return LOCKACQUIRE_ALREADY_CLEAR;
@@ -1094,6 +1120,19 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	lock = proclock->tag.myLock;
 	locallock->lock = lock;
 
+	if (upgradeIntent != NoLock)
+	{
+		/* It is not allowed to change upgradeIntent mode */
+		Assert(proclock->upgradeIntent == NoLock ||
+			   proclock->upgradeIntent == upgradeIntent);
+		upgradeIntentSet = (proclock->upgradeIntent == NoLock);
+		if (upgradeIntentSet)
+		{
+			proclock->upgradeIntent = upgradeIntent;
+			lock->nUpgradeIntent++;
+		}
+	}
+
 	/*
 	 * If lock requested conflicts with locks requested by waiters, must join
 	 * wait queue.  Otherwise, check for conflict with already-held locks.
@@ -1121,18 +1160,26 @@ LockAcquireExtended(const LOCKTAG *locktag,
 		waitResult = JoinWaitQueue(locallock, lockMethodTable, dontWait);
 	}
 
-	if (waitResult == PROC_WAIT_STATUS_ERROR)
+	if (waitResult == PROC_WAIT_STATUS_ERROR ||
+		waitResult == PROC_WAIT_STATUS_PREEMPTED)
 	{
 		/*
-		 * We're not getting the lock because a deadlock was detected already
-		 * while trying to join the wait queue, or because we would have to
-		 * wait but the caller requested no blocking.
-		 *
+		 * Not getting the lock: deadlock detected while joining the queue,
+		 * preempted by another backend's protected upgrade, or dontWait.
 		 * Undo the changes to shared entries before releasing the partition
 		 * lock.
 		 */
 		AbortStrongLockAcquire();
 
+		/* If we just installed upgrade intent but never got the lock, undo it. */
+		if (upgradeIntentSet &&
+			(proclock->holdMask & LOCKBIT_ON(lockmode)) == 0)
+		{
+			proclock->upgradeIntent = NoLock;
+			Assert(lock->nUpgradeIntent > 0);
+			lock->nUpgradeIntent--;
+		}
+
 		if (proclock->holdMask == 0)
 		{
 			uint32		proclock_hashcode;
@@ -1208,6 +1255,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
 				*locallockp = NULL;
 			return LOCKACQUIRE_NOT_AVAIL;
 		}
+		else if (waitResult == PROC_WAIT_STATUS_PREEMPTED)
+			ReportPreemptDeadlock(locallock);
 		else
 		{
 			DeadLockReport();
@@ -1236,14 +1285,15 @@ LockAcquireExtended(const LOCKTAG *locktag,
 
 		if (waitResult == PROC_WAIT_STATUS_ERROR)
 		{
-			/*
-			 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
-			 * now.
-			 */
 			Assert(!dontWait);
 			DeadLockReport();
 			/* DeadLockReport() will not return */
 		}
+		else if (waitResult == PROC_WAIT_STATUS_PREEMPTED)
+		{
+			Assert(!dontWait);
+			ReportPreemptDeadlock(locallock);
+		}
 	}
 	else
 		LWLockRelease(partitionLock);
@@ -1319,6 +1369,7 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
 		dclist_init(&lock->waitProcs);
 		lock->nRequested = 0;
 		lock->nGranted = 0;
+		lock->nUpgradeIntent = 0;
 		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
 		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
 		LOCK_PRINT("LockAcquire: new", lock, lockmode);
@@ -1390,6 +1441,7 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
 			proc->lockGroupLeader : proc;
 		proclock->holdMask = 0;
 		proclock->releaseMask = 0;
+		proclock->upgradeIntent = NoLock;
 		/* Add proclock to appropriate lists */
 		dlist_push_tail(&lock->procLocks, &proclock->lockLink);
 		dlist_push_tail(&proc->myProcLocks[partition], &proclock->procLink);
@@ -1413,9 +1465,13 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
 		 * about user-level coding practices that are in fact safe in context.
 		 * It can be enabled to help find system-level problems.
 		 *
+		 * Skip for an announced upgrade: the detector handles those cycles
+		 * by canceling a blocker.
+		 *
 		 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
 		 * better to use a table.  For now, though, this works.
 		 */
+		if (proclock->upgradeIntent != lockmode)
 		{
 			int			i;
 
@@ -1671,6 +1727,19 @@ GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
 	if (lock->granted[lockmode] == lock->requested[lockmode])
 		lock->waitMask &= LOCKBIT_OFF(lockmode);
 	proclock->holdMask |= LOCKBIT_ON(lockmode);
+
+	/* The announced target was just granted; clear the announcement. */
+	if (proclock->upgradeIntent != NoLock)
+	{
+		Assert(proclock->upgradeIntent >= lockmode);
+		if (proclock->upgradeIntent == lockmode)
+		{
+			proclock->upgradeIntent = NoLock;
+			Assert(lock->nUpgradeIntent > 0);
+			lock->nUpgradeIntent--;
+		}
+	}
+
 	LOCK_PRINT("GrantLock", lock, lockmode);
 	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
 	Assert(lock->nGranted <= lock->nRequested);
@@ -1727,6 +1796,16 @@ UnGrantLock(LOCK *lock, LOCKMODE lockmode,
 	 * Now fix the per-proclock state.
 	 */
 	proclock->holdMask &= LOCKBIT_OFF(lockmode);
+
+	/* Releasing the last held mode also retires any pending announcement. */
+	if (proclock->upgradeIntent != NoLock &&
+		proclock->holdMask == 0)
+	{
+		proclock->upgradeIntent = NoLock;
+		Assert(lock->nUpgradeIntent > 0);
+		lock->nUpgradeIntent--;
+	}
+
 	PROCLOCK_PRINT("UnGrantLock: updated", proclock);
 
 	return wakeupNeeded;
@@ -1883,6 +1962,29 @@ AbortStrongLockAcquire(void)
 	SpinLockRelease(&FastPathStrongRelationLocks->mutex);
 }
 
+/*
+ * ereport a deadlock for a protected-upgrade preemption.  Use log_lock_waits
+ * for the cycle details from the protected backend's side.
+ */
+pg_noreturn static void
+ReportPreemptDeadlock(LOCALLOCK *locallock)
+{
+	StringInfoData buf;
+	const char *modename;
+
+	initStringInfo(&buf);
+	DescribeLockTag(&buf, &locallock->tag.lock);
+	modename = GetLockmodeName(locallock->tag.lock.locktag_lockmethodid,
+							   locallock->tag.mode);
+
+	pgstat_report_deadlock();
+	ereport(ERROR,
+			(errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
+			 errmsg("deadlock detected"),
+			 errdetail("Process %d could not acquire %s on %s because another backend holds a conflicting lock and has announced a future upgrade that would form a deadlock cycle.",
+					   MyProcPid, modename, buf.data)));
+}
+
 /*
  * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
  *		WaitOnLock on.
@@ -2042,8 +2144,10 @@ waitonlock_error_callback(void *arg)
 
 /*
  * Remove a proc from the wait-queue it is on (caller must know it is on one).
- * This is only used when the proc has failed to get the lock, so we set its
- * waitStatus to PROC_WAIT_STATUS_ERROR.
+ * Used when the proc has failed to get the lock; the caller passes the
+ * waitStatus to publish (PROC_WAIT_STATUS_ERROR for ordinary failure or
+ * PROC_WAIT_STATUS_PREEMPTED when the wait was canceled to break a cycle
+ * with a protected upgrade).
  *
  * Appropriate partition lock must be held by caller.  Also, caller is
  * responsible for signaling the proc if needed.
@@ -2051,7 +2155,7 @@ waitonlock_error_callback(void *arg)
  * NB: this does not clean up any locallock object that may exist for the lock.
  */
 void
-RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
+RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode, ProcWaitStatus newStatus)
 {
 	LOCK	   *waitLock = proc->waitLock;
 	PROCLOCK   *proclock = proc->waitProcLock;
@@ -2060,6 +2164,8 @@ RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
 
 	/* Make sure proc is waiting */
 	Assert(proc->waitStatus == PROC_WAIT_STATUS_WAITING);
+	Assert(newStatus == PROC_WAIT_STATUS_ERROR ||
+		   newStatus == PROC_WAIT_STATUS_PREEMPTED);
 	Assert(!dlist_node_is_detached(&proc->waitLink));
 	Assert(waitLock);
 	Assert(!dclist_is_empty(&waitLock->waitProcs));
@@ -2078,10 +2184,18 @@ RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
 	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
 		waitLock->waitMask &= LOCKBIT_OFF(lockmode);
 
+	/* Wait canceled => the announcement (if any) no longer applies. */
+	if (proclock->upgradeIntent != NoLock)
+	{
+		proclock->upgradeIntent = NoLock;
+		Assert(waitLock->nUpgradeIntent > 0);
+		waitLock->nUpgradeIntent--;
+	}
+
 	/* Clean up the proc's own state, and pass it the ok/fail signal */
 	proc->waitLock = NULL;
 	proc->waitProcLock = NULL;
-	proc->waitStatus = PROC_WAIT_STATUS_ERROR;
+	proc->waitStatus = newStatus;
 
 	/*
 	 * Delete the proclock immediately if it represents no already-held locks.
@@ -3697,6 +3811,8 @@ PostPrepare_Locks(FullTransactionId fxid)
 			Assert(lock->nGranted >= 0);
 			Assert(lock->nGranted <= lock->nRequested);
 			Assert((proclock->holdMask & ~lock->grantMask) == 0);
+			/* upgradeIntent is not supported for PREPARE TRANSACTION. */
+			Assert(proclock->upgradeIntent == NoLock);
 
 			/* Ignore it if nothing to release (must be a session lock) */
 			if (proclock->releaseMask == 0)
@@ -4397,6 +4513,7 @@ lock_twophase_recover(FullTransactionId fxid, uint16 info,
 		dclist_init(&lock->waitProcs);
 		lock->nRequested = 0;
 		lock->nGranted = 0;
+		lock->nUpgradeIntent = 0;
 		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
 		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
 		LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
@@ -4460,6 +4577,7 @@ lock_twophase_recover(FullTransactionId fxid, uint16 info,
 		proclock->groupLeader = proc;
 		proclock->holdMask = 0;
 		proclock->releaseMask = 0;
+		proclock->upgradeIntent = NoLock;
 		/* Add proclock to appropriate lists */
 		dlist_push_tail(&lock->procLocks, &proclock->lockLink);
 		dlist_push_tail(&proc->myProcLocks[partition],
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 1ac25068d62..2ce7073d9d2 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -854,7 +854,8 @@ LockErrorCleanup(void)
 	if (!dlist_node_is_detached(&MyProc->waitLink))
 	{
 		/* We could not have been granted the lock yet */
-		RemoveFromWaitQueue(MyProc, lockAwaited->hashcode);
+		RemoveFromWaitQueue(MyProc, lockAwaited->hashcode,
+							PROC_WAIT_STATUS_ERROR);
 	}
 	else
 	{
@@ -1135,10 +1136,11 @@ AuxiliaryPidGetProc(int pid)
  *
  * Result is one of the following:
  *
- *  PROC_WAIT_STATUS_OK       - lock was immediately granted
- *  PROC_WAIT_STATUS_WAITING  - joined the wait queue; call ProcSleep()
- *  PROC_WAIT_STATUS_ERROR    - immediate deadlock was detected, or would
- *                              need to wait and dontWait == true
+ *  PROC_WAIT_STATUS_OK         - lock immediately granted
+ *  PROC_WAIT_STATUS_WAITING    - joined the wait queue; call ProcSleep()
+ *  PROC_WAIT_STATUS_ERROR      - immediate deadlock or dontWait failure
+ *  PROC_WAIT_STATUS_PREEMPTED  - cycle proven against an announced upgrade;
+ *                                caller must raise a deadlock error
  *
  * NOTES: The process queue is now a priority queue for locking.
  */
@@ -1239,9 +1241,15 @@ JoinWaitQueue(LOCALLOCK *locallock, LockMethod lockMethodTable, bool dontWait)
 					 * can't do that until we are *on* the wait queue. So, set
 					 * a flag to check below, and break out of loop.  Also,
 					 * record deadlock info for later message.
+					 *
+					 * If this wait is our announced upgrade, skip the early
+					 * exit so the full detector can preempt a blocker.
 					 */
-					RememberSimpleDeadLock(MyProc, lockmode, lock, proc);
-					early_deadlock = true;
+					if (proclock->upgradeIntent != lockmode)
+					{
+						RememberSimpleDeadLock(MyProc, lockmode, lock, proc);
+						early_deadlock = true;
+					}
 					break;
 				}
 				/* I must go before this waiter.  Check special case. */
@@ -1270,6 +1278,75 @@ JoinWaitQueue(LOCALLOCK *locallock, LockMethod lockMethodTable, bool dontWait)
 	if (early_deadlock)
 		return PROC_WAIT_STATUS_ERROR;
 
+	/*
+	 * Fast-fail: if some holder's announced upgrade conflicts with a mode we
+	 * already hold, and its current holdMask conflicts with our request, a
+	 * future cycle is inevitable.  Don't wait deadlock_timeout for the same
+	 * conclusion.  Skipped when we are the announced upgrader, when we hold
+	 * nothing here, or when no holder has any announcement.
+	 *
+	 * This is only a fast-fail check for cycles already visible in the main
+	 * lock table.  Fast-path locks can hide edges.  For example, while waiting
+	 * for ShareUpdateExclusiveLock, this backend may still hold weaker fast-path
+	 * locks on the same relation, since ShareUpdateExclusiveLock does not itself
+	 * force fast-path transfer.  Such an edge becomes visible only later, e.g.
+	 * when another backend's AccessExclusiveLock upgrade transfers fast-path
+	 * locks.  The full deadlock detector handles that case.
+	 *
+	 * We could cheaply include modes this backend holds locally, via
+	 * LockMethodLocalHash.  That catches the direct case where this backend holds
+	 * a weak fast-path lock and waits behind an announced stronger upgrade, but
+	 * it cannot see hidden fast-path edges owned by third backends in longer
+	 * cycles.
+	 *
+	 * Making upgrade intent visible to the fast-path mechanism would catch more
+	 * cases, e.g. by treating upgradeIntent as conflicting for
+	 * ConflictsWithRelationFastPath() and keeping FastPathStrongRelationLocks
+	 * bumped until the upgrade is consumed.  However, that would pessimize the
+	 * whole concurrent operation: ordinary weak relation locks would stop using
+	 * the fast path and contend in the main lock table.
+	 *
+	 * Transferring fast-path locks from DeadLockCheck() only when needed would
+	 * avoid that penalty, but is lock-ordering-sensitive: DeadLockCheck() already
+	 * holds all lock partition LWLocks, while fast-path transfer needs
+	 * fpInfoLocks and may acquire partition LWLocks in the normal order.  A safe
+	 * design would need probe/retry logic, making it worthwhile only if hidden
+	 * longer fast-path cycles must be preempted early.
+	 */
+	if (proclock->upgradeIntent != lockmode && myHeldLocks != 0 &&
+		lock->nUpgradeIntent > 0)
+	{
+		dlist_iter	iter;
+
+		dlist_foreach(iter, &lock->procLocks)
+		{
+			PROCLOCK   *holder = dlist_container(PROCLOCK, lockLink, iter.cur);
+
+			if (holder->upgradeIntent == NoLock)
+				continue;
+
+			/* Skip self (we're in lock->procLocks too). */
+			if (holder == proclock)
+				continue;
+
+			/* Same lock group: not a cycle. */
+			if (leader != NULL && holder->groupLeader == leader)
+				continue;
+
+			/* Must the holder later wait for me? */
+			if ((lockMethodTable->conflictTab[holder->upgradeIntent] &
+				 myHeldLocks) == 0)
+				continue;
+
+			/* Must I wait for the holder now? */
+			if ((lockMethodTable->conflictTab[lockmode] &
+				 holder->holdMask) == 0)
+				continue;
+
+			return PROC_WAIT_STATUS_PREEMPTED;
+		}
+	}
+
 	/*
 	 * At this point we know that we'd really need to sleep. If we've been
 	 * commanded not to do that, bail out.
@@ -1308,8 +1385,10 @@ JoinWaitQueue(LOCALLOCK *locallock, LockMethod lockMethodTable, bool dontWait)
  *
  * Result is one of the following:
  *
- *  PROC_WAIT_STATUS_OK      - lock was granted
- *  PROC_WAIT_STATUS_ERROR   - a deadlock was detected
+ *  PROC_WAIT_STATUS_OK         - lock granted
+ *  PROC_WAIT_STATUS_ERROR      - deadlock detected
+ *  PROC_WAIT_STATUS_PREEMPTED  - canceled by another backend's DeadLockCheck
+ *                                to break a protected-upgrade cycle
  */
 ProcWaitStatus
 ProcSleep(LOCALLOCK *locallock)
@@ -1562,16 +1641,19 @@ ProcSleep(LOCALLOCK *locallock)
 
 		/*
 		 * If awoken after the deadlock check interrupt has run, increment the
-		 * lock statistics counters and if log_lock_waits is on, then report
-		 * about the wait.
+		 * lock statistics counters and, if log_lock_waits is on, report about
+		 * the wait. Also report a wait that another backend preempted before
+		 * our own deadlock timeout fired.
 		 */
-		if (deadlock_state != DS_NOT_YET_CHECKED)
+		if (deadlock_state != DS_NOT_YET_CHECKED ||
+			(myWaitStatus == PROC_WAIT_STATUS_PREEMPTED && log_lock_waits))
 		{
 			long		secs;
 			int			usecs;
 			long		msecs;
 
-			INJECTION_POINT("deadlock-timeout-fired", NULL);
+			if (deadlock_state != DS_NOT_YET_CHECKED)
+				INJECTION_POINT("deadlock-timeout-fired", NULL);
 			TimestampDifference(get_timeout_start_time(DEADLOCK_TIMEOUT),
 								GetCurrentTimestamp(),
 								&secs, &usecs);
@@ -1611,6 +1693,13 @@ ProcSleep(LOCALLOCK *locallock)
 							 (errdetail_log_plural("Process holding the lock: %s. Wait queue: %s.",
 												   "Processes holding the lock: %s. Wait queue: %s.",
 												   lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
+				else if (deadlock_state == DS_PREEMPT_DEADLOCK)
+					ereport(LOG,
+							(errmsg("process %d avoided deadlock for %s on %s by canceling a blocking lock wait after %ld.%03d ms",
+									MyProcPid, modename, buf.data, msecs, usecs),
+							 (errdetail_log_plural("Process holding the lock: %s. Wait queue: %s.",
+												   "Processes holding the lock: %s. Wait queue: %s.",
+												   lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
 				else if (deadlock_state == DS_HARD_DEADLOCK)
 				{
 					/*
@@ -1659,16 +1748,13 @@ ProcSleep(LOCALLOCK *locallock)
 									MyProcPid, modename, buf.data, msecs, usecs)));
 				else
 				{
-					Assert(myWaitStatus == PROC_WAIT_STATUS_ERROR);
+					Assert(myWaitStatus == PROC_WAIT_STATUS_ERROR ||
+						   myWaitStatus == PROC_WAIT_STATUS_PREEMPTED);
 
 					/*
-					 * Currently, the deadlock checker always kicks its own
-					 * process, which means that we'll only see
-					 * PROC_WAIT_STATUS_ERROR when deadlock_state ==
-					 * DS_HARD_DEADLOCK, and there's no need to print
-					 * redundant messages.  But for completeness and
-					 * future-proofing, print a message if it looks like
-					 * someone else kicked us off the lock.
+					 * Self-detected hard deadlocks log via DeadLockReport;
+					 * skip the redundant LOG.  Otherwise (e.g. preempted),
+					 * emit it here.
 					 */
 					if (deadlock_state != DS_HARD_DEADLOCK)
 						ereport(LOG,
@@ -1870,14 +1956,16 @@ CheckDeadLock(void)
 		 * Get this process out of wait state. (Note: we could do this more
 		 * efficiently by relying on lockAwaited, but use this coding to
 		 * preserve the flexibility to kill some other transaction than the
-		 * one detecting the deadlock.)
+		 * one detecting the deadlock.  DS_PREEMPT_DEADLOCK exercises that
+		 * already.)
 		 *
 		 * RemoveFromWaitQueue sets MyProc->waitStatus to
 		 * PROC_WAIT_STATUS_ERROR, so ProcSleep will report an error after we
 		 * return.
 		 */
 		Assert(MyProc->waitLock != NULL);
-		RemoveFromWaitQueue(MyProc, LockTagHashCode(&(MyProc->waitLock->tag)));
+		RemoveFromWaitQueue(MyProc, LockTagHashCode(&(MyProc->waitLock->tag)),
+							PROC_WAIT_STATUS_ERROR);
 
 		/*
 		 * We're done here.  Transaction abort caused by the error that
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index 9453a3e4932..f0a71184757 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -99,6 +99,13 @@ typedef void (*RangeVarGetRelidCallback) (const RangeVar *relation, Oid relId,
 	RangeVarGetRelidExtended(relation, lockmode, \
 							 (missing_ok) ? RVR_MISSING_OK : 0, NULL, NULL)
 
+extern Oid	RangeVarGetRelidWithUpgradeIntent(const RangeVar *relation,
+											  LOCKMODE lockmode,
+											  LOCKMODE upgradeMode,
+											  uint32 flags,
+											  RangeVarGetRelidCallback callback,
+											  void *callback_arg);
+
 extern Oid	RangeVarGetRelidExtended(const RangeVar *relation,
 									 LOCKMODE lockmode, uint32 flags,
 									 RangeVarGetRelidCallback callback,
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index 2a985ce5e15..57667432e02 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -42,6 +42,9 @@ extern void LockRelationId(LockRelId *relid, LOCKMODE lockmode);
 extern bool ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode);
 extern void UnlockRelationId(LockRelId *relid, LOCKMODE lockmode);
 extern void UnlockRelationOid(Oid relid, LOCKMODE lockmode);
+extern void LockRelationOidWithUpgradeIntent(Oid relid,
+											 LOCKMODE lockmode,
+											 LOCKMODE upgradeMode);
 
 extern void LockRelation(Relation relation, LOCKMODE lockmode);
 extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index ee3cb1dc203..cff76a7aed8 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -131,6 +131,7 @@ typedef const LockMethodData *LockMethod;
  * nRequested -- total requested locks of all types.
  * granted -- count of each lock type currently granted on the lock.
  * nGranted -- total granted locks of all types.
+ * nUpgradeIntent -- # of proclocks here with upgradeIntent set.
  *
  * Note: these counts count 1 for each backend.  Internally to a backend,
  * there may be multiple grabs on a particular lock, but this is not reflected
@@ -150,6 +151,7 @@ typedef struct LOCK
 	int			nRequested;		/* total of requested[] array */
 	int			granted[MAX_LOCKMODES]; /* counts of granted locks */
 	int			nGranted;		/* total of granted[] array */
+	int			nUpgradeIntent; /* see LOCK comment above */
 } LOCK;
 
 #define LOCK_LOCKMETHOD(lock) ((LOCKMETHODID) (lock).tag.locktag_lockmethodid)
@@ -206,6 +208,7 @@ typedef struct PROCLOCK
 	PGPROC	   *groupLeader;	/* proc's lock group leader, or proc itself */
 	LOCKMASK	holdMask;		/* bitmask for lock types currently held */
 	LOCKMASK	releaseMask;	/* bitmask for lock types to be released */
+	LOCKMODE	upgradeIntent;	/* announced future upgrade target, or NoLock */
 	dlist_node	lockLink;		/* list link in LOCK's list of proclocks */
 	dlist_node	procLink;		/* list link in PGPROC's list of proclocks */
 } PROCLOCK;
@@ -342,10 +345,22 @@ typedef enum
 	DS_NO_DEADLOCK,				/* no deadlock detected */
 	DS_SOFT_DEADLOCK,			/* deadlock avoided by queue rearrangement */
 	DS_HARD_DEADLOCK,			/* deadlock, no way out but ERROR */
+	DS_PREEMPT_DEADLOCK,		/* resolved by canceling a blocking waiter */
 	DS_BLOCKED_BY_AUTOVACUUM,	/* no deadlock; queue blocked by autovacuum
 								 * worker */
 } DeadLockState;
 
+/*
+ * Lock-wait status set by RemoveFromWaitQueue and read by the waiter on wakeup
+ */
+typedef enum
+{
+	PROC_WAIT_STATUS_OK,
+	PROC_WAIT_STATUS_WAITING,
+	PROC_WAIT_STATUS_ERROR,
+	PROC_WAIT_STATUS_PREEMPTED,
+} ProcWaitStatus;
+
 /*
  * The lockmgr's shared hash tables are partitioned to reduce contention.
  * To determine which partition a given locktag belongs to, compute the tag's
@@ -386,6 +401,7 @@ extern LockAcquireResult LockAcquire(const LOCKTAG *locktag,
 									 bool dontWait);
 extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
 											 LOCKMODE lockmode,
+											 LOCKMODE upgradeIntent,
 											 bool sessionLock,
 											 bool dontWait,
 											 bool reportMemoryError,
@@ -418,7 +434,8 @@ extern void GrantAwaitedLock(void);
 extern LOCALLOCK *GetAwaitedLock(void);
 extern void ResetAwaitedLock(void);
 
-extern void RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode);
+extern void RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode,
+								ProcWaitStatus newStatus);
 extern LockData *GetLockStatusData(void);
 extern BlockedProcsData *GetBlockerStatusData(int blocked_pid);
 
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 3e1d1fad5f9..9f89e587f3a 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -146,13 +146,6 @@ extern PGDLLIMPORT int FastPathLockGroupsPerBackend;
 #define DELAY_CHKPT_COMPLETE	(1<<1)
 #define DELAY_CHKPT_IN_COMMIT	(DELAY_CHKPT_START | 1<<2)
 
-typedef enum
-{
-	PROC_WAIT_STATUS_OK,
-	PROC_WAIT_STATUS_WAITING,
-	PROC_WAIT_STATUS_ERROR,
-} ProcWaitStatus;
-
 /*
  * Each backend has a PGPROC struct in shared memory.  There is also a list of
  * currently-unused PGPROC structs that will be reallocated to new backends.
diff --git a/src/test/modules/injection_points/expected/repack.out b/src/test/modules/injection_points/expected/repack.out
index b575e9052ee..9dffc603d9f 100644
--- a/src/test/modules/injection_points/expected/repack.out
+++ b/src/test/modules/injection_points/expected/repack.out
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 4 sessions
 
 starting permutation: wait_before_lock change_existing change_new change_subxact1 change_subxact2 check2 wakeup_before_lock check1
 injection_points_attach
@@ -111,3 +111,182 @@ injection_points_detach
                        
 (1 row)
 
+
+starting permutation: wait_before_lock begin_txn lock_table_ae s3_wakeup end_txn
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step wait_before_lock: 
+	REPACK (CONCURRENTLY) repack_test USING INDEX repack_test_pkey;
+ <waiting ...>
+step begin_txn: 
+	BEGIN;
+
+step lock_table_ae: 
+	LOCK TABLE repack_test IN ACCESS EXCLUSIVE MODE;
+ <waiting ...>
+step s3_wakeup: 
+	SELECT injection_points_wakeup('repack-concurrently-before-lock');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step wait_before_lock: <... completed>
+step lock_table_ae: <... completed>
+step end_txn: 
+	COMMIT;
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+
+starting permutation: s2_timeout_fast wait_before_lock begin_and_read lock_table_ae s3_wakeup end_txn
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step s2_timeout_fast: 
+	SET deadlock_timeout = '10ms';
+
+step wait_before_lock: 
+	REPACK (CONCURRENTLY) repack_test USING INDEX repack_test_pkey;
+ <waiting ...>
+step begin_and_read: 
+	BEGIN;
+	SELECT 1 FROM repack_test LIMIT 1;
+
+?column?
+--------
+       1
+(1 row)
+
+step lock_table_ae: 
+	LOCK TABLE repack_test IN ACCESS EXCLUSIVE MODE;
+ <waiting ...>
+step s3_wakeup: 
+	SELECT injection_points_wakeup('repack-concurrently-before-lock');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step wait_before_lock: <... completed>
+step lock_table_ae: <... completed>
+ERROR:  deadlock detected
+step end_txn: 
+	COMMIT;
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+
+starting permutation: s2_timeout_fast wait_before_lock begin_and_read lock_table_sue s3_wakeup end_txn
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step s2_timeout_fast: 
+	SET deadlock_timeout = '10ms';
+
+step wait_before_lock: 
+	REPACK (CONCURRENTLY) repack_test USING INDEX repack_test_pkey;
+ <waiting ...>
+step begin_and_read: 
+	BEGIN;
+	SELECT 1 FROM repack_test LIMIT 1;
+
+?column?
+--------
+       1
+(1 row)
+
+step lock_table_sue: 
+	LOCK TABLE repack_test IN SHARE UPDATE EXCLUSIVE MODE;
+ <waiting ...>
+step s3_wakeup: 
+	SELECT injection_points_wakeup('repack-concurrently-before-lock');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step lock_table_sue: <... completed>
+ERROR:  deadlock detected
+step wait_before_lock: <... completed>
+step end_txn: 
+	COMMIT;
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+
+starting permutation: s2_timeout_fast wait_before_lock begin_and_read s3_begin_txn s3_lock_table_y_sue s3_lock_table_sue lock_table_y_sue s4_wakeup end_txn s3_end_txn
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step s2_timeout_fast: 
+	SET deadlock_timeout = '10ms';
+
+step wait_before_lock: 
+	REPACK (CONCURRENTLY) repack_test USING INDEX repack_test_pkey;
+ <waiting ...>
+step begin_and_read: 
+	BEGIN;
+	SELECT 1 FROM repack_test LIMIT 1;
+
+?column?
+--------
+       1
+(1 row)
+
+step s3_begin_txn: 
+	BEGIN;
+
+step s3_lock_table_y_sue: 
+	LOCK TABLE repack_test_y IN SHARE UPDATE EXCLUSIVE MODE;
+
+step s3_lock_table_sue: 
+	LOCK TABLE repack_test IN SHARE UPDATE EXCLUSIVE MODE;
+ <waiting ...>
+step lock_table_y_sue: 
+	LOCK TABLE repack_test_y IN SHARE UPDATE EXCLUSIVE MODE;
+ <waiting ...>
+step s4_wakeup: 
+	SELECT injection_points_wakeup('repack-concurrently-before-lock');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step lock_table_y_sue: <... completed>
+step s3_lock_table_sue: <... completed>
+ERROR:  deadlock detected
+step end_txn: 
+	COMMIT;
+
+step wait_before_lock: <... completed>
+step s3_end_txn: 
+	COMMIT;
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
diff --git a/src/test/modules/injection_points/specs/repack.spec b/src/test/modules/injection_points/specs/repack.spec
index d727a9b056b..36d32964f94 100644
--- a/src/test/modules/injection_points/specs/repack.spec
+++ b/src/test/modules/injection_points/specs/repack.spec
@@ -5,6 +5,7 @@ setup
 
 	CREATE TABLE repack_test(i int PRIMARY KEY, j int);
 	INSERT INTO repack_test(i, j) VALUES (1, 1), (2, 2), (3, 3), (4, 4);
+	CREATE TABLE repack_test_y(i int);
 
 	CREATE TABLE relfilenodes(node oid);
 
@@ -15,6 +16,7 @@ setup
 teardown
 {
 	DROP TABLE repack_test;
+	DROP TABLE repack_test_y;
 	DROP EXTENSION injection_points;
 
 	DROP TABLE relfilenodes;
@@ -61,6 +63,10 @@ teardown
 }
 
 session s2
+step s2_timeout_fast
+{
+	SET deadlock_timeout = '10ms';
+}
 # Change the existing data. UPDATE changes both key and non-key columns. Also
 # update one row twice to test whether tuple version generated by this session
 # can be found.
@@ -128,6 +134,60 @@ step wakeup_before_lock
 {
 	SELECT injection_points_wakeup('repack-concurrently-before-lock');
 }
+# Steps used in lock contention tests.
+step begin_txn
+{
+	BEGIN;
+}
+step begin_and_read
+{
+	BEGIN;
+	SELECT 1 FROM repack_test LIMIT 1;
+}
+step lock_table_ae
+{
+	LOCK TABLE repack_test IN ACCESS EXCLUSIVE MODE;
+}
+step lock_table_sue
+{
+	LOCK TABLE repack_test IN SHARE UPDATE EXCLUSIVE MODE;
+}
+step lock_table_y_sue
+{
+	LOCK TABLE repack_test_y IN SHARE UPDATE EXCLUSIVE MODE;
+}
+step end_txn
+{
+	COMMIT;
+}
+
+session s3
+step s3_begin_txn
+{
+	BEGIN;
+}
+step s3_lock_table_y_sue
+{
+	LOCK TABLE repack_test_y IN SHARE UPDATE EXCLUSIVE MODE;
+}
+step s3_lock_table_sue
+{
+	LOCK TABLE repack_test IN SHARE UPDATE EXCLUSIVE MODE;
+}
+step s3_wakeup
+{
+	SELECT injection_points_wakeup('repack-concurrently-before-lock');
+}
+step s3_end_txn
+{
+	COMMIT;
+}
+
+session s4
+step s4_wakeup
+{
+	SELECT injection_points_wakeup('repack-concurrently-before-lock');
+}
 
 # Test if data changes introduced while one session is performing REPACK
 # CONCURRENTLY find their way into the table.
@@ -140,3 +200,51 @@ permutation
 	check2
 	wakeup_before_lock
 	check1
+
+# A waiter that does not already hold a conflicting lock on the table is
+# waits until REPACK finishes and then acquires its lock (soft-deadlock).
+permutation
+	wait_before_lock
+	begin_txn
+	lock_table_ae
+	s3_wakeup
+	end_txn
+
+# A waiter that already holds AccessShareLock then waits for AccessExclusiveLock
+# behind REPACK's ShareUpdateExclusiveLock.
+permutation
+	s2_timeout_fast
+	wait_before_lock
+	begin_and_read
+	lock_table_ae(*)
+	s3_wakeup
+	end_txn
+
+# Same shape as above, but the waiter requests ShareUpdateExclusiveLock.
+permutation
+	s2_timeout_fast
+	wait_before_lock
+	begin_and_read
+	lock_table_sue(*)
+	s3_wakeup
+	end_txn
+
+# Three-backend future deadlock:
+#
+# - s1 holds ShareUpdateExclusiveLock on repack_test and has declared a future
+#   AccessExclusiveLock on it.
+# - s2 holds AccessShareLock on repack_test.
+# - s3 holds ShareUpdateExclusiveLock on repack_test_y, then waits for
+#   ShareUpdateExclusiveLock on repack_test behind s1.
+# - s2 waits for ShareUpdateExclusiveLock on repack_test_y behind s3.
+permutation
+	s2_timeout_fast
+	wait_before_lock
+	begin_and_read
+	s3_begin_txn
+	s3_lock_table_y_sue
+	s3_lock_table_sue
+	lock_table_y_sue(*)
+	s4_wakeup
+	end_txn
+	s3_end_txn
-- 
2.43.0

