From bbd62ffc517ff0791c8ee215e5ec11e44f703f3e Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Sat, 7 Mar 2020 09:24:33 +0530
Subject: [PATCH v3 1/3] Add assert to check that we should not acquire any
 other lock if we are already holding the relation extension lock.  Only
 exception is that if we are trying to acquire the relation extension lock
 then we can hold the same lock.

---
 src/backend/access/transam/xact.c | 15 +++++++++
 src/backend/storage/lmgr/lmgr.c   | 17 ++++++++++-
 src/backend/storage/lmgr/lock.c   | 64 +++++++++++++++++++++++++++++++++++++++
 src/include/storage/lock.h        |  4 +++
 4 files changed, 99 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e3c60f2..ca64712 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2267,6 +2267,9 @@ CommitTransaction(void)
 	XactTopFullTransactionId = InvalidFullTransactionId;
 	nParallelCurrentXids = 0;
 
+	/* Reset the relation extension lock held count. */
+	ResetRelExtLockHeldCount();
+
 	/*
 	 * done with commit processing, set current transaction state back to
 	 * default
@@ -2735,6 +2738,9 @@ AbortTransaction(void)
 		pgstat_report_xact_timestamp(0);
 	}
 
+	/* Reset the relation extension lock held count. */
+	ResetRelExtLockHeldCount();
+
 	/*
 	 * State remains TRANS_ABORT until CleanupTransaction().
 	 */
@@ -5006,6 +5012,9 @@ AbortSubTransaction(void)
 		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
 	}
 
+	/* Reset the relation extension lock held count. */
+	ResetRelExtLockHeldCount();
+
 	/*
 	 * Restore the upper transaction's read-only state, too.  This should be
 	 * redundant with GUC's cleanup but we may as well do it for consistency
@@ -5062,6 +5071,12 @@ PushTransaction(void)
 	TransactionState s;
 
 	/*
+	 * Relation extension lock must not be held while starting a new
+	 * sub-transaction.
+	 */
+	Assert(!IsRelExtLockHeld());
+
+	/*
 	 * We keep subtransaction state nodes in TopTransactionContext.
 	 */
 	s = (TransactionState)
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 2010320..26760f8 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -408,6 +408,9 @@ LockRelationForExtension(Relation relation, LOCKMODE lockmode)
 								relation->rd_lockInfo.lockRelId.relId);
 
 	(void) LockAcquire(&tag, lockmode, false, false);
+
+	/* Increment the lock hold count. */
+	IncrementRelExtLockHeldCount();
 }
 
 /*
@@ -420,12 +423,21 @@ bool
 ConditionalLockRelationForExtension(Relation relation, LOCKMODE lockmode)
 {
 	LOCKTAG		tag;
+	LockAcquireResult result;
 
 	SET_LOCKTAG_RELATION_EXTEND(tag,
 								relation->rd_lockInfo.lockRelId.dbId,
 								relation->rd_lockInfo.lockRelId.relId);
+	result = LockAcquire(&tag, lockmode, false, true);
 
-	return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
+	/* Increment the lock hold count if we got the lock. */
+	if (result != LOCKACQUIRE_NOT_AVAIL)
+	{
+		IncrementRelExtLockHeldCount();
+		return true;
+	}
+
+	return false;
 }
 
 /*
@@ -458,6 +470,9 @@ UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
 								relation->rd_lockInfo.lockRelId.relId);
 
 	LockRelease(&tag, lockmode, false);
+
+	/* Decrement the lock hold count. */
+	DecrementRelExtLockHeldCount();
 }
 
 /*
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 56dba09..59c9ca3 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -170,6 +170,15 @@ typedef struct TwoPhaseLockRecord
  */
 static int	FastPathLocalUseCount = 0;
 
+/*
+ * Count of number of relation extension lock currently held by this backend.
+ * We need this counter so that we can ensure that while holding the relation
+ * extension lock we are not trying to acquire any other heavy weight lock.
+ * Basically, that will ensuring that the proc holding relation extension lock
+ * can not wait for any another lock.
+ */
+static int	RelationExtensionLockHeldCount = 0;
+
 /* Macros for manipulating proc->fpLockBits */
 #define FAST_PATH_BITS_PER_SLOT			3
 #define FAST_PATH_LOCKNUMBER_OFFSET		1
@@ -841,6 +850,15 @@ LockAcquireExtended(const LOCKTAG *locktag,
 	}
 
 	/*
+	 * We should not acquire any other lock if we are already holding the
+	 * relation extension lock.  Only exception is that if we are trying to
+	 * acquire the relation extension lock then we can hold the relation
+	 * extension on the same relation.
+	 */
+	Assert(!IsRelExtLockHeld() ||
+		   ((locktag->locktag_type == LOCKTAG_RELATION_EXTEND) && found));
+
+	/*
 	 * Prepare to emit a WAL record if acquisition of this lock needs to be
 	 * replayed in a standby server.
 	 *
@@ -4492,3 +4510,49 @@ LockWaiterCount(const LOCKTAG *locktag)
 
 	return waiters;
 }
+
+/*
+ * IsRelExtLockHeld
+ *
+ * Is relation extension lock is held by this backend.
+ */
+bool
+IsRelExtLockHeld()
+{
+	return RelationExtensionLockHeldCount > 0;
+}
+
+/*
+ * IncrementRelExtLockHeldCount
+ *
+ * Increment the relation extension lock held count.
+ */
+void
+IncrementRelExtLockHeldCount()
+{
+	RelationExtensionLockHeldCount++;
+}
+
+/*
+ * DecrementRelExtLockHeldCount
+ *
+ * Decrement the relation extension lock held count;
+ */
+void
+DecrementRelExtLockHeldCount()
+{
+	/* We must hold the relation extension lock. */
+	Assert(RelationExtensionLockHeldCount > 0);
+	RelationExtensionLockHeldCount--;
+}
+
+/*
+ * ResetRelExtLockHeldCount
+ *
+ * Reset the relation extension lock hold count;
+ */
+void
+ResetRelExtLockHeldCount()
+{
+	RelationExtensionLockHeldCount = 0;
+}
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index bb8e4e6..c31a5f3 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -582,6 +582,10 @@ extern void RememberSimpleDeadLock(PGPROC *proc1,
 extern void InitDeadLockChecking(void);
 
 extern int	LockWaiterCount(const LOCKTAG *locktag);
+extern bool IsRelExtLockHeld(void);
+extern void IncrementRelExtLockHeldCount(void);
+extern void DecrementRelExtLockHeldCount(void);
+extern void ResetRelExtLockHeldCount(void);
 
 #ifdef LOCK_DEBUG
 extern void DumpLocks(PGPROC *proc);
-- 
1.8.3.1

