commit ee3955a764b16318adeb9e793b8ccfa3294536b3
Author: root <root@p216n129.pbm.ihost.com>
Date:   Fri Jun 9 14:25:01 2017 -0400

    ProcArrayLock parts

diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 8a71536..4e39b4d 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -407,17 +407,10 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid)
 		Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
 
 		/*
-		 * If we can immediately acquire ProcArrayLock, we clear our own XID
-		 * and release the lock.  If not, use group XID clearing to improve
-		 * efficiency.
+		 * LWLockConditionalAcquire(ProcArrayLock, LW_EXCLUSIVE) will always
+		 * return not available -- so just go to  group XID clearing directly
 		 */
-		if (LWLockConditionalAcquire(ProcArrayLock, LW_EXCLUSIVE))
-		{
-			ProcArrayEndTransactionInternal(proc, pgxact, latestXid);
-			LWLockRelease(ProcArrayLock);
-		}
-		else
-			ProcArrayGroupClearXid(proc, latestXid);
+		ProcArrayGroupClearXid(proc, latestXid);
 	}
 	else
 	{
diff --git a/src/backend/storage/lmgr/generate-lwlocknames.pl b/src/backend/storage/lmgr/generate-lwlocknames.pl
index 10d0698..c7030a7 100644
--- a/src/backend/storage/lmgr/generate-lwlocknames.pl
+++ b/src/backend/storage/lmgr/generate-lwlocknames.pl
@@ -56,7 +56,13 @@ while (<$lwlocknames>)
 
 printf $c "\n};\n";
 print $h "\n";
+# changing $numProc to 2, 4 or 8 should just work for 2, 4, or 8 "ProcArrayLock" parts
+my $numProc = 2;
+my $numProcNamed = 8;
 printf $h "#define NUM_INDIVIDUAL_LWLOCKS		%s\n", $lastlockidx + 1;
+printf $h "#define NUMBER_PROC_LOCKS                    %s\n", $numProc;
+printf $h "#define NUMBER_PROC_LOCKS_MASK               %s\n", $numProc-1;
+printf $h "#define OLD_NUM_INDIVIDUAL_LWLOCKS           %s\n", $lastlockidx -$numProcNamed + 1;
 
 close $h;
 close $c;
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 35536e4..1b04ebd 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -1099,6 +1099,36 @@ LWLockDequeueSelf(LWLock *lock)
 #endif
 }
 
+ /*
+  * subroutine to acquire ProcArrayLock parts
+  */
+bool
+doProcArrayLock(LWLock *lock, LWLockMode mode)
+{
+	PGPROC	   *proc = MyProc;
+	LWLock	   *new_lock;
+	int			i;
+
+	if (mode == LW_SHARED)
+	{
+		new_lock = &MainLWLockArray[OLD_NUM_INDIVIDUAL_LWLOCKS + ((proc->pid) & NUMBER_PROC_LOCKS_MASK)].lock;
+		return LWLockAcquire(new_lock, mode);
+	}
+	else
+	{
+		/*
+		 * LW_EXCLUSIVE
+		 */
+		for (i = 0; i < NUMBER_PROC_LOCKS; i++)
+		{
+			new_lock = &MainLWLockArray[OLD_NUM_INDIVIDUAL_LWLOCKS + i].lock;
+			LWLockAcquire(new_lock, LW_EXCLUSIVE);
+		}
+		return false;
+	}
+}
+
+
 /*
  * LWLockAcquire - acquire a lightweight lock in the specified mode
  *
@@ -1113,6 +1143,7 @@ LWLockAcquire(LWLock *lock, LWLockMode mode)
 	PGPROC	   *proc = MyProc;
 	bool		result = true;
 	int			extraWaits = 0;
+	int			lp;
 #ifdef LWLOCK_STATS
 	lwlock_stats *lwstats;
 
@@ -1143,6 +1174,15 @@ LWLockAcquire(LWLock *lock, LWLockMode mode)
 		elog(ERROR, "too many LWLocks taken");
 
 	/*
+	 * If this request is for the ProcArrayLock, call subroutine to acquire
+	 * proper parts
+	 */
+	if (lock == ProcArrayLock)
+	{
+		return doProcArrayLock(lock, mode);
+	}
+
+	/*
 	 * Lock out cancel/die interrupts until we exit the code section protected
 	 * by the LWLock.  This ensures that interrupts will not interfere with
 	 * manipulations of data structures in shared memory.
@@ -1293,6 +1333,14 @@ LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
 		elog(ERROR, "too many LWLocks taken");
 
 	/*
+	 * if request for ProcArrayLock, return false -- not available
+	 */
+	if (lock == ProcArrayLock)
+	{
+		return false;
+	}
+
+	/*
 	 * Lock out cancel/die interrupts until we exit the code section protected
 	 * by the LWLock.  This ensures that interrupts will not interfere with
 	 * manipulations of data structures in shared memory.
@@ -1348,6 +1396,15 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 
 	Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
 
+	/*
+	 * do ProcArrayLock, but get the lock (always) and return true
+	 */
+	if (lock == ProcArrayLock)
+	{
+		LWLockAcquire(lock, mode);
+		return true;
+	}
+
 	PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
 
 	/* Ensure we will have room to remember the lock */
@@ -1533,6 +1590,11 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 	PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
 
 	/*
+	 * The ProcArrayLock should not be a parameter to this function
+	 */
+	Assert(lock != ProcArrayLock);
+
+	/*
 	 * Lock out cancel/die interrupts while we sleep on the lock.  There is no
 	 * cleanup mechanism to remove us from the wait queue if we got
 	 * interrupted.
@@ -1707,6 +1769,53 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 	}
 }
 
+/*
+ * Subroutine to release proper part(s) of ProcArrayLock
+ * for LW_SHARED, release the single ProcArrayLock part
+ * for LW_EXCLUSIVE, release all the parts
+ */
+void
+ProcArrayRelease(LWLock *lock)
+{
+	int			i;
+	LWLockMode	mode;
+	LWLock	   *newLock;
+
+	/*
+	 * Search from the end -- expecting to find the lock on the end of the
+	 * locks held list
+	 */
+	for (i = num_held_lwlocks; --i >= 0;)
+	{
+		/*
+		 * rely on the fact that the PracArrayLock parts are contiguous in the
+		 * locks list use ProcArray7 so that regardless of the
+		 * NUMBER_PROC_LOCKS, the test will be valid
+		 */
+		if ((held_lwlocks[i].lock >= ProcArray0)
+			&& (held_lwlocks[i].lock <= ProcArray7))
+		{
+			mode = held_lwlocks[i].mode;
+			break;
+		}
+	}
+	Assert(i >= 0);
+	if (mode == LW_SHARED)
+	{
+		LWLockRelease(held_lwlocks[i].lock);
+		return;
+	}
+
+	/*
+	 * for LW_EXCLUSIVE, release the parts in reverse order of acquire
+	 */
+	for (i = (NUMBER_PROC_LOCKS - 1); i >= 0; i--)
+	{
+		newLock = (&MainLWLockArray[OLD_NUM_INDIVIDUAL_LWLOCKS + i].lock);
+		LWLockRelease(newLock);
+	}
+	return;
+}
 
 /*
  * LWLockRelease - release a previously acquired lock
@@ -1720,6 +1829,15 @@ LWLockRelease(LWLock *lock)
 	int			i;
 
 	/*
+	 * intercept a release for the ProcArrayLock
+	 */
+	if (lock == ProcArrayLock)
+	{
+		ProcArrayRelease(lock);
+		return;
+	}
+
+	/*
 	 * Remove lock from list of locks held.  Usually, but not always, it will
 	 * be the latest-acquired lock; so search array backwards.
 	 */
@@ -1817,7 +1935,19 @@ LWLockReleaseAll(void)
 	{
 		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */
 
-		LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
+		/*
+		 * check for ProcArrayLock parts when any are found, call
+		 * LWLockRelease with the ProcArray_Lock
+		 */
+		if (((held_lwlocks[num_held_lwlocks - 1].lock) >= ProcArray0)
+			&& ((held_lwlocks[num_held_lwlocks - 1].lock) <= ProcArray7))
+		{
+			LWLockRelease(ProcArrayLock);
+		}
+		else
+		{
+			LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
+		}
 	}
 }
 
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index e6025ec..a581f09 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,12 @@ OldSnapshotTimeMapLock				42
 BackendRandomLock					43
 LogicalRepWorkerLock				44
 CLogTruncationLock					45
+# the ProcArray substitutes need to be in contiguous
+ProcArray0							46
+ProcArray1							47
+ProcArray2							48
+ProcArray3							49
+ProcArray4							50
+ProcArray5							51
+ProcArray6							52
+ProcArray7							53
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 0cd45bb..495777a 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -158,6 +158,8 @@ extern void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 value);
 extern Size LWLockShmemSize(void);
 extern void CreateLWLocks(void);
 extern void InitLWLockAccess(void);
+extern bool doProcArrayLock(LWLock *lock, LWLockMode mode);
+extern void ProcArrayRelease(LWLock *lock);
 
 extern const char *GetLWLockIdentifier(uint32 classId, uint16 eventId);
 
