diff --git a/src/backend/commands/discard.c b/src/backend/commands/discard.c
index 57d3d7dd9b..8da22b1b12 100644
--- a/src/backend/commands/discard.c
+++ b/src/backend/commands/discard.c
@@ -71,7 +71,7 @@ DiscardAll(bool isTopLevel)
 	ResetAllOptions();
 	DropAllPreparedStatements();
 	Async_UnlistenAll();
-	LockReleaseAll(USER_LOCKMETHOD, true);
+	LockReleaseSession(USER_LOCKMETHOD);
 	ResetPlanCache();
 	ResetTempTableNamespace();
 	ResetSequenceCaches();
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index c25af7fe09..cd23396ce4 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -289,6 +289,9 @@ static LOCALLOCK *awaitedLock;
 static ResourceOwner awaitedOwner;
 
 
+static dlist_head session_locks[lengthof(LockMethods)];
+
+
 #ifdef LOCK_DEBUG
 
 /*------
@@ -376,7 +379,8 @@ static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
 static void FinishStrongLockAcquire(void);
 static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
 static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
-static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
+static void LockReassignOwner(LOCALLOCKOWNER *locallockowner,
+							  ResourceOwner parent);
 static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
 						PROCLOCK *proclock, LockMethod lockMethodTable);
 static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
@@ -701,7 +705,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	{
 		PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock);
 		LWLockRelease(partitionLock);
-		elog(WARNING, "you don't own a lock of type %s",
+		elog(PANIC, "you don't own a lock of type %s",
 			 lockMethodTable->lockModeNames[lockmode]);
 		RemoveLocalLock(locallock);
 		return false;
@@ -839,26 +843,9 @@ LockAcquireExtended(const LOCKTAG *locktag,
 		locallock->nLocks = 0;
 		locallock->holdsStrongLockCount = false;
 		locallock->lockCleared = false;
-		locallock->numLockOwners = 0;
-		locallock->maxLockOwners = 8;
-		locallock->lockOwners = NULL;	/* in case next line fails */
-		locallock->lockOwners = (LOCALLOCKOWNER *)
-			MemoryContextAlloc(TopMemoryContext,
-							   locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
+		dlist_init(&locallock->locallockowners);
 	}
-	else
-	{
-		/* Make sure there will be room to remember the lock */
-		if (locallock->numLockOwners >= locallock->maxLockOwners)
-		{
-			int			newsize = locallock->maxLockOwners * 2;
 
-			locallock->lockOwners = (LOCALLOCKOWNER *)
-				repalloc(locallock->lockOwners,
-						 newsize * sizeof(LOCALLOCKOWNER));
-			locallock->maxLockOwners = newsize;
-		}
-	}
 	hashcode = locallock->hashcode;
 
 	if (locallockp)
@@ -1366,17 +1353,18 @@ CheckAndSetLockHeld(LOCALLOCK *locallock, bool acquired)
 static void
 RemoveLocalLock(LOCALLOCK *locallock)
 {
-	int			i;
+	dlist_mutable_iter iter;
 
-	for (i = locallock->numLockOwners - 1; i >= 0; i--)
+	dlist_foreach_modify(iter, &locallock->locallockowners)
 	{
-		if (locallock->lockOwners[i].owner != NULL)
-			ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
+		LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+		Assert(locallockowner->owner != NULL);
+		dlist_delete(&locallockowner->locallock_node);
+		ResourceOwnerForgetLock(locallockowner->owner, locallockowner);
 	}
-	locallock->numLockOwners = 0;
-	if (locallock->lockOwners != NULL)
-		pfree(locallock->lockOwners);
-	locallock->lockOwners = NULL;
+
+	Assert(dlist_is_empty(&locallock->locallockowners));
 
 	if (locallock->holdsStrongLockCount)
 	{
@@ -1394,7 +1382,7 @@ RemoveLocalLock(LOCALLOCK *locallock)
 	if (!hash_search(LockMethodLocalHash,
 					 (void *) &(locallock->tag),
 					 HASH_REMOVE, NULL))
-		elog(WARNING, "locallock table corrupted");
+		elog(PANIC, "locallock table corrupted");
 
 	/*
 	 * Indicate that the lock is released for certain types of locks
@@ -1688,26 +1676,40 @@ CleanUpLock(LOCK *lock, PROCLOCK *proclock,
 static void
 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
 {
-	LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
-	int			i;
+	LOCALLOCKOWNER *locallockowner;
+	dlist_iter iter;
 
-	Assert(locallock->numLockOwners < locallock->maxLockOwners);
 	/* Count the total */
 	locallock->nLocks++;
+
 	/* Count the per-owner lock */
-	for (i = 0; i < locallock->numLockOwners; i++)
+	dlist_foreach(iter, &locallock->locallockowners)
 	{
-		if (lockOwners[i].owner == owner)
+		locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+		if (locallockowner->owner == owner)
 		{
-			lockOwners[i].nLocks++;
+			locallockowner->nLocks++;
 			return;
 		}
 	}
-	lockOwners[i].owner = owner;
-	lockOwners[i].nLocks = 1;
-	locallock->numLockOwners++;
+
+	locallockowner = MemoryContextAlloc(TopMemoryContext, sizeof(LOCALLOCKOWNER));
+	locallockowner->owner = owner;
+	locallockowner->nLocks = 1;
+	locallockowner->locallock = locallock;
+
+	dlist_push_tail(&locallock->locallockowners, &locallockowner->locallock_node);
+
 	if (owner != NULL)
-		ResourceOwnerRememberLock(owner, locallock);
+		ResourceOwnerRememberLock(owner, locallockowner);
+	else
+	{
+		LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallockowner->locallock);
+
+		Assert(lockmethodid > 0 && lockmethodid <= 2);
+		dlist_push_tail(&session_locks[lockmethodid - 1], &locallockowner->resowner_node);
+	}
 
 	/* Indicate that the lock is acquired for certain types of locks. */
 	CheckAndSetLockHeld(locallock, true);
@@ -2021,9 +2023,9 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	 * Decrease the count for the resource owner.
 	 */
 	{
-		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
 		ResourceOwner owner;
-		int			i;
+		dlist_iter iter;
+		bool		found = false;
 
 		/* Identify owner for lock */
 		if (sessionLock)
@@ -2031,24 +2033,29 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 		else
 			owner = CurrentResourceOwner;
 
-		for (i = locallock->numLockOwners - 1; i >= 0; i--)
+		dlist_foreach(iter, &locallock->locallockowners)
 		{
-			if (lockOwners[i].owner == owner)
+			LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+
+			if (locallockowner->owner != owner)
+				continue;
+
+			found = true;
+
+			if (--locallockowner->nLocks == 0)
 			{
-				Assert(lockOwners[i].nLocks > 0);
-				if (--lockOwners[i].nLocks == 0)
-				{
-					if (owner != NULL)
-						ResourceOwnerForgetLock(owner, locallock);
-					/* compact out unused slot */
-					locallock->numLockOwners--;
-					if (i < locallock->numLockOwners)
-						lockOwners[i] = lockOwners[locallock->numLockOwners];
-				}
-				break;
+				dlist_delete(&locallockowner->locallock_node);
+
+				if (owner != NULL)
+					ResourceOwnerForgetLock(owner, locallockowner);
+				else
+					dlist_delete(&locallockowner->resowner_node);
 			}
+
+			Assert(locallockowner->nLocks >= 0);
 		}
-		if (i < 0)
+
+		if (!found)
 		{
 			/* don't release a lock belonging to another owner */
 			elog(WARNING, "you don't own a lock of type %s",
@@ -2066,6 +2073,8 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	if (locallock->nLocks > 0)
 		return true;
 
+	Assert(locallock->nLocks >= 0);
+
 	/*
 	 * At this point we can no longer suppose we are clear of invalidation
 	 * messages related to this lock.  Although we'll delete the LOCALLOCK
@@ -2147,7 +2156,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	{
 		PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
 		LWLockRelease(partitionLock);
-		elog(WARNING, "you don't own a lock of type %s",
+		elog(PANIC, "you don't own a lock of type %s",
 			 lockMethodTable->lockModeNames[lockmode]);
 		RemoveLocalLock(locallock);
 		return false;
@@ -2168,283 +2177,44 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
 	return true;
 }
 
+#ifdef USE_ASSERT_CHECKING
 /*
- * LockReleaseAll -- Release all locks of the specified lock method that
- *		are held by the current process.
- *
- * Well, not necessarily *all* locks.  The available behaviors are:
- *		allLocks == true: release all locks including session locks.
- *		allLocks == false: release all non-session locks.
+ * LockAssertNoneHeld -- Assert that we no longer hold any DEFAULT_LOCKMETHOD
+ * locks during an abort.
  */
-void
-LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
+extern void
+LockAssertNoneHeld(bool isCommit)
 {
 	HASH_SEQ_STATUS status;
-	LockMethod	lockMethodTable;
-	int			i,
-				numLockModes;
 	LOCALLOCK  *locallock;
-	LOCK	   *lock;
-	PROCLOCK   *proclock;
-	int			partition;
-	bool		have_fast_path_lwlock = false;
-
-	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
-		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
-	lockMethodTable = LockMethods[lockmethodid];
-
-#ifdef LOCK_DEBUG
-	if (*(lockMethodTable->trace_flag))
-		elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
-#endif
-
-	/*
-	 * Get rid of our fast-path VXID lock, if appropriate.  Note that this is
-	 * the only way that the lock we hold on our own VXID can ever get
-	 * released: it is always and only released when a toplevel transaction
-	 * ends.
-	 */
-	if (lockmethodid == DEFAULT_LOCKMETHOD)
-		VirtualXactLockTableCleanup();
-
-	numLockModes = lockMethodTable->numLockModes;
-
-	/*
-	 * First we run through the locallock table and get rid of unwanted
-	 * entries, then we scan the process's proclocks and get rid of those. We
-	 * do this separately because we may have multiple locallock entries
-	 * pointing to the same proclock, and we daren't end up with any dangling
-	 * pointers.  Fast-path locks are cleaned up during the locallock table
-	 * scan, though.
-	 */
-	hash_seq_init(&status, LockMethodLocalHash);
 
-	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+	if (!isCommit)
 	{
-		/*
-		 * If the LOCALLOCK entry is unused, we must've run out of shared
-		 * memory while trying to set up this lock.  Just forget the local
-		 * entry.
-		 */
-		if (locallock->nLocks == 0)
-		{
-			RemoveLocalLock(locallock);
-			continue;
-		}
-
-		/* Ignore items that are not of the lockmethod to be removed */
-		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
-			continue;
+		hash_seq_init(&status, LockMethodLocalHash);
 
-		/*
-		 * If we are asked to release all locks, we can just zap the entry.
-		 * Otherwise, must scan to see if there are session locks. We assume
-		 * there is at most one lockOwners entry for session locks.
-		 */
-		if (!allLocks)
+		while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 		{
-			LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+			dlist_iter local_iter;
 
-			/* If session lock is above array position 0, move it down to 0 */
-			for (i = 0; i < locallock->numLockOwners; i++)
-			{
-				if (lockOwners[i].owner == NULL)
-					lockOwners[0] = lockOwners[i];
-				else
-					ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
-			}
+			Assert(locallock->nLocks >= 0);
 
-			if (locallock->numLockOwners > 0 &&
-				lockOwners[0].owner == NULL &&
-				lockOwners[0].nLocks > 0)
+			dlist_foreach(local_iter, &locallock->locallockowners)
 			{
-				/* Fix the locallock to show just the session locks */
-				locallock->nLocks = lockOwners[0].nLocks;
-				locallock->numLockOwners = 1;
-				/* We aren't deleting this locallock, so done */
-				continue;
-			}
-			else
-				locallock->numLockOwners = 0;
-		}
-
-		/*
-		 * If the lock or proclock pointers are NULL, this lock was taken via
-		 * the relation fast-path (and is not known to have been transferred).
-		 */
-		if (locallock->proclock == NULL || locallock->lock == NULL)
-		{
-			LOCKMODE	lockmode = locallock->tag.mode;
-			Oid			relid;
+				LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+																 locallock_node,
+																 local_iter.cur);
 
-			/* Verify that a fast-path lock is what we've got. */
-			if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
-				elog(PANIC, "locallock table corrupted");
-
-			/*
-			 * If we don't currently hold the LWLock that protects our
-			 * fast-path data structures, we must acquire it before attempting
-			 * to release the lock via the fast-path.  We will continue to
-			 * hold the LWLock until we're done scanning the locallock table,
-			 * unless we hit a transferred fast-path lock.  (XXX is this
-			 * really such a good idea?  There could be a lot of entries ...)
-			 */
-			if (!have_fast_path_lwlock)
-			{
-				LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE);
-				have_fast_path_lwlock = true;
-			}
+				Assert(locallockowner->owner == NULL);
 
-			/* Attempt fast-path release. */
-			relid = locallock->tag.lock.locktag_field2;
-			if (FastPathUnGrantRelationLock(relid, lockmode))
-			{
-				RemoveLocalLock(locallock);
-				continue;
+				if (locallockowner->nLocks > 0 &&
+					LOCALLOCK_LOCKMETHOD(*locallock) == DEFAULT_LOCKMETHOD)
+					Assert(false);
 			}
-
-			/*
-			 * Our lock, originally taken via the fast path, has been
-			 * transferred to the main lock table.  That's going to require
-			 * some extra work, so release our fast-path lock before starting.
-			 */
-			LWLockRelease(&MyProc->fpInfoLock);
-			have_fast_path_lwlock = false;
-
-			/*
-			 * Now dump the lock.  We haven't got a pointer to the LOCK or
-			 * PROCLOCK in this case, so we have to handle this a bit
-			 * differently than a normal lock release.  Unfortunately, this
-			 * requires an extra LWLock acquire-and-release cycle on the
-			 * partitionLock, but hopefully it shouldn't happen often.
-			 */
-			LockRefindAndRelease(lockMethodTable, MyProc,
-								 &locallock->tag.lock, lockmode, false);
-			RemoveLocalLock(locallock);
-			continue;
 		}
-
-		/* Mark the proclock to show we need to release this lockmode */
-		if (locallock->nLocks > 0)
-			locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
-
-		/* And remove the locallock hashtable entry */
-		RemoveLocalLock(locallock);
 	}
-
-	/* Done with the fast-path data structures */
-	if (have_fast_path_lwlock)
-		LWLockRelease(&MyProc->fpInfoLock);
-
-	/*
-	 * Now, scan each lock partition separately.
-	 */
-	for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
-	{
-		LWLock	   *partitionLock;
-		SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
-		PROCLOCK   *nextplock;
-
-		partitionLock = LockHashPartitionLockByIndex(partition);
-
-		/*
-		 * If the proclock list for this partition is empty, we can skip
-		 * acquiring the partition lock.  This optimization is trickier than
-		 * it looks, because another backend could be in process of adding
-		 * something to our proclock list due to promoting one of our
-		 * fast-path locks.  However, any such lock must be one that we
-		 * decided not to delete above, so it's okay to skip it again now;
-		 * we'd just decide not to delete it again.  We must, however, be
-		 * careful to re-fetch the list header once we've acquired the
-		 * partition lock, to be sure we have a valid, up-to-date pointer.
-		 * (There is probably no significant risk if pointer fetch/store is
-		 * atomic, but we don't wish to assume that.)
-		 *
-		 * XXX This argument assumes that the locallock table correctly
-		 * represents all of our fast-path locks.  While allLocks mode
-		 * guarantees to clean up all of our normal locks regardless of the
-		 * locallock situation, we lose that guarantee for fast-path locks.
-		 * This is not ideal.
-		 */
-		if (SHMQueueNext(procLocks, procLocks,
-						 offsetof(PROCLOCK, procLink)) == NULL)
-			continue;			/* needn't examine this partition */
-
-		LWLockAcquire(partitionLock, LW_EXCLUSIVE);
-
-		for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
-												  offsetof(PROCLOCK, procLink));
-			 proclock;
-			 proclock = nextplock)
-		{
-			bool		wakeupNeeded = false;
-
-			/* Get link first, since we may unlink/delete this proclock */
-			nextplock = (PROCLOCK *)
-				SHMQueueNext(procLocks, &proclock->procLink,
-							 offsetof(PROCLOCK, procLink));
-
-			Assert(proclock->tag.myProc == MyProc);
-
-			lock = proclock->tag.myLock;
-
-			/* Ignore items that are not of the lockmethod to be removed */
-			if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
-				continue;
-
-			/*
-			 * In allLocks mode, force release of all locks even if locallock
-			 * table had problems
-			 */
-			if (allLocks)
-				proclock->releaseMask = proclock->holdMask;
-			else
-				Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
-
-			/*
-			 * Ignore items that have nothing to be released, unless they have
-			 * holdMask == 0 and are therefore recyclable
-			 */
-			if (proclock->releaseMask == 0 && proclock->holdMask != 0)
-				continue;
-
-			PROCLOCK_PRINT("LockReleaseAll", proclock);
-			LOCK_PRINT("LockReleaseAll", lock, 0);
-			Assert(lock->nRequested >= 0);
-			Assert(lock->nGranted >= 0);
-			Assert(lock->nGranted <= lock->nRequested);
-			Assert((proclock->holdMask & ~lock->grantMask) == 0);
-
-			/*
-			 * Release the previously-marked lock modes
-			 */
-			for (i = 1; i <= numLockModes; i++)
-			{
-				if (proclock->releaseMask & LOCKBIT_ON(i))
-					wakeupNeeded |= UnGrantLock(lock, i, proclock,
-												lockMethodTable);
-			}
-			Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
-			Assert(lock->nGranted <= lock->nRequested);
-			LOCK_PRINT("LockReleaseAll: updated", lock, 0);
-
-			proclock->releaseMask = 0;
-
-			/* CleanUpLock will wake up waiters if needed. */
-			CleanUpLock(lock, proclock,
-						lockMethodTable,
-						LockTagHashCode(&lock->tag),
-						wakeupNeeded);
-		}						/* loop over PROCLOCKs within this partition */
-
-		LWLockRelease(partitionLock);
-	}							/* loop over partitions */
-
-#ifdef LOCK_DEBUG
-	if (*(lockMethodTable->trace_flag))
-		elog(LOG, "LockReleaseAll done");
-#endif
+	Assert(MyProc->fpLockBits == 0);
 }
+#endif
 
 /*
  * LockReleaseSession -- Release all session locks of the specified lock method
@@ -2453,22 +2223,21 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
 void
 LockReleaseSession(LOCKMETHODID lockmethodid)
 {
-	HASH_SEQ_STATUS status;
-	LOCALLOCK  *locallock;
+	dlist_mutable_iter iter;
 
 	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
 		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
 
-	hash_seq_init(&status, LockMethodLocalHash);
-
-	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+	dlist_foreach_modify(iter, &session_locks[lockmethodid - 1])
 	{
-		/* Ignore items that are not of the specified lock method */
-		if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
-			continue;
+		LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+		Assert(LOCALLOCK_LOCKMETHOD(*locallockowner->locallock) == lockmethodid);
 
-		ReleaseLockIfHeld(locallock, true);
+		ReleaseLockIfHeld(locallockowner->locallock, true);
 	}
+
+	Assert(dlist_is_empty(&session_locks[lockmethodid - 1]));
 }
 
 /*
@@ -2480,26 +2249,12 @@ LockReleaseSession(LOCKMETHODID lockmethodid)
  * Otherwise, pass NULL for locallocks, and we'll traverse through our hash
  * table to find them.
  */
-void
-LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
+extern void
+LockReleaseCurrentOwner(ResourceOwner owner, dlist_node *resowner_node)
 {
-	if (locallocks == NULL)
-	{
-		HASH_SEQ_STATUS status;
-		LOCALLOCK  *locallock;
-
-		hash_seq_init(&status, LockMethodLocalHash);
+	LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, resowner_node);
 
-		while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
-			ReleaseLockIfHeld(locallock, false);
-	}
-	else
-	{
-		int			i;
-
-		for (i = nlocks - 1; i >= 0; i--)
-			ReleaseLockIfHeld(locallocks[i], false);
-	}
+	ReleaseLockIfHeld(locallockowner->locallock, false);
 }
 
 /*
@@ -2519,8 +2274,7 @@ static void
 ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
 {
 	ResourceOwner owner;
-	LOCALLOCKOWNER *lockOwners;
-	int			i;
+	dlist_iter  iter;
 
 	/* Identify owner for lock (must match LockRelease!) */
 	if (sessionLock)
@@ -2529,39 +2283,49 @@ ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
 		owner = CurrentResourceOwner;
 
 	/* Scan to see if there are any locks belonging to the target owner */
-	lockOwners = locallock->lockOwners;
-	for (i = locallock->numLockOwners - 1; i >= 0; i--)
+	dlist_foreach(iter, &locallock->locallockowners)
 	{
-		if (lockOwners[i].owner == owner)
+		LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
+		LOCALLOCK *locallock = locallockowner->locallock;
+
+		if (locallockowner->owner != owner)
+			continue;
+
+		/* release all references to the lock by this resource owner */
+
+		if (sessionLock)
+			Assert(locallockowner->owner == NULL);
+		else
+			Assert(locallockowner->owner != NULL);
+
+		/*
+		 * We will still hold this lock after forgetting this
+		 * ResourceOwner.
+		 */
+		if (locallockowner->nLocks < locallock->nLocks)
 		{
-			Assert(lockOwners[i].nLocks > 0);
-			if (lockOwners[i].nLocks < locallock->nLocks)
-			{
-				/*
-				 * We will still hold this lock after forgetting this
-				 * ResourceOwner.
-				 */
-				locallock->nLocks -= lockOwners[i].nLocks;
-				/* compact out unused slot */
-				locallock->numLockOwners--;
-				if (owner != NULL)
-					ResourceOwnerForgetLock(owner, locallock);
-				if (i < locallock->numLockOwners)
-					lockOwners[i] = lockOwners[locallock->numLockOwners];
-			}
+			locallock->nLocks -= locallockowner->nLocks;
+			Assert(locallock->nLocks >= 0);
+			dlist_delete(&locallockowner->locallock_node);
+
+			if (sessionLock)
+				dlist_delete(&locallockowner->resowner_node);
 			else
-			{
-				Assert(lockOwners[i].nLocks == locallock->nLocks);
-				/* We want to call LockRelease just once */
-				lockOwners[i].nLocks = 1;
-				locallock->nLocks = 1;
-				if (!LockRelease(&locallock->tag.lock,
-								 locallock->tag.mode,
-								 sessionLock))
-					elog(WARNING, "ReleaseLockIfHeld: failed??");
-			}
-			break;
+				ResourceOwnerForgetLock(owner, locallockowner);
+		}
+		else
+		{
+			Assert(locallockowner->nLocks == locallock->nLocks);
+			/* We want to call LockRelease just once */
+			locallockowner->nLocks = 1;
+			locallock->nLocks = 1;
+
+			if (!LockRelease(&locallock->tag.lock,
+							 locallock->tag.mode,
+							 sessionLock))
+				elog(PANIC, "ReleaseLockIfHeld: failed??");
 		}
+		break;
 	}
 }
 
@@ -2576,29 +2340,12 @@ ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
  * and we'll traverse through our hash table to find them.
  */
 void
-LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
+LockReassignCurrentOwner(ResourceOwner owner, dlist_node *resowner_node)
 {
+	LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER, resowner_node, resowner_node);
 	ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
 
-	Assert(parent != NULL);
-
-	if (locallocks == NULL)
-	{
-		HASH_SEQ_STATUS status;
-		LOCALLOCK  *locallock;
-
-		hash_seq_init(&status, LockMethodLocalHash);
-
-		while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
-			LockReassignOwner(locallock, parent);
-	}
-	else
-	{
-		int			i;
-
-		for (i = nlocks - 1; i >= 0; i--)
-			LockReassignOwner(locallocks[i], parent);
-	}
+	LockReassignOwner(locallockowner, parent);
 }
 
 /*
@@ -2606,45 +2353,33 @@ LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
  * CurrentResourceOwner to its parent.
  */
 static void
-LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
+LockReassignOwner(LOCALLOCKOWNER *locallockowner, ResourceOwner parent)
 {
-	LOCALLOCKOWNER *lockOwners;
-	int			i;
-	int			ic = -1;
-	int			ip = -1;
+	dlist_iter iter;
+	LOCALLOCK *locallock = locallockowner->locallock;
 
-	/*
-	 * Scan to see if there are any locks belonging to current owner or its
-	 * parent
-	 */
-	lockOwners = locallock->lockOwners;
-	for (i = locallock->numLockOwners - 1; i >= 0; i--)
+	ResourceOwnerForgetLock(CurrentResourceOwner, locallockowner);
+
+	dlist_foreach(iter, &locallock->locallockowners)
 	{
-		if (lockOwners[i].owner == CurrentResourceOwner)
-			ic = i;
-		else if (lockOwners[i].owner == parent)
-			ip = i;
-	}
+		LOCALLOCKOWNER *parentlocalowner = dlist_container(LOCALLOCKOWNER, locallock_node, iter.cur);
 
-	if (ic < 0)
-		return;					/* no current locks */
+		Assert(parentlocalowner->locallock == locallock);
 
-	if (ip < 0)
-	{
-		/* Parent has no slot, so just give it the child's slot */
-		lockOwners[ic].owner = parent;
-		ResourceOwnerRememberLock(parent, locallock);
-	}
-	else
-	{
-		/* Merge child's count with parent's */
-		lockOwners[ip].nLocks += lockOwners[ic].nLocks;
-		/* compact out unused slot */
-		locallock->numLockOwners--;
-		if (ic < locallock->numLockOwners)
-			lockOwners[ic] = lockOwners[locallock->numLockOwners];
+		if (parentlocalowner->owner != parent)
+			continue;
+
+		parentlocalowner->nLocks += locallockowner->nLocks;
+
+		locallockowner->nLocks = 0;
+		dlist_delete(&locallockowner->locallock_node);
+		pfree(locallockowner);
+		return;
 	}
-	ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
+
+	/* reassign locallockowner to parent resowner */
+	locallockowner->owner = parent;
+	ResourceOwnerRememberLock(parent, locallockowner);
 }
 
 /*
@@ -3178,7 +2913,7 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
 	{
 		PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
 		LWLockRelease(partitionLock);
-		elog(WARNING, "you don't own a lock of type %s",
+		elog(PANIC, "you don't own a lock of type %s",
 			 lockMethodTable->lockModeNames[lockmode]);
 		return;
 	}
@@ -3259,10 +2994,9 @@ CheckForSessionAndXactLocks(void)
 
 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 	{
-		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
 		PerLockTagEntry *hentry;
 		bool		found;
-		int			i;
+		dlist_iter iter;
 
 		/*
 		 * Ignore VXID locks.  We don't want those to be held by prepared
@@ -3283,9 +3017,13 @@ CheckForSessionAndXactLocks(void)
 			hentry->sessLock = hentry->xactLock = false;
 
 		/* Scan to see if we hold lock at session or xact level or both */
-		for (i = locallock->numLockOwners - 1; i >= 0; i--)
+		dlist_foreach(iter, &locallock->locallockowners)
 		{
-			if (lockOwners[i].owner == NULL)
+			LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+															 locallock_node,
+															 iter.cur);
+
+			if (locallockowner->owner == NULL)
 				hentry->sessLock = true;
 			else
 				hentry->xactLock = true;
@@ -3332,10 +3070,9 @@ AtPrepare_Locks(void)
 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 	{
 		TwoPhaseLockRecord record;
-		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
 		bool		haveSessionLock;
 		bool		haveXactLock;
-		int			i;
+		dlist_iter iter;
 
 		/*
 		 * Ignore VXID locks.  We don't want those to be held by prepared
@@ -3350,9 +3087,13 @@ AtPrepare_Locks(void)
 
 		/* Scan to see whether we hold it at session or transaction level */
 		haveSessionLock = haveXactLock = false;
-		for (i = locallock->numLockOwners - 1; i >= 0; i--)
+		dlist_foreach(iter, &locallock->locallockowners)
 		{
-			if (lockOwners[i].owner == NULL)
+			LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+															 locallock_node,
+															 iter.cur);
+
+			if (locallockowner->owner == NULL)
 				haveSessionLock = true;
 			else
 				haveXactLock = true;
@@ -3444,10 +3185,9 @@ PostPrepare_Locks(TransactionId xid)
 
 	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
 	{
-		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
 		bool		haveSessionLock;
 		bool		haveXactLock;
-		int			i;
+		dlist_iter iter;
 
 		if (locallock->proclock == NULL || locallock->lock == NULL)
 		{
@@ -3466,9 +3206,13 @@ PostPrepare_Locks(TransactionId xid)
 
 		/* Scan to see whether we hold it at session or transaction level */
 		haveSessionLock = haveXactLock = false;
-		for (i = locallock->numLockOwners - 1; i >= 0; i--)
+		dlist_foreach(iter, &locallock->locallockowners)
 		{
-			if (lockOwners[i].owner == NULL)
+			LOCALLOCKOWNER *locallockowner = dlist_container(LOCALLOCKOWNER,
+															 locallock_node,
+															 iter.cur);
+
+			if (locallockowner->owner == NULL)
 				haveSessionLock = true;
 			else
 				haveXactLock = true;
@@ -3599,6 +3343,7 @@ PostPrepare_Locks(TransactionId xid)
 	}							/* loop over partitions */
 
 	END_CRIT_SECTION();
+
 }
 
 
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index d1d3cd0dc8..498722dfdd 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -778,10 +778,17 @@ ProcReleaseLocks(bool isCommit)
 		return;
 	/* If waiting, get off wait queue (should only be needed after error) */
 	LockErrorCleanup();
-	/* Release standard locks, including session-level if aborting */
-	LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit);
-	/* Release transaction-level advisory locks */
-	LockReleaseAll(USER_LOCKMETHOD, false);
+
+	VirtualXactLockTableCleanup();
+
+	/* Release session-level locks if aborting */
+	if (!isCommit)
+		LockReleaseSession(DEFAULT_LOCKMETHOD);
+
+#ifdef USE_ASSERT_CHECKING
+	/* Ensure all locks were released */
+	LockAssertNoneHeld(isCommit);
+#endif
 }
 
 
@@ -870,6 +877,8 @@ ProcKill(int code, Datum arg)
 		LWLockRelease(leader_lwlock);
 	}
 
+	Assert(MyProc->fpLockBits == 0);
+
 	/*
 	 * Reset MyLatch to the process local one.  This is so that signal
 	 * handlers et al can continue using the latch after the shared latch
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 7292e51f7d..f9092c65c1 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -1176,7 +1176,7 @@ ShutdownPostgres(int code, Datum arg)
 	 * User locks are not released by transaction end, so be sure to release
 	 * them explicitly.
 	 */
-	LockReleaseAll(USER_LOCKMETHOD, true);
+	LockReleaseSession(USER_LOCKMETHOD);
 }
 
 
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index e24f00f060..0d40d5a7e7 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -33,6 +33,7 @@
 #include "utils/resowner_private.h"
 #include "utils/snapmgr.h"
 
+#include "lib/ilist.h"
 
 /*
  * All resource IDs managed by this code are required to fit into a Datum,
@@ -133,9 +134,7 @@ typedef struct ResourceOwnerData
 	ResourceArray cryptohasharr;	/* cryptohash contexts */
 	ResourceArray hmacarr;		/* HMAC contexts */
 
-	/* We can remember up to MAX_RESOWNER_LOCKS references to local locks. */
-	int			nlocks;			/* number of owned locks */
-	LOCALLOCK  *locks[MAX_RESOWNER_LOCKS];	/* list of owned locks */
+	dlist_head  locks;
 }			ResourceOwnerData;
 
 
@@ -452,6 +451,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
 	ResourceArrayInit(&(owner->jitarr), PointerGetDatum(NULL));
 	ResourceArrayInit(&(owner->cryptohasharr), PointerGetDatum(NULL));
 	ResourceArrayInit(&(owner->hmacarr), PointerGetDatum(NULL));
+	dlist_init(&owner->locks);
 
 	return owner;
 }
@@ -585,50 +585,39 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 	}
 	else if (phase == RESOURCE_RELEASE_LOCKS)
 	{
+		dlist_mutable_iter iter;
+
 		if (isTopLevel)
 		{
-			/*
-			 * For a top-level xact we are going to release all locks (or at
-			 * least all non-session locks), so just do a single lmgr call at
-			 * the top of the recursion.
-			 */
+			dlist_foreach_modify(iter, &owner->locks)
+			{
+				LockReleaseCurrentOwner(owner, iter.cur);
+			}
+			Assert(dlist_is_empty(&owner->locks));
+
 			if (owner == TopTransactionResourceOwner)
 			{
 				ProcReleaseLocks(isCommit);
+
 				ReleasePredicateLocks(isCommit, false);
 			}
 		}
 		else
 		{
-			/*
-			 * Release locks retail.  Note that if we are committing a
-			 * subtransaction, we do NOT release its locks yet, but transfer
-			 * them to the parent.
-			 */
-			LOCALLOCK **locks;
-			int			nlocks;
-
-			Assert(owner->parent != NULL);
-
-			/*
-			 * Pass the list of locks owned by this resource owner to the lock
-			 * manager, unless it has overflowed.
-			 */
-			if (owner->nlocks > MAX_RESOWNER_LOCKS)
+			if (isCommit)
 			{
-				locks = NULL;
-				nlocks = 0;
+				dlist_foreach_modify(iter, &owner->locks)
+					LockReassignCurrentOwner(owner, iter.cur);
+
+				Assert(dlist_is_empty(&owner->locks));
 			}
 			else
 			{
-				locks = owner->locks;
-				nlocks = owner->nlocks;
-			}
+				dlist_foreach_modify(iter, &owner->locks)
+					LockReleaseCurrentOwner(owner, iter.cur);
 
-			if (isCommit)
-				LockReassignCurrentOwner(locks, nlocks);
-			else
-				LockReleaseCurrentOwner(locks, nlocks);
+				Assert(dlist_is_empty(&owner->locks));
+			}
 		}
 	}
 	else if (phase == RESOURCE_RELEASE_AFTER_LOCKS)
@@ -752,7 +741,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 	Assert(owner->jitarr.nitems == 0);
 	Assert(owner->cryptohasharr.nitems == 0);
 	Assert(owner->hmacarr.nitems == 0);
-	Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
+	Assert(dlist_is_empty(&owner->locks));
 
 	/*
 	 * Delete children.  The recursive call will delink the child from me, so
@@ -983,45 +972,56 @@ ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer)
  * the entry.
  */
 void
-ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock)
+ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCKOWNER *locallockowner)
 {
-	Assert(locallock != NULL);
-
-	if (owner->nlocks > MAX_RESOWNER_LOCKS)
-		return;					/* we have already overflowed */
+	Assert(owner != NULL);
+	Assert(locallockowner != NULL);
 
-	if (owner->nlocks < MAX_RESOWNER_LOCKS)
-		owner->locks[owner->nlocks] = locallock;
-	else
+#ifdef USE_ASSERT_CHECKING
 	{
-		/* overflowed */
+		dlist_iter iter;
+
+		dlist_foreach(iter, &owner->locks)
+		{
+			LOCALLOCKOWNER *i = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+			Assert(i->locallock != locallockowner->locallock);
+		}
 	}
-	owner->nlocks++;
+#endif
+
+	dlist_push_tail(&owner->locks, &locallockowner->resowner_node);
 }
 
 /*
  * Forget that a Local Lock is owned by a ResourceOwner
  */
 void
-ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock)
+ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCKOWNER *locallockowner)
 {
-	int			i;
-
-	if (owner->nlocks > MAX_RESOWNER_LOCKS)
-		return;					/* we have overflowed */
+	Assert(owner != NULL);
+	Assert(locallockowner != NULL);
 
-	Assert(owner->nlocks > 0);
-	for (i = owner->nlocks - 1; i >= 0; i--)
+#ifdef USE_ASSERT_CHECKING
 	{
-		if (locallock == owner->locks[i])
+		dlist_iter iter;
+		bool found = false;
+
+		dlist_foreach(iter, &owner->locks)
 		{
-			owner->locks[i] = owner->locks[owner->nlocks - 1];
-			owner->nlocks--;
-			return;
+			LOCALLOCKOWNER *i = dlist_container(LOCALLOCKOWNER, resowner_node, iter.cur);
+
+			if (locallockowner == i)
+			{
+				Assert(!found);
+				found = true;
+			}
 		}
+
+		Assert(found);
 	}
-	elog(ERROR, "lock reference %p is not owned by resource owner %s",
-		 locallock, owner->name);
+#endif
+	dlist_delete(&locallockowner->resowner_node);
 }
 
 /*
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index a5286fab89..b656103d26 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -23,6 +23,7 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "utils/timestamp.h"
+#include "lib/ilist.h"
 
 /* struct PGPROC is declared in proc.h, but must forward-reference it */
 typedef struct PGPROC PGPROC;
@@ -412,6 +413,13 @@ typedef struct LOCALLOCKOWNER
 	 * Must use a forward struct reference to avoid circularity.
 	 */
 	struct ResourceOwnerData *owner;
+
+	dlist_node  resowner_node;
+
+	dlist_node  locallock_node;
+
+	struct LOCALLOCK *locallock;
+
 	int64		nLocks;			/* # of times held by this owner */
 } LOCALLOCKOWNER;
 
@@ -425,9 +433,9 @@ typedef struct LOCALLOCK
 	LOCK	   *lock;			/* associated LOCK object, if any */
 	PROCLOCK   *proclock;		/* associated PROCLOCK object, if any */
 	int64		nLocks;			/* total number of times lock is held */
-	int			numLockOwners;	/* # of relevant ResourceOwners */
-	int			maxLockOwners;	/* allocated size of array */
-	LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
+
+	dlist_head locallockowners;
+
 	bool		holdsStrongLockCount;	/* bumped FastPathStrongRelationLocks */
 	bool		lockCleared;	/* we read all sinval msgs for lock */
 } LOCALLOCK;
@@ -556,10 +564,15 @@ extern void AbortStrongLockAcquire(void);
 extern void MarkLockClear(LOCALLOCK *locallock);
 extern bool LockRelease(const LOCKTAG *locktag,
 						LOCKMODE lockmode, bool sessionLock);
-extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
+
+#ifdef USE_ASSERT_CHECKING
+extern void LockAssertNoneHeld(bool isCommit);
+#endif
+
 extern void LockReleaseSession(LOCKMETHODID lockmethodid);
-extern void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks);
-extern void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks);
+struct ResourceOwnerData;
+extern void LockReleaseCurrentOwner(struct ResourceOwnerData *owner, dlist_node *resowner_node);
+extern void LockReassignCurrentOwner(struct ResourceOwnerData *owner, dlist_node *resowner_node);
 extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
 #ifdef USE_ASSERT_CHECKING
 extern HTAB *GetLockMethodLocalHash(void);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index 6dafc87e28..d23d8c4c7c 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -31,8 +31,8 @@ extern void ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer);
 extern void ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer);
 
 /* support for local lock management */
-extern void ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock);
-extern void ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock);
+extern void ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCKOWNER *locallock);
+extern void ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCKOWNER *locallock);
 
 /* support for catcache refcount management */
 extern void ResourceOwnerEnlargeCatCacheRefs(ResourceOwner owner);
