From 4823d95c86ee696b2df57526ffba93aea83054bf Mon Sep 17 00:00:00 2001
From: Dilip kumar <dilipkumar@dkmac.local>
Date: Sat, 9 Sep 2023 12:56:10 +0530
Subject: [PATCH v1 2/3] bank wise slru locks

The previous patch has divided SLRU buffer pool into associative
banks.  And this patch is further optimizing it by introducing
bank wise slru locks instead of a common centralized lock this
will reduce the contention on the slru control lock.

Dilip Kumar based on some design suggestions from Robert Haas
---
 src/backend/access/transam/clog.c        | 108 +++++++++-----
 src/backend/access/transam/commit_ts.c   |  43 +++---
 src/backend/access/transam/multixact.c   | 179 ++++++++++++++++-------
 src/backend/access/transam/slru.c        | 134 +++++++++++++----
 src/backend/access/transam/subtrans.c    |  27 ++--
 src/backend/commands/async.c             |  30 ++--
 src/backend/storage/lmgr/lwlock.c        |  14 ++
 src/backend/storage/lmgr/lwlocknames.txt |  14 +-
 src/backend/storage/lmgr/predicate.c     |  32 ++--
 src/include/access/slru.h                |  33 ++++-
 src/include/storage/lwlock.h             |   8 +
 src/test/modules/test_slru/test_slru.c   |  28 ++--
 12 files changed, 452 insertions(+), 198 deletions(-)

diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index 29d58f1eb3..938806532d 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -276,14 +276,19 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
 						   XLogRecPtr lsn, int pageno,
 						   bool all_xact_same_page)
 {
+	LWLock	*lock;
+
 	/* Can't use group update when PGPROC overflows. */
 	StaticAssertDecl(THRESHOLD_SUBTRANS_CLOG_OPT <= PGPROC_MAX_CACHED_SUBXIDS,
 					 "group clog threshold less than PGPROC cached subxids");
 
+	/* Get SLRU lock w.r.t. the SLRU page we are going to access. */
+	lock = SimpleLruPageGetSLRULock(XactCtl, pageno);
+
 	/*
-	 * When there is contention on XactSLRULock, we try to group multiple
+	 * When there is contention on SLRU lock, we try to group multiple
 	 * updates; a single leader process will perform transaction status
-	 * updates for multiple backends so that the number of times XactSLRULock
+	 * updates for multiple backends so that the number of times the SLRU lock
 	 * needs to be acquired is reduced.
 	 *
 	 * For this optimization to be safe, the XID and subxids in MyProc must be
@@ -302,17 +307,17 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
 				nsubxids * sizeof(TransactionId)) == 0))
 	{
 		/*
-		 * If we can immediately acquire XactSLRULock, we update the status of
+		 * If we can immediately acquire SLRULock, we update the status of
 		 * our own XID and release the lock.  If not, try use group XID
 		 * update.  If that doesn't work out, fall back to waiting for the
 		 * lock to perform an update for this transaction only.
 		 */
-		if (LWLockConditionalAcquire(XactSLRULock, LW_EXCLUSIVE))
+		if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
 		{
 			/* Got the lock without waiting!  Do the update. */
 			TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
 											   lsn, pageno);
-			LWLockRelease(XactSLRULock);
+			LWLockRelease(lock);
 			return;
 		}
 		else if (TransactionGroupUpdateXidStatus(xid, status, lsn, pageno))
@@ -325,10 +330,10 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
 	}
 
 	/* Group update not applicable, or couldn't accept this page number. */
-	LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 	TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
 									   lsn, pageno);
-	LWLockRelease(XactSLRULock);
+	LWLockRelease(lock);
 }
 
 /*
@@ -347,7 +352,8 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
 	Assert(status == TRANSACTION_STATUS_COMMITTED ||
 		   status == TRANSACTION_STATUS_ABORTED ||
 		   (status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid(xid)));
-	Assert(LWLockHeldByMeInMode(XactSLRULock, LW_EXCLUSIVE));
+	Assert(LWLockHeldByMeInMode(SimpleLruPageGetSLRULock(XactCtl, pageno),
+								LW_EXCLUSIVE));
 
 	/*
 	 * If we're doing an async commit (ie, lsn is valid), then we must wait
@@ -398,14 +404,13 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
 }
 
 /*
- * When we cannot immediately acquire XactSLRULock in exclusive mode at
+ * When we cannot immediately acquire SLRU bank lock in exclusive mode at
  * commit time, add ourselves to a list of processes that need their XIDs
  * status update.  The first process to add itself to the list will acquire
- * XactSLRULock in exclusive mode and set transaction status as required
- * on behalf of all group members.  This avoids a great deal of contention
- * around XactSLRULock when many processes are trying to commit at once,
- * since the lock need not be repeatedly handed off from one committing
- * process to the next.
+ * SLRU lock in exclusive mode and set transaction status as required on behalf
+ * of all group members.  This avoids a great deal of contention around
+ * SLRULock when many processes are trying to commit at once, since the lock
+ * need not be repeatedly handed off from one committing process to the next.
  *
  * Returns true when transaction status has been updated in clog; returns
  * false if we decided against applying the optimization because the page
@@ -419,6 +424,8 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
 	PGPROC	   *proc = MyProc;
 	uint32		nextidx;
 	uint32		wakeidx;
+	int			prevpageno;
+	LWLock	   *prevlock = NULL;
 
 	/* We should definitely have an XID whose status needs to be updated. */
 	Assert(TransactionIdIsValid(xid));
@@ -499,11 +506,8 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
 		return true;
 	}
 
-	/* We are the leader.  Acquire the lock on behalf of everyone. */
-	LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
-
 	/*
-	 * Now that we've got the lock, clear the list of processes waiting for
+	 * We are leader so clear the list of processes waiting for
 	 * group XID status update, saving a pointer to the head of the list.
 	 * Trying to pop elements one at a time could lead to an ABA problem.
 	 */
@@ -513,10 +517,38 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
 	/* Remember head of list so we can perform wakeups after dropping lock. */
 	wakeidx = nextidx;
 
+	/* Acquire the SLRU bank lock w.r.t. the first page in the group. */
+	prevpageno = ProcGlobal->allProcs[nextidx].clogGroupMemberPage;
+	prevlock = SimpleLruPageGetSLRULock(XactCtl, prevpageno);
+	LWLockAcquire(prevlock, LW_EXCLUSIVE);
+
 	/* Walk the list and update the status of all XIDs. */
 	while (nextidx != INVALID_PGPROCNO)
 	{
 		PGPROC	   *nextproc = &ProcGlobal->allProcs[nextidx];
+		int			thispageno = nextproc->clogGroupMemberPage;
+
+		/*
+		 * Although we are trying our best to keep same page in a group, there
+		 * are cases where we might get different pages as well for detail
+		 * refer comment in above while loop where we are adding this process
+		 * for group update.  So if the current page we are going to access is
+		 * not in the same slru bank in which we updated the last page then we
+		 * need to release the lock on the previous bank and acquire lock on
+		 * the bank w.r.t. the page we are going to update now.
+		 */
+		if (thispageno != prevpageno)
+		{
+			LWLock	*lock = SimpleLruPageGetSLRULock(XactCtl, thispageno);
+
+			if (prevlock != lock)
+			{
+				LWLockRelease(prevlock);
+				LWLockAcquire(lock, LW_EXCLUSIVE);
+			}
+			prevlock = lock;
+			prevpageno = thispageno;
+		}
 
 		/*
 		 * Transactions with more than THRESHOLD_SUBTRANS_CLOG_OPT sub-XIDs
@@ -536,7 +568,8 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
 	}
 
 	/* We're done with the lock now. */
-	LWLockRelease(XactSLRULock);
+	if (prevlock != NULL)
+		LWLockRelease(prevlock);
 
 	/*
 	 * Now that we've released the lock, go back and wake everybody up.  We
@@ -565,10 +598,11 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
 /*
  * Sets the commit status of a single transaction.
  *
- * Must be called with XactSLRULock held
+ * Must be called with slot specific SLRU bank's lock held
  */
 static void
-TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno)
+TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn,
+						  int slotno)
 {
 	int			byteno = TransactionIdToByte(xid);
 	int			bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
@@ -657,7 +691,7 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
 	lsnindex = GetLSNIndex(slotno, xid);
 	*lsn = XactCtl->shared->group_lsn[lsnindex];
 
-	LWLockRelease(XactSLRULock);
+	LWLockRelease(SimpleLruPageGetSLRULock(XactCtl, pageno));
 
 	return status;
 }
@@ -676,7 +710,7 @@ CLOGShmemInit(void)
 {
 	XactCtl->PagePrecedes = CLOGPagePrecedes;
 	SimpleLruInit(XactCtl, "Xact", NUM_CLOG_BUFFERS, CLOG_LSNS_PER_PAGE,
-				  XactSLRULock, "pg_xact", LWTRANCHE_XACT_BUFFER,
+				  "pg_xact", LWTRANCHE_XACT_BUFFER, LWTRANCHE_XACT_SLRU,
 				  SYNC_HANDLER_CLOG);
 	SlruPagePrecedesUnitTests(XactCtl, CLOG_XACTS_PER_PAGE);
 }
@@ -691,8 +725,9 @@ void
 BootStrapCLOG(void)
 {
 	int			slotno;
+	LWLock	   *lock = SimpleLruPageGetSLRULock(XactCtl, 0);
 
-	LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	/* Create and zero the first page of the commit log */
 	slotno = ZeroCLOGPage(0, false);
@@ -701,7 +736,7 @@ BootStrapCLOG(void)
 	SimpleLruWritePage(XactCtl, slotno);
 	Assert(!XactCtl->shared->page_dirty[slotno]);
 
-	LWLockRelease(XactSLRULock);
+	LWLockRelease(lock);
 }
 
 /*
@@ -736,14 +771,10 @@ StartupCLOG(void)
 	TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
 	int			pageno = TransactionIdToPage(xid);
 
-	LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
-
 	/*
 	 * Initialize our idea of the latest page number.
 	 */
-	XactCtl->shared->latest_page_number = pageno;
-
-	LWLockRelease(XactSLRULock);
+	pg_atomic_init_u32(&XactCtl->shared->latest_page_number, pageno);
 }
 
 /*
@@ -754,8 +785,9 @@ TrimCLOG(void)
 {
 	TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextXid);
 	int			pageno = TransactionIdToPage(xid);
+	LWLock	   *lock = SimpleLruPageGetSLRULock(XactCtl, pageno);
 
-	LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	/*
 	 * Zero out the remainder of the current clog page.  Under normal
@@ -787,7 +819,7 @@ TrimCLOG(void)
 		XactCtl->shared->page_dirty[slotno] = true;
 	}
 
-	LWLockRelease(XactSLRULock);
+	LWLockRelease(lock);
 }
 
 /*
@@ -819,6 +851,7 @@ void
 ExtendCLOG(TransactionId newestXact)
 {
 	int			pageno;
+	LWLock	   *lock;
 
 	/*
 	 * No work except at first XID of a page.  But beware: just after
@@ -829,13 +862,14 @@ ExtendCLOG(TransactionId newestXact)
 		return;
 
 	pageno = TransactionIdToPage(newestXact);
+	lock = SimpleLruPageGetSLRULock(XactCtl, pageno);
 
-	LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	/* Zero the page and make an XLOG entry about it */
 	ZeroCLOGPage(pageno, true);
 
-	LWLockRelease(XactSLRULock);
+	LWLockRelease(lock);
 }
 
 
@@ -973,16 +1007,18 @@ clog_redo(XLogReaderState *record)
 	{
 		int			pageno;
 		int			slotno;
+		LWLock	   *lock;
 
 		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
 
-		LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
+		lock = SimpleLruPageGetSLRULock(XactCtl, pageno);
+		LWLockAcquire(lock, LW_EXCLUSIVE);
 
 		slotno = ZeroCLOGPage(pageno, false);
 		SimpleLruWritePage(XactCtl, slotno);
 		Assert(!XactCtl->shared->page_dirty[slotno]);
 
-		LWLockRelease(XactSLRULock);
+		LWLockRelease(lock);
 	}
 	else if (info == CLOG_TRUNCATE)
 	{
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
index 54422f2780..0c7f5bae86 100644
--- a/src/backend/access/transam/commit_ts.c
+++ b/src/backend/access/transam/commit_ts.c
@@ -220,8 +220,9 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
 {
 	int			slotno;
 	int			i;
+	LWLock	   *lock = SimpleLruPageGetSLRULock(CommitTsCtl, pageno);
 
-	LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
 
@@ -231,13 +232,13 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
 
 	CommitTsCtl->shared->page_dirty[slotno] = true;
 
-	LWLockRelease(CommitTsSLRULock);
+	LWLockRelease(lock);
 }
 
 /*
  * Sets the commit timestamp of a single transaction.
  *
- * Must be called with CommitTsSLRULock held
+ * Must be called with slot specific SLRU bank's Lock held
  */
 static void
 TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
@@ -338,7 +339,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
 	if (nodeid)
 		*nodeid = entry.nodeid;
 
-	LWLockRelease(CommitTsSLRULock);
+	LWLockRelease(SimpleLruPageGetSLRULock(CommitTsCtl, pageno));
 	return *ts != 0;
 }
 
@@ -510,9 +511,8 @@ CommitTsShmemInit(void)
 
 	CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
 	SimpleLruInit(CommitTsCtl, "CommitTs", NUM_COMMIT_TS_BUFFERS, 0,
-				  CommitTsSLRULock, "pg_commit_ts",
-				  LWTRANCHE_COMMITTS_BUFFER,
-				  SYNC_HANDLER_COMMIT_TS);
+				  "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
+				  LWTRANCHE_COMMITTS_SLRU, SYNC_HANDLER_COMMIT_TS);
 	SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
 
 	commitTsShared = ShmemInitStruct("CommitTs shared",
@@ -668,9 +668,7 @@ ActivateCommitTs(void)
 	/*
 	 * Re-Initialize our idea of the latest page number.
 	 */
-	LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
-	CommitTsCtl->shared->latest_page_number = pageno;
-	LWLockRelease(CommitTsSLRULock);
+	pg_atomic_write_u32(&CommitTsCtl->shared->latest_page_number, pageno);
 
 	/*
 	 * If CommitTs is enabled, but it wasn't in the previous server run, we
@@ -697,12 +695,13 @@ ActivateCommitTs(void)
 	if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
 	{
 		int			slotno;
+		LWLock	   *lock = SimpleLruPageGetSLRULock(CommitTsCtl, pageno);
 
-		LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+		LWLockAcquire(lock, LW_EXCLUSIVE);
 		slotno = ZeroCommitTsPage(pageno, false);
 		SimpleLruWritePage(CommitTsCtl, slotno);
 		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
-		LWLockRelease(CommitTsSLRULock);
+		LWLockRelease(lock);
 	}
 
 	/* Change the activation status in shared memory. */
@@ -751,9 +750,9 @@ DeactivateCommitTs(void)
 	 * be overwritten anyway when we wrap around, but it seems better to be
 	 * tidy.)
 	 */
-	LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+	SimpleLruAcquireAllBankLock(CommitTsCtl, LW_EXCLUSIVE);
 	(void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
-	LWLockRelease(CommitTsSLRULock);
+	SimpleLruReleaseAllBankLock(CommitTsCtl);
 }
 
 /*
@@ -785,6 +784,7 @@ void
 ExtendCommitTs(TransactionId newestXact)
 {
 	int			pageno;
+	LWLock	   *lock;
 
 	/*
 	 * Nothing to do if module not enabled.  Note we do an unlocked read of
@@ -805,12 +805,14 @@ ExtendCommitTs(TransactionId newestXact)
 
 	pageno = TransactionIdToCTsPage(newestXact);
 
-	LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+	lock = SimpleLruPageGetSLRULock(CommitTsCtl, pageno);
+
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	/* Zero the page and make an XLOG entry about it */
 	ZeroCommitTsPage(pageno, !InRecovery);
 
-	LWLockRelease(CommitTsSLRULock);
+	LWLockRelease(lock);
 }
 
 /*
@@ -964,16 +966,18 @@ commit_ts_redo(XLogReaderState *record)
 	{
 		int			pageno;
 		int			slotno;
+		LWLock	   *lock;
 
 		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+		lock = SimpleLruPageGetSLRULock(CommitTsCtl, pageno);
 
-		LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
+		LWLockAcquire(lock, LW_EXCLUSIVE);
 
 		slotno = ZeroCommitTsPage(pageno, false);
 		SimpleLruWritePage(CommitTsCtl, slotno);
 		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
 
-		LWLockRelease(CommitTsSLRULock);
+		LWLockRelease(lock);
 	}
 	else if (info == COMMIT_TS_TRUNCATE)
 	{
@@ -985,7 +989,8 @@ commit_ts_redo(XLogReaderState *record)
 		 * During XLOG replay, latest_page_number isn't set up yet; insert a
 		 * suitable value to bypass the sanity test in SimpleLruTruncate.
 		 */
-		CommitTsCtl->shared->latest_page_number = trunc->pageno;
+		pg_atomic_write_u32(&CommitTsCtl->shared->latest_page_number,
+							trunc->pageno);
 
 		SimpleLruTruncate(CommitTsCtl, trunc->pageno);
 	}
diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c
index abb022e067..e63bd4cf71 100644
--- a/src/backend/access/transam/multixact.c
+++ b/src/backend/access/transam/multixact.c
@@ -192,10 +192,10 @@ static SlruCtlData MultiXactMemberCtlData;
 
 /*
  * MultiXact state shared across all backends.  All this state is protected
- * by MultiXactGenLock.  (We also use MultiXactOffsetSLRULock and
- * MultiXactMemberSLRULock to guard accesses to the two sets of SLRU
- * buffers.  For concurrency's sake, we avoid holding more than one of these
- * locks at a time.)
+ * by MultiXactGenLock.  (We also use SLRU bank's lock of MultiXactOffset and
+ * MultiXactMember to guard accesses to the two sets of SLRU buffers.  For
+ * concurrency's sake, we avoid holding more than one of these locks at a
+ * time.)
  */
 typedef struct MultiXactStateData
 {
@@ -870,12 +870,15 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
 	int			slotno;
 	MultiXactOffset *offptr;
 	int			i;
-
-	LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
+	LWLock	   *lock;
+	LWLock	   *prevlock = NULL;
 
 	pageno = MultiXactIdToOffsetPage(multi);
 	entryno = MultiXactIdToOffsetEntry(multi);
 
+	lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
+
 	/*
 	 * Note: we pass the MultiXactId to SimpleLruReadPage as the "transaction"
 	 * to complain about if there's any I/O error.  This is kinda bogus, but
@@ -891,10 +894,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
 
 	MultiXactOffsetCtl->shared->page_dirty[slotno] = true;
 
-	/* Exchange our lock */
-	LWLockRelease(MultiXactOffsetSLRULock);
-
-	LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
+	/* Release MultiXactOffset SLRU lock. */
+	LWLockRelease(lock);
 
 	prev_pageno = -1;
 
@@ -916,6 +917,20 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
 
 		if (pageno != prev_pageno)
 		{
+			/*
+			 * MultiXactMember SLRU page is changed so check if this new page
+			 * fall into the different SLRU bank then release the old bank's
+			 * lock and acquire lock on the new bank.
+			 */
+			lock = SimpleLruPageGetSLRULock(MultiXactMemberCtl, pageno);
+			if (lock != prevlock)
+			{
+				if (prevlock != NULL)
+					LWLockRelease(prevlock);
+
+				LWLockAcquire(lock, LW_EXCLUSIVE);
+				prevlock = lock;
+			}
 			slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi);
 			prev_pageno = pageno;
 		}
@@ -936,7 +951,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
 		MultiXactMemberCtl->shared->page_dirty[slotno] = true;
 	}
 
-	LWLockRelease(MultiXactMemberSLRULock);
+	if (prevlock != NULL)
+		LWLockRelease(prevlock);
 }
 
 /*
@@ -1239,6 +1255,8 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members,
 	MultiXactId tmpMXact;
 	MultiXactOffset nextOffset;
 	MultiXactMember *ptr;
+	LWLock	   *lock;
+	LWLock	   *prevlock = NULL;
 
 	debug_elog3(DEBUG2, "GetMembers: asked for %u", multi);
 
@@ -1342,11 +1360,23 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members,
 	 * time on every multixact creation.
 	 */
 retry:
-	LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
-
 	pageno = MultiXactIdToOffsetPage(multi);
 	entryno = MultiXactIdToOffsetEntry(multi);
 
+	/*
+	 * If the page is on the different SLRU bank then release the lock on the
+	 * previous bank if we are already holding one and acquire the lock on the
+	 * new bank.
+	 */
+	lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno);
+	if (lock != prevlock)
+	{
+		if (prevlock != NULL)
+			LWLockRelease(prevlock);
+		LWLockAcquire(lock, LW_EXCLUSIVE);
+		prevlock = lock;
+	}
+
 	slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi);
 	offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
 	offptr += entryno;
@@ -1379,7 +1409,22 @@ retry:
 		entryno = MultiXactIdToOffsetEntry(tmpMXact);
 
 		if (pageno != prev_pageno)
+		{
+			/*
+			 * SLRU pageno is changed so check whether this page is falling in
+			 * the different slru bank than on which we are already holding the
+			 * lock and if so release the lock on the old bank and acquire that
+			 * on the new bank.
+			 */
+			lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno);
+			if (prevlock != lock)
+			{
+				LWLockRelease(prevlock);
+				LWLockAcquire(lock, LW_EXCLUSIVE);
+				prevlock = lock;
+			}
 			slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, tmpMXact);
+		}
 
 		offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
 		offptr += entryno;
@@ -1388,7 +1433,8 @@ retry:
 		if (nextMXOffset == 0)
 		{
 			/* Corner case 2: next multixact is still being filled in */
-			LWLockRelease(MultiXactOffsetSLRULock);
+			LWLockRelease(prevlock);
+			prevlock = NULL;
 			CHECK_FOR_INTERRUPTS();
 			pg_usleep(1000L);
 			goto retry;
@@ -1397,13 +1443,11 @@ retry:
 		length = nextMXOffset - offset;
 	}
 
-	LWLockRelease(MultiXactOffsetSLRULock);
+	LWLockRelease(prevlock);
+	prevlock = NULL;
 
 	ptr = (MultiXactMember *) palloc(length * sizeof(MultiXactMember));
 
-	/* Now get the members themselves. */
-	LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
-
 	truelength = 0;
 	prev_pageno = -1;
 	for (i = 0; i < length; i++, offset++)
@@ -1419,6 +1463,20 @@ retry:
 
 		if (pageno != prev_pageno)
 		{
+			/*
+			 * MultiXactMember SLRU page is changed so check if this new page
+			 * fall into the different SLRU bank then release the old bank's
+			 * lock and acquire lock on the new bank.
+			 */
+			lock = SimpleLruPageGetSLRULock(MultiXactMemberCtl, pageno);
+			if (lock != prevlock)
+			{
+				if (prevlock)
+					LWLockRelease(prevlock);
+				LWLockAcquire(lock, LW_EXCLUSIVE);
+				prevlock = lock;
+			}
+
 			slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi);
 			prev_pageno = pageno;
 		}
@@ -1442,7 +1500,8 @@ retry:
 		truelength++;
 	}
 
-	LWLockRelease(MultiXactMemberSLRULock);
+	if (prevlock)
+		LWLockRelease(prevlock);
 
 	/* A multixid with zero members should not happen */
 	Assert(truelength > 0);
@@ -1852,15 +1911,14 @@ MultiXactShmemInit(void)
 
 	SimpleLruInit(MultiXactOffsetCtl,
 				  "MultiXactOffset", NUM_MULTIXACTOFFSET_BUFFERS, 0,
-				  MultiXactOffsetSLRULock, "pg_multixact/offsets",
-				  LWTRANCHE_MULTIXACTOFFSET_BUFFER,
+				  "pg_multixact/offsets", LWTRANCHE_MULTIXACTOFFSET_BUFFER,
+				  LWTRANCHE_MULTIXACTOFFSET_SLRU,
 				  SYNC_HANDLER_MULTIXACT_OFFSET);
 	SlruPagePrecedesUnitTests(MultiXactOffsetCtl, MULTIXACT_OFFSETS_PER_PAGE);
 	SimpleLruInit(MultiXactMemberCtl,
 				  "MultiXactMember", NUM_MULTIXACTMEMBER_BUFFERS, 0,
-				  MultiXactMemberSLRULock, "pg_multixact/members",
-				  LWTRANCHE_MULTIXACTMEMBER_BUFFER,
-				  SYNC_HANDLER_MULTIXACT_MEMBER);
+				  "pg_multixact/members", LWTRANCHE_MULTIXACTMEMBER_BUFFER,
+				  LWTRANCHE_MULTIXACTMEMBER_SLRU, SYNC_HANDLER_MULTIXACT_MEMBER);
 	/* doesn't call SimpleLruTruncate() or meet criteria for unit tests */
 
 	/* Initialize our shared state struct */
@@ -1894,8 +1952,10 @@ void
 BootStrapMultiXact(void)
 {
 	int			slotno;
+	LWLock	   *lock;
 
-	LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
+	lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl , 0);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	/* Create and zero the first page of the offsets log */
 	slotno = ZeroMultiXactOffsetPage(0, false);
@@ -1904,9 +1964,10 @@ BootStrapMultiXact(void)
 	SimpleLruWritePage(MultiXactOffsetCtl, slotno);
 	Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]);
 
-	LWLockRelease(MultiXactOffsetSLRULock);
+	LWLockRelease(lock);
 
-	LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
+	lock = SimpleLruPageGetSLRULock(MultiXactMemberCtl , 0);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	/* Create and zero the first page of the members log */
 	slotno = ZeroMultiXactMemberPage(0, false);
@@ -1915,7 +1976,7 @@ BootStrapMultiXact(void)
 	SimpleLruWritePage(MultiXactMemberCtl, slotno);
 	Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]);
 
-	LWLockRelease(MultiXactMemberSLRULock);
+	LWLockRelease(lock);
 }
 
 /*
@@ -1975,10 +2036,12 @@ static void
 MaybeExtendOffsetSlru(void)
 {
 	int			pageno;
+	LWLock	   *lock;
 
 	pageno = MultiXactIdToOffsetPage(MultiXactState->nextMXact);
+	lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl ,pageno);
 
-	LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	if (!SimpleLruDoesPhysicalPageExist(MultiXactOffsetCtl, pageno))
 	{
@@ -1993,7 +2056,7 @@ MaybeExtendOffsetSlru(void)
 		SimpleLruWritePage(MultiXactOffsetCtl, slotno);
 	}
 
-	LWLockRelease(MultiXactOffsetSLRULock);
+	LWLockRelease(lock);
 }
 
 /*
@@ -2015,13 +2078,15 @@ StartupMultiXact(void)
 	 * Initialize offset's idea of the latest page number.
 	 */
 	pageno = MultiXactIdToOffsetPage(multi);
-	MultiXactOffsetCtl->shared->latest_page_number = pageno;
+	pg_atomic_init_u32(&MultiXactOffsetCtl->shared->latest_page_number,
+					   pageno);
 
 	/*
 	 * Initialize member's idea of the latest page number.
 	 */
 	pageno = MXOffsetToMemberPage(offset);
-	MultiXactMemberCtl->shared->latest_page_number = pageno;
+	pg_atomic_init_u32(&MultiXactMemberCtl->shared->latest_page_number,
+					   pageno);
 }
 
 /*
@@ -2037,7 +2102,6 @@ TrimMultiXact(void)
 	int			pageno;
 	int			entryno;
 	int			flagsoff;
-
 	LWLockAcquire(MultiXactGenLock, LW_SHARED);
 	nextMXact = MultiXactState->nextMXact;
 	offset = MultiXactState->nextOffset;
@@ -2046,13 +2110,13 @@ TrimMultiXact(void)
 	LWLockRelease(MultiXactGenLock);
 
 	/* Clean up offsets state */
-	LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
 
 	/*
 	 * (Re-)Initialize our idea of the latest page number for offsets.
 	 */
 	pageno = MultiXactIdToOffsetPage(nextMXact);
-	MultiXactOffsetCtl->shared->latest_page_number = pageno;
+	pg_atomic_write_u32(&MultiXactOffsetCtl->shared->latest_page_number,
+						pageno);
 
 	/*
 	 * Zero out the remainder of the current offsets page.  See notes in
@@ -2067,7 +2131,9 @@ TrimMultiXact(void)
 	{
 		int			slotno;
 		MultiXactOffset *offptr;
+		LWLock   *lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno);
 
+		LWLockAcquire(lock, LW_EXCLUSIVE);
 		slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, nextMXact);
 		offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
 		offptr += entryno;
@@ -2075,18 +2141,17 @@ TrimMultiXact(void)
 		MemSet(offptr, 0, BLCKSZ - (entryno * sizeof(MultiXactOffset)));
 
 		MultiXactOffsetCtl->shared->page_dirty[slotno] = true;
+		LWLockRelease(lock);
 	}
 
-	LWLockRelease(MultiXactOffsetSLRULock);
-
 	/* And the same for members */
-	LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
 
 	/*
 	 * (Re-)Initialize our idea of the latest page number for members.
 	 */
 	pageno = MXOffsetToMemberPage(offset);
-	MultiXactMemberCtl->shared->latest_page_number = pageno;
+	pg_atomic_write_u32(&MultiXactMemberCtl->shared->latest_page_number,
+						pageno);
 
 	/*
 	 * Zero out the remainder of the current members page.  See notes in
@@ -2098,7 +2163,9 @@ TrimMultiXact(void)
 		int			slotno;
 		TransactionId *xidptr;
 		int			memberoff;
+		LWLock	   *lock = SimpleLruPageGetSLRULock(MultiXactMemberCtl, pageno);
 
+		LWLockAcquire(lock, LW_EXCLUSIVE);
 		memberoff = MXOffsetToMemberOffset(offset);
 		slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, offset);
 		xidptr = (TransactionId *)
@@ -2113,10 +2180,9 @@ TrimMultiXact(void)
 		 */
 
 		MultiXactMemberCtl->shared->page_dirty[slotno] = true;
+		LWLockRelease(lock);
 	}
 
-	LWLockRelease(MultiXactMemberSLRULock);
-
 	/* signal that we're officially up */
 	LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);
 	MultiXactState->finishedStartup = true;
@@ -2404,6 +2470,7 @@ static void
 ExtendMultiXactOffset(MultiXactId multi)
 {
 	int			pageno;
+	LWLock	   *lock;
 
 	/*
 	 * No work except at first MultiXactId of a page.  But beware: just after
@@ -2414,13 +2481,14 @@ ExtendMultiXactOffset(MultiXactId multi)
 		return;
 
 	pageno = MultiXactIdToOffsetPage(multi);
+	lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno);
 
-	LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	/* Zero the page and make an XLOG entry about it */
 	ZeroMultiXactOffsetPage(pageno, true);
 
-	LWLockRelease(MultiXactOffsetSLRULock);
+	LWLockRelease(lock);
 }
 
 /*
@@ -2453,15 +2521,17 @@ ExtendMultiXactMember(MultiXactOffset offset, int nmembers)
 		if (flagsoff == 0 && flagsbit == 0)
 		{
 			int			pageno;
+			LWLock	   *lock;
 
 			pageno = MXOffsetToMemberPage(offset);
+			lock = SimpleLruPageGetSLRULock(MultiXactMemberCtl, pageno);
 
-			LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
+			LWLockAcquire(lock, LW_EXCLUSIVE);
 
 			/* Zero the page and make an XLOG entry about it */
 			ZeroMultiXactMemberPage(pageno, true);
 
-			LWLockRelease(MultiXactMemberSLRULock);
+			LWLockRelease(lock);
 		}
 
 		/*
@@ -2759,7 +2829,7 @@ find_multixact_start(MultiXactId multi, MultiXactOffset *result)
 	offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
 	offptr += entryno;
 	offset = *offptr;
-	LWLockRelease(MultiXactOffsetSLRULock);
+	LWLockRelease(SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno));
 
 	*result = offset;
 	return true;
@@ -3241,31 +3311,33 @@ multixact_redo(XLogReaderState *record)
 	{
 		int			pageno;
 		int			slotno;
+		LWLock	   *lock;
 
 		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
-
-		LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
+		lock = SimpleLruPageGetSLRULock(MultiXactOffsetCtl, pageno);
+		LWLockAcquire(lock, LW_EXCLUSIVE);
 
 		slotno = ZeroMultiXactOffsetPage(pageno, false);
 		SimpleLruWritePage(MultiXactOffsetCtl, slotno);
 		Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]);
 
-		LWLockRelease(MultiXactOffsetSLRULock);
+		LWLockRelease(lock);
 	}
 	else if (info == XLOG_MULTIXACT_ZERO_MEM_PAGE)
 	{
 		int			pageno;
 		int			slotno;
+		LWLock	   *lock;
 
 		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
-
-		LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
+		lock = SimpleLruPageGetSLRULock(MultiXactMemberCtl, pageno);
+		LWLockAcquire(lock, LW_EXCLUSIVE);
 
 		slotno = ZeroMultiXactMemberPage(pageno, false);
 		SimpleLruWritePage(MultiXactMemberCtl, slotno);
 		Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]);
 
-		LWLockRelease(MultiXactMemberSLRULock);
+		LWLockRelease(lock);
 	}
 	else if (info == XLOG_MULTIXACT_CREATE_ID)
 	{
@@ -3331,7 +3403,8 @@ multixact_redo(XLogReaderState *record)
 		 * SimpleLruTruncate.
 		 */
 		pageno = MultiXactIdToOffsetPage(xlrec.endTruncOff);
-		MultiXactOffsetCtl->shared->latest_page_number = pageno;
+		pg_atomic_write_u32(&MultiXactOffsetCtl->shared->latest_page_number,
+							pageno);
 		PerformOffsetsTruncation(xlrec.startTruncOff, xlrec.endTruncOff);
 
 		LWLockRelease(MultiXactTruncationLock);
diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c
index 57889b72bd..c06e4eddd1 100644
--- a/src/backend/access/transam/slru.c
+++ b/src/backend/access/transam/slru.c
@@ -187,9 +187,9 @@ Size
 SimpleLruShmemSize(int nslots, int nlsns)
 {
 	Size		sz;
-	int			bankmask_ignore;
+	int			bankmask;
 
-	SlruAdjustNSlots(&nslots, &bankmask_ignore);
+	SlruAdjustNSlots(&nslots, &bankmask);
 
 	/* we assume nslots isn't so large as to risk overflow */
 	sz = MAXALIGN(sizeof(SlruSharedData));
@@ -199,6 +199,7 @@ SimpleLruShmemSize(int nslots, int nlsns)
 	sz += MAXALIGN(nslots * sizeof(int));	/* page_number[] */
 	sz += MAXALIGN(nslots * sizeof(int));	/* page_lru_count[] */
 	sz += MAXALIGN(nslots * sizeof(LWLockPadded));	/* buffer_locks[] */
+	sz += MAXALIGN((bankmask + 1) * sizeof(LWLockPadded));	/* bank_locks[] */
 
 	if (nlsns > 0)
 		sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));	/* group_lsn[] */
@@ -206,6 +207,32 @@ SimpleLruShmemSize(int nslots, int nlsns)
 	return BUFFERALIGN(sz) + BLCKSZ * nslots;
 }
 
+/*
+ * Function to acquire all bank's lock of the given SlruCtl
+ */
+void
+SimpleLruAcquireAllBankLock(SlruCtl ctl, LWLockMode mode)
+{
+	SlruShared	shared = ctl->shared;
+	int			bankno;
+
+	for (bankno = 0; bankno <= ctl->bank_mask; bankno++)
+		LWLockAcquire(&shared->bank_locks[bankno].lock, mode);
+}
+
+/*
+ * Function to release all bank's lock of the given SlruCtl
+ */
+void
+SimpleLruReleaseAllBankLock(SlruCtl ctl)
+{
+	SlruShared	shared = ctl->shared;
+	int			bankno;
+
+	for (bankno = 0; bankno <= ctl->bank_mask; bankno++)
+		LWLockRelease(&shared->bank_locks[bankno].lock);
+}
+
 /*
  * Initialize, or attach to, a simple LRU cache in shared memory.
  *
@@ -220,7 +247,7 @@ SimpleLruShmemSize(int nslots, int nlsns)
  */
 void
 SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
-			  LWLock *ctllock, const char *subdir, int tranche_id,
+			  const char *subdir, int tranche_id, int slru_tranche_id,
 			  SyncRequestHandler sync_handler)
 {
 	SlruShared	shared;
@@ -239,13 +266,13 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
 		char	   *ptr;
 		Size		offset;
 		int			slotno;
+		int			nbanks = bankmask + 1;
+		int			bankno;
 
 		Assert(!found);
 
 		memset(shared, 0, sizeof(SlruSharedData));
 
-		shared->ControlLock = ctllock;
-
 		shared->num_slots = nslots;
 		shared->lsn_groups_per_page = nlsns;
 
@@ -271,6 +298,8 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
 		/* Initialize LWLocks */
 		shared->buffer_locks = (LWLockPadded *) (ptr + offset);
 		offset += MAXALIGN(nslots * sizeof(LWLockPadded));
+		shared->bank_locks = (LWLockPadded *) (ptr + offset);
+		offset += MAXALIGN(nbanks * sizeof(LWLockPadded));
 
 		if (nlsns > 0)
 		{
@@ -290,6 +319,10 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
 			shared->page_lru_count[slotno] = 0;
 			ptr += BLCKSZ;
 		}
+		/* initialize bank locks for each buffer bank */
+		for (bankno = 0; bankno < nbanks; bankno++)
+			LWLockInitialize(&shared->bank_locks[bankno].lock,
+							 slru_tranche_id);
 
 		/* Should fit to estimated shmem size */
 		Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots, nlsns));
@@ -344,7 +377,7 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno)
 	SimpleLruZeroLSNs(ctl, slotno);
 
 	/* Assume this page is now the latest active page */
-	shared->latest_page_number = pageno;
+	pg_atomic_write_u32(&shared->latest_page_number, pageno);
 
 	/* update the stats counter of zeroed pages */
 	pgstat_count_slru_page_zeroed(shared->slru_stats_idx);
@@ -383,12 +416,13 @@ static void
 SimpleLruWaitIO(SlruCtl ctl, int slotno)
 {
 	SlruShared	shared = ctl->shared;
+	int			bankno = slotno / SLRU_BANK_SIZE;
 
 	/* See notes at top of file */
-	LWLockRelease(shared->ControlLock);
+	LWLockRelease(&shared->bank_locks[bankno].lock);
 	LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED);
 	LWLockRelease(&shared->buffer_locks[slotno].lock);
-	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+	LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
 
 	/*
 	 * If the slot is still in an io-in-progress state, then either someone
@@ -443,6 +477,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
 	for (;;)
 	{
 		int			slotno;
+		int			bankno;
 		bool		ok;
 
 		/* See if page already is in memory; if not, pick victim slot */
@@ -485,9 +520,10 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
 
 		/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
 		LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
+		bankno = slotno / SLRU_BANK_SIZE;
 
 		/* Release control lock while doing I/O */
-		LWLockRelease(shared->ControlLock);
+		LWLockRelease(&shared->bank_locks[bankno].lock);
 
 		/* Do the read */
 		ok = SlruPhysicalReadPage(ctl, pageno, slotno);
@@ -496,7 +532,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
 		SimpleLruZeroLSNs(ctl, slotno);
 
 		/* Re-acquire control lock and update page state */
-		LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+		LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
 
 		Assert(shared->page_number[slotno] == pageno &&
 			   shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS &&
@@ -538,11 +574,12 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
 {
 	SlruShared	shared = ctl->shared;
 	int			slotno;
-	int			bankstart = (pageno & ctl->bank_mask) * SLRU_BANK_SIZE;
+	int			bankno = pageno & ctl->bank_mask;
+	int			bankstart = bankno * SLRU_BANK_SIZE;
 	int			bankend = bankstart + SLRU_BANK_SIZE;
 
 	/* Try to find the page while holding only shared lock */
-	LWLockAcquire(shared->ControlLock, LW_SHARED);
+	LWLockAcquire(&shared->bank_locks[bankno].lock, LW_SHARED);
 
 	/* See if page is already in a buffer */
 	for (slotno = bankstart; slotno < bankend; slotno++)
@@ -562,8 +599,8 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid)
 	}
 
 	/* No luck, so switch to normal exclusive lock and do regular read */
-	LWLockRelease(shared->ControlLock);
-	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+	LWLockRelease(&shared->bank_locks[bankno].lock);
+	LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
 
 	return SimpleLruReadPage(ctl, pageno, true, xid);
 }
@@ -585,6 +622,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
 	SlruShared	shared = ctl->shared;
 	int			pageno = shared->page_number[slotno];
 	bool		ok;
+	int			bankno = slotno / SLRU_BANK_SIZE;
 
 	/* If a write is in progress, wait for it to finish */
 	while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
@@ -613,7 +651,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
 	LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
 
 	/* Release control lock while doing I/O */
-	LWLockRelease(shared->ControlLock);
+	LWLockRelease(&shared->bank_locks[bankno].lock);
 
 	/* Do the write */
 	ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);
@@ -628,7 +666,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
 	}
 
 	/* Re-acquire control lock and update page state */
-	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+	LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
 
 	Assert(shared->page_number[slotno] == pageno &&
 		   shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS);
@@ -1133,7 +1171,7 @@ SlruSelectLRUPage(SlruCtl ctl, int pageno)
 				this_delta = 0;
 			}
 			this_page_number = shared->page_number[slotno];
-			if (this_page_number == shared->latest_page_number)
+			if (this_page_number == pg_atomic_read_u32(&shared->latest_page_number))
 				continue;
 			if (shared->page_status[slotno] == SLRU_PAGE_VALID)
 			{
@@ -1207,6 +1245,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
 	int			slotno;
 	int			pageno = 0;
 	int			i;
+	int			lastbankno = 0;
 	bool		ok;
 
 	/* update the stats counter of flushes */
@@ -1217,10 +1256,19 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
 	 */
 	fdata.num_files = 0;
 
-	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+	LWLockAcquire(&shared->bank_locks[0].lock, LW_EXCLUSIVE);
 
 	for (slotno = 0; slotno < shared->num_slots; slotno++)
 	{
+		int			curbankno = slotno / SLRU_BANK_SIZE;
+
+		if (curbankno != lastbankno)
+		{
+			LWLockRelease(&shared->bank_locks[lastbankno].lock);
+			LWLockAcquire(&shared->bank_locks[curbankno].lock, LW_EXCLUSIVE);
+			lastbankno = curbankno;
+		}
+
 		SlruInternalWritePage(ctl, slotno, &fdata);
 
 		/*
@@ -1234,7 +1282,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
 				!shared->page_dirty[slotno]));
 	}
 
-	LWLockRelease(shared->ControlLock);
+	LWLockRelease(&shared->bank_locks[lastbankno].lock);
 
 	/*
 	 * Now close any files that were open
@@ -1274,6 +1322,7 @@ SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
 {
 	SlruShared	shared = ctl->shared;
 	int			slotno;
+	int			prevbankno;
 
 	/* update the stats counter of truncates */
 	pgstat_count_slru_truncate(shared->slru_stats_idx);
@@ -1284,25 +1333,38 @@ SimpleLruTruncate(SlruCtl ctl, int cutoffPage)
 	 * or just after a checkpoint, any dirty pages should have been flushed
 	 * already ... we're just being extra careful here.)
 	 */
-	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
-
 restart:
 
 	/*
 	 * While we are holding the lock, make an important safety check: the
 	 * current endpoint page must not be eligible for removal.
 	 */
-	if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage))
+	if (ctl->PagePrecedes(pg_atomic_read_u32(&shared->latest_page_number),
+						  cutoffPage))
 	{
-		LWLockRelease(shared->ControlLock);
 		ereport(LOG,
 				(errmsg("could not truncate directory \"%s\": apparent wraparound",
 						ctl->Dir)));
 		return;
 	}
 
+	prevbankno = 0;
+	LWLockAcquire(&shared->bank_locks[prevbankno].lock, LW_EXCLUSIVE);
 	for (slotno = 0; slotno < shared->num_slots; slotno++)
 	{
+		int			curbankno = slotno / SLRU_BANK_SIZE;
+
+		/*
+		 * If the curbankno is not same as prevbankno then release the lock on
+		 * the prevbankno and acquire the lock on the curbankno.
+		 */
+		if (curbankno != prevbankno)
+		{
+			LWLockRelease(&shared->bank_locks[prevbankno].lock);
+			LWLockAcquire(&shared->bank_locks[curbankno].lock, LW_EXCLUSIVE);
+			prevbankno = curbankno;
+		}
+
 		if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
 			continue;
 		if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage))
@@ -1332,10 +1394,12 @@ restart:
 			SlruInternalWritePage(ctl, slotno, NULL);
 		else
 			SimpleLruWaitIO(ctl, slotno);
+
+		LWLockRelease(&shared->bank_locks[prevbankno].lock);
 		goto restart;
 	}
 
-	LWLockRelease(shared->ControlLock);
+	LWLockRelease(&shared->bank_locks[prevbankno].lock);
 
 	/* Now we can remove the old segment(s) */
 	(void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
@@ -1376,15 +1440,31 @@ SlruDeleteSegment(SlruCtl ctl, int segno)
 	SlruShared	shared = ctl->shared;
 	int			slotno;
 	bool		did_write;
+	int			prevbankno = 0;
 
 	/* Clean out any possibly existing references to the segment. */
-	LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
+	LWLockAcquire(&shared->bank_locks[prevbankno].lock, LW_EXCLUSIVE);
 restart:
 	did_write = false;
 	for (slotno = 0; slotno < shared->num_slots; slotno++)
 	{
-		int			pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT;
+		int			pagesegno;
+		int			curbankno;
+
+		curbankno = slotno / SLRU_BANK_SIZE;
+
+		/*
+		 * If the curbankno is not same as prevbankno then release the lock on
+		 * the prevbankno and acquire the lock on the curbankno.
+		 */
+		if (curbankno != prevbankno)
+		{
+			LWLockRelease(&shared->bank_locks[prevbankno].lock);
+			LWLockAcquire(&shared->bank_locks[curbankno].lock, LW_EXCLUSIVE);
+			prevbankno = curbankno;
+		}
 
+		pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT;
 		if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
 			continue;
 
@@ -1418,7 +1498,7 @@ restart:
 
 	SlruInternalDeleteSegment(ctl, segno);
 
-	LWLockRelease(shared->ControlLock);
+	LWLockRelease(&shared->bank_locks[prevbankno].lock);
 }
 
 /*
diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c
index 125273e235..2b0afa8a15 100644
--- a/src/backend/access/transam/subtrans.c
+++ b/src/backend/access/transam/subtrans.c
@@ -77,12 +77,14 @@ SubTransSetParent(TransactionId xid, TransactionId parent)
 	int			pageno = TransactionIdToPage(xid);
 	int			entryno = TransactionIdToEntry(xid);
 	int			slotno;
+	LWLock	   *lock;
 	TransactionId *ptr;
 
 	Assert(TransactionIdIsValid(parent));
 	Assert(TransactionIdFollows(xid, parent));
 
-	LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
+	lock = SimpleLruPageGetSLRULock(SubTransCtl, pageno);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	slotno = SimpleLruReadPage(SubTransCtl, pageno, true, xid);
 	ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno];
@@ -100,7 +102,7 @@ SubTransSetParent(TransactionId xid, TransactionId parent)
 		SubTransCtl->shared->page_dirty[slotno] = true;
 	}
 
-	LWLockRelease(SubtransSLRULock);
+	LWLockRelease(lock);
 }
 
 /*
@@ -130,7 +132,7 @@ SubTransGetParent(TransactionId xid)
 
 	parent = *ptr;
 
-	LWLockRelease(SubtransSLRULock);
+	LWLockRelease(SimpleLruPageGetSLRULock(SubTransCtl, pageno));
 
 	return parent;
 }
@@ -193,8 +195,8 @@ SUBTRANSShmemInit(void)
 {
 	SubTransCtl->PagePrecedes = SubTransPagePrecedes;
 	SimpleLruInit(SubTransCtl, "Subtrans", NUM_SUBTRANS_BUFFERS, 0,
-				  SubtransSLRULock, "pg_subtrans",
-				  LWTRANCHE_SUBTRANS_BUFFER, SYNC_HANDLER_NONE);
+				  "pg_subtrans", LWTRANCHE_SUBTRANS_BUFFER,
+				  LWTRANCHE_SUBTRANS_SLRU, SYNC_HANDLER_NONE);
 	SlruPagePrecedesUnitTests(SubTransCtl, SUBTRANS_XACTS_PER_PAGE);
 }
 
@@ -212,8 +214,9 @@ void
 BootStrapSUBTRANS(void)
 {
 	int			slotno;
+	LWLock	   *lock = SimpleLruPageGetSLRULock(SubTransCtl, 0);
 
-	LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	/* Create and zero the first page of the subtrans log */
 	slotno = ZeroSUBTRANSPage(0);
@@ -222,7 +225,7 @@ BootStrapSUBTRANS(void)
 	SimpleLruWritePage(SubTransCtl, slotno);
 	Assert(!SubTransCtl->shared->page_dirty[slotno]);
 
-	LWLockRelease(SubtransSLRULock);
+	LWLockRelease(lock);
 }
 
 /*
@@ -259,7 +262,7 @@ StartupSUBTRANS(TransactionId oldestActiveXID)
 	 * Whenever we advance into a new page, ExtendSUBTRANS will likewise zero
 	 * the new page without regard to whatever was previously on disk.
 	 */
-	LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
+	SimpleLruAcquireAllBankLock(SubTransCtl, LW_EXCLUSIVE);
 
 	startPage = TransactionIdToPage(oldestActiveXID);
 	nextXid = ShmemVariableCache->nextXid;
@@ -275,7 +278,7 @@ StartupSUBTRANS(TransactionId oldestActiveXID)
 	}
 	(void) ZeroSUBTRANSPage(startPage);
 
-	LWLockRelease(SubtransSLRULock);
+	SimpleLruReleaseAllBankLock(SubTransCtl);
 }
 
 /*
@@ -309,6 +312,7 @@ void
 ExtendSUBTRANS(TransactionId newestXact)
 {
 	int			pageno;
+	LWLock	   *lock;
 
 	/*
 	 * No work except at first XID of a page.  But beware: just after
@@ -320,12 +324,13 @@ ExtendSUBTRANS(TransactionId newestXact)
 
 	pageno = TransactionIdToPage(newestXact);
 
-	LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
+	lock = SimpleLruPageGetSLRULock(SubTransCtl, pageno);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	/* Zero the page */
 	ZeroSUBTRANSPage(pageno);
 
-	LWLockRelease(SubtransSLRULock);
+	LWLockRelease(lock);
 }
 
 
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index d148d10850..7088fe15ea 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -267,9 +267,10 @@ typedef struct QueueBackendStatus
  * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
  * can change the tail pointers.
  *
- * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
+ * SLRU buffer pool is divided in banks and bank wise SLRU lock is used as
+ * the control lock for the pg_notify SLRU buffers.
  * In order to avoid deadlocks, whenever we need multiple locks, we first get
- * NotifyQueueTailLock, then NotifyQueueLock, and lastly NotifySLRULock.
+ * NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock.
  *
  * Each backend uses the backend[] array entry with index equal to its
  * BackendId (which can range from 1 to MaxBackends).  We rely on this to make
@@ -570,8 +571,8 @@ AsyncShmemInit(void)
 	 */
 	NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
 	SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
-				  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
-				  SYNC_HANDLER_NONE);
+				  "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
+				  LWTRANCHE_NOTIFY_SLRU, SYNC_HANDLER_NONE);
 
 	if (!found)
 	{
@@ -1402,7 +1403,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
  * Eventually we will return NULL indicating all is done.
  *
  * We are holding NotifyQueueLock already from the caller and grab
- * NotifySLRULock locally in this function.
+ * page specific SLRULock locally in this function.
  */
 static ListCell *
 asyncQueueAddEntries(ListCell *nextNotify)
@@ -1412,9 +1413,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
 	int			pageno;
 	int			offset;
 	int			slotno;
-
-	/* We hold both NotifyQueueLock and NotifySLRULock during this operation */
-	LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
+	LWLock	   *lock;
 
 	/*
 	 * We work with a local copy of QUEUE_HEAD, which we write back to shared
@@ -1438,6 +1437,11 @@ asyncQueueAddEntries(ListCell *nextNotify)
 	 * wrapped around, but re-zeroing the page is harmless in that case.)
 	 */
 	pageno = QUEUE_POS_PAGE(queue_head);
+	lock = SimpleLruPageGetSLRULock(NotifyCtl, pageno);
+
+	/* We hold both NotifyQueueLock and SLRU bank lock during this operation */
+	LWLockAcquire(lock, LW_EXCLUSIVE);
+
 	if (QUEUE_POS_IS_ZERO(queue_head))
 		slotno = SimpleLruZeroPage(NotifyCtl, pageno);
 	else
@@ -1509,7 +1513,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
 	/* Success, so update the global QUEUE_HEAD */
 	QUEUE_HEAD = queue_head;
 
-	LWLockRelease(NotifySLRULock);
+	LWLockRelease(lock);
 
 	return nextNotify;
 }
@@ -1988,7 +1992,7 @@ asyncQueueReadAllNotifications(void)
 
 			/*
 			 * We copy the data from SLRU into a local buffer, so as to avoid
-			 * holding the NotifySLRULock while we are examining the entries
+			 * holding the SLRULock while we are examining the entries
 			 * and possibly transmitting them to our frontend.  Copy only the
 			 * part of the page we will actually inspect.
 			 */
@@ -2010,7 +2014,7 @@ asyncQueueReadAllNotifications(void)
 				   NotifyCtl->shared->page_buffer[slotno] + curoffset,
 				   copysize);
 			/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
-			LWLockRelease(NotifySLRULock);
+			LWLockRelease(SimpleLruPageGetSLRULock(NotifyCtl, curpage));
 
 			/*
 			 * Process messages up to the stop position, end of page, or an
@@ -2051,7 +2055,7 @@ asyncQueueReadAllNotifications(void)
  *
  * The current page must have been fetched into page_buffer from shared
  * memory.  (We could access the page right in shared memory, but that
- * would imply holding the NotifySLRULock throughout this routine.)
+ * would imply holding the SLRU bank lock throughout this routine.)
  *
  * We stop if we reach the "stop" position, or reach a notification from an
  * uncommitted transaction, or reach the end of the page.
@@ -2204,7 +2208,7 @@ asyncQueueAdvanceTail(void)
 	if (asyncQueuePagePrecedes(oldtailpage, boundary))
 	{
 		/*
-		 * SimpleLruTruncate() will ask for NotifySLRULock but will also
+		 * SimpleLruTruncate() will ask for SLRU bank locks but will also
 		 * release the lock again.
 		 */
 		SimpleLruTruncate(NotifyCtl, newtailpage);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 315a78cda9..1261af0548 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -190,6 +190,20 @@ static const char *const BuiltinTrancheNames[] = {
 	"LogicalRepLauncherDSA",
 	/* LWTRANCHE_LAUNCHER_HASH: */
 	"LogicalRepLauncherHash",
+	/* LWTRANCHE_XACT_SLRU: */
+	"XactSLRU",
+	/* LWTRANCHE_COMMITTS_SLRU: */
+	"CommitTSSLRU",
+	/* LWTRANCHE_SUBTRANS_SLRU: */
+	"SubtransSLRU",
+	/* LWTRANCHE_MULTIXACTOFFSET_SLRU: */
+	"MultixactOffsetSLRU",
+	/* LWTRANCHE_MULTIXACTMEMBER_SLRU: */
+	"MultixactMemberSLRU",
+	/* LWTRANCHE_NOTIFY_SLRU: */
+	"NotifySLRU",
+	/* LWTRANCHE_SERIAL_SLRU: */
+	"SerialSLRU"
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index f72f2906ce..9e66ecd1ed 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -16,11 +16,11 @@ WALBufMappingLock					7
 WALWriteLock						8
 ControlFileLock						9
 # 10 was CheckpointLock
-XactSLRULock						11
-SubtransSLRULock					12
+# 11 was XactSLRULock
+# 12 was SubtransSLRULock
 MultiXactGenLock					13
-MultiXactOffsetSLRULock				14
-MultiXactMemberSLRULock				15
+# 14 was MultiXactOffsetSLRULock
+# 15 was MultiXactMemberSLRULock
 RelCacheInitLock					16
 CheckpointerCommLock				17
 TwoPhaseStateLock					18
@@ -31,19 +31,19 @@ AutovacuumLock						22
 AutovacuumScheduleLock				23
 SyncScanLock						24
 RelationMappingLock					25
-NotifySLRULock						26
+#26 was NotifySLRULock
 NotifyQueueLock						27
 SerializableXactHashLock			28
 SerializableFinishedListLock		29
 SerializablePredicateListLock		30
-SerialSLRULock						31
+SerialControlLock					31
 SyncRepLock							32
 BackgroundWorkerLock				33
 DynamicSharedMemoryControlLock		34
 AutoFileLock						35
 ReplicationSlotAllocationLock		36
 ReplicationSlotControlLock			37
-CommitTsSLRULock					38
+#38 was CommitTsSLRULock
 CommitTsLock						39
 ReplicationOriginLock				40
 MultiXactTruncationLock				41
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 1af41213b4..fe00148956 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -808,8 +808,8 @@ SerialInit(void)
 	 */
 	SerialSlruCtl->PagePrecedes = SerialPagePrecedesLogically;
 	SimpleLruInit(SerialSlruCtl, "Serial",
-				  NUM_SERIAL_BUFFERS, 0, SerialSLRULock, "pg_serial",
-				  LWTRANCHE_SERIAL_BUFFER, SYNC_HANDLER_NONE);
+				  NUM_SERIAL_BUFFERS, 0, "pg_serial", LWTRANCHE_SERIAL_BUFFER,
+				  LWTRANCHE_SERIAL_SLRU, SYNC_HANDLER_NONE);
 #ifdef USE_ASSERT_CHECKING
 	SerialPagePrecedesLogicallyUnitTests();
 #endif
@@ -846,12 +846,14 @@ SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
 	int			slotno;
 	int			firstZeroPage;
 	bool		isNewPage;
+	LWLock	   *lock;
 
 	Assert(TransactionIdIsValid(xid));
 
 	targetPage = SerialPage(xid);
+	lock = SimpleLruPageGetSLRULock(SerialSlruCtl, targetPage);
 
-	LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 
 	/*
 	 * If no serializable transactions are active, there shouldn't be anything
@@ -901,7 +903,7 @@ SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
 	SerialValue(slotno, xid) = minConflictCommitSeqNo;
 	SerialSlruCtl->shared->page_dirty[slotno] = true;
 
-	LWLockRelease(SerialSLRULock);
+	LWLockRelease(lock);
 }
 
 /*
@@ -919,10 +921,10 @@ SerialGetMinConflictCommitSeqNo(TransactionId xid)
 
 	Assert(TransactionIdIsValid(xid));
 
-	LWLockAcquire(SerialSLRULock, LW_SHARED);
+	LWLockAcquire(SerialControlLock, LW_SHARED);
 	headXid = serialControl->headXid;
 	tailXid = serialControl->tailXid;
-	LWLockRelease(SerialSLRULock);
+	LWLockRelease(SerialControlLock);
 
 	if (!TransactionIdIsValid(headXid))
 		return 0;
@@ -934,13 +936,13 @@ SerialGetMinConflictCommitSeqNo(TransactionId xid)
 		return 0;
 
 	/*
-	 * The following function must be called without holding SerialSLRULock,
+	 * The following function must be called without holding SLRU bank lock,
 	 * but will return with that lock held, which must then be released.
 	 */
 	slotno = SimpleLruReadPage_ReadOnly(SerialSlruCtl,
 										SerialPage(xid), xid);
 	val = SerialValue(slotno, xid);
-	LWLockRelease(SerialSLRULock);
+	LWLockRelease(SimpleLruPageGetSLRULock(SerialSlruCtl, SerialPage(xid)));
 	return val;
 }
 
@@ -953,7 +955,7 @@ SerialGetMinConflictCommitSeqNo(TransactionId xid)
 static void
 SerialSetActiveSerXmin(TransactionId xid)
 {
-	LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(SerialControlLock, LW_EXCLUSIVE);
 
 	/*
 	 * When no sxacts are active, nothing overlaps, set the xid values to
@@ -965,7 +967,7 @@ SerialSetActiveSerXmin(TransactionId xid)
 	{
 		serialControl->tailXid = InvalidTransactionId;
 		serialControl->headXid = InvalidTransactionId;
-		LWLockRelease(SerialSLRULock);
+		LWLockRelease(SerialControlLock);
 		return;
 	}
 
@@ -983,7 +985,7 @@ SerialSetActiveSerXmin(TransactionId xid)
 		{
 			serialControl->tailXid = xid;
 		}
-		LWLockRelease(SerialSLRULock);
+		LWLockRelease(SerialControlLock);
 		return;
 	}
 
@@ -992,7 +994,7 @@ SerialSetActiveSerXmin(TransactionId xid)
 
 	serialControl->tailXid = xid;
 
-	LWLockRelease(SerialSLRULock);
+	LWLockRelease(SerialControlLock);
 }
 
 /*
@@ -1006,12 +1008,12 @@ CheckPointPredicate(void)
 {
 	int			tailPage;
 
-	LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(SerialControlLock, LW_EXCLUSIVE);
 
 	/* Exit quickly if the SLRU is currently not in use. */
 	if (serialControl->headPage < 0)
 	{
-		LWLockRelease(SerialSLRULock);
+		LWLockRelease(SerialControlLock);
 		return;
 	}
 
@@ -1055,7 +1057,7 @@ CheckPointPredicate(void)
 		serialControl->headPage = -1;
 	}
 
-	LWLockRelease(SerialSLRULock);
+	LWLockRelease(SerialControlLock);
 
 	/* Truncate away pages that are no longer required */
 	SimpleLruTruncate(SerialSlruCtl, tailPage);
diff --git a/src/include/access/slru.h b/src/include/access/slru.h
index f5f2b5b8b5..eec7a568dc 100644
--- a/src/include/access/slru.h
+++ b/src/include/access/slru.h
@@ -52,8 +52,6 @@ typedef enum
  */
 typedef struct SlruSharedData
 {
-	LWLock	   *ControlLock;
-
 	/* Number of buffers managed by this SLRU structure */
 	int			num_slots;
 
@@ -68,6 +66,13 @@ typedef struct SlruSharedData
 	int		   *page_lru_count;
 	LWLockPadded *buffer_locks;
 
+	/*
+	 * Lock to protect the buffer slot access in per SLRU bank.  The
+	 * buffer_locks protects the I/O on each buffer slots whereas this lock
+	 * protect the in memory operation on the buffer within one SLRU bank.
+	 */
+	LWLockPadded *bank_locks;
+
 	/*
 	 * Optional array of WAL flush LSNs associated with entries in the SLRU
 	 * pages.  If not zero/NULL, we must flush WAL before writing pages (true
@@ -95,7 +100,7 @@ typedef struct SlruSharedData
 	 * this is not critical data, since we use it only to avoid swapping out
 	 * the latest page.
 	 */
-	int			latest_page_number;
+	pg_atomic_uint32	latest_page_number;
 
 	/* SLRU's index for statistics purposes (might not be unique) */
 	int			slru_stats_idx;
@@ -143,11 +148,25 @@ typedef struct SlruCtlData
 
 typedef SlruCtlData *SlruCtl;
 
+/*
+ * Get the SLRU bank lock for given SlruCtl and the pageno.
+ *
+ * This lock needs to be acquire in order to access the slru buffer slots in
+ * the respective bank.  For more details refer comments in SlruSharedData.
+ */
+static inline LWLock *
+SimpleLruPageGetSLRULock(SlruCtl ctl, int pageno)
+{
+	int			bankno = (pageno & ctl->bank_mask);
+
+	/* Try to find the page while holding only shared lock */
+	return &(ctl->shared->bank_locks[bankno].lock);
+}
 
 extern Size SimpleLruShmemSize(int nslots, int nlsns);
 extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
-						  LWLock *ctllock, const char *subdir, int tranche_id,
-						  SyncRequestHandler sync_handler);
+						  const char *subdir, int tranche_id,
+						  int slru_tranche_id, SyncRequestHandler sync_handler);
 extern int	SimpleLruZeroPage(SlruCtl ctl, int pageno);
 extern int	SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok,
 							  TransactionId xid);
@@ -175,5 +194,7 @@ extern bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename,
 										int segpage, void *data);
 extern bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage,
 								   void *data);
-
+extern LWLock *SimpleLruPageGetSLRULock(SlruCtl ctl, int pageno);
+extern void SimpleLruAcquireAllBankLock(SlruCtl ctl, LWLockMode mode);
+extern void SimpleLruReleaseAllBankLock(SlruCtl ctl);
 #endif							/* SLRU_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index d77410bdea..09d2efe8ca 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -207,6 +207,14 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_PGSTATS_DATA,
 	LWTRANCHE_LAUNCHER_DSA,
 	LWTRANCHE_LAUNCHER_HASH,
+	LWTRANCHE_XACT_SLRU,
+	LWTRANCHE_COMMITTS_SLRU,
+	LWTRANCHE_SUBTRANS_SLRU,
+	LWTRANCHE_MULTIXACTOFFSET_SLRU,
+	LWTRANCHE_MULTIXACTMEMBER_SLRU,
+	LWTRANCHE_NOTIFY_SLRU,
+	LWTRANCHE_SERIAL_SLRU,
+
 	LWTRANCHE_FIRST_USER_DEFINED
 }			BuiltinTrancheIds;
 
diff --git a/src/test/modules/test_slru/test_slru.c b/src/test/modules/test_slru/test_slru.c
index ae21444c47..7b2eb4ae50 100644
--- a/src/test/modules/test_slru/test_slru.c
+++ b/src/test/modules/test_slru/test_slru.c
@@ -63,9 +63,9 @@ test_slru_page_write(PG_FUNCTION_ARGS)
 	int			pageno = PG_GETARG_INT32(0);
 	char	   *data = text_to_cstring(PG_GETARG_TEXT_PP(1));
 	int			slotno;
+	LWLock	   *lock = SimpleLruPageGetSLRULock(TestSlruCtl, pageno);
 
-	LWLockAcquire(TestSLRULock, LW_EXCLUSIVE);
-
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 	slotno = SimpleLruZeroPage(TestSlruCtl, pageno);
 
 	/* these should match */
@@ -80,7 +80,7 @@ test_slru_page_write(PG_FUNCTION_ARGS)
 			BLCKSZ - 1);
 
 	SimpleLruWritePage(TestSlruCtl, slotno);
-	LWLockRelease(TestSLRULock);
+	LWLockRelease(lock);
 
 	PG_RETURN_VOID();
 }
@@ -99,13 +99,14 @@ test_slru_page_read(PG_FUNCTION_ARGS)
 	bool		write_ok = PG_GETARG_BOOL(1);
 	char	   *data = NULL;
 	int			slotno;
+	LWLock	   *lock = SimpleLruPageGetSLRULock(TestSlruCtl, pageno);
 
 	/* find page in buffers, reading it if necessary */
-	LWLockAcquire(TestSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 	slotno = SimpleLruReadPage(TestSlruCtl, pageno,
 							   write_ok, InvalidTransactionId);
 	data = (char *) TestSlruCtl->shared->page_buffer[slotno];
-	LWLockRelease(TestSLRULock);
+	LWLockRelease(lock);
 
 	PG_RETURN_TEXT_P(cstring_to_text(data));
 }
@@ -116,14 +117,15 @@ test_slru_page_readonly(PG_FUNCTION_ARGS)
 	int			pageno = PG_GETARG_INT32(0);
 	char	   *data = NULL;
 	int			slotno;
+	LWLock	   *lock = SimpleLruPageGetSLRULock(TestSlruCtl, pageno);
 
 	/* find page in buffers, reading it if necessary */
 	slotno = SimpleLruReadPage_ReadOnly(TestSlruCtl,
 										pageno,
 										InvalidTransactionId);
-	Assert(LWLockHeldByMe(TestSLRULock));
+	Assert(LWLockHeldByMe(lock));
 	data = (char *) TestSlruCtl->shared->page_buffer[slotno];
-	LWLockRelease(TestSLRULock);
+	LWLockRelease(lock);
 
 	PG_RETURN_TEXT_P(cstring_to_text(data));
 }
@@ -133,10 +135,11 @@ test_slru_page_exists(PG_FUNCTION_ARGS)
 {
 	int			pageno = PG_GETARG_INT32(0);
 	bool		found;
+	LWLock	   *lock = SimpleLruPageGetSLRULock(TestSlruCtl, pageno);
 
-	LWLockAcquire(TestSLRULock, LW_EXCLUSIVE);
+	LWLockAcquire(lock, LW_EXCLUSIVE);
 	found = SimpleLruDoesPhysicalPageExist(TestSlruCtl, pageno);
-	LWLockRelease(TestSLRULock);
+	LWLockRelease(lock);
 
 	PG_RETURN_BOOL(found);
 }
@@ -215,6 +218,7 @@ test_slru_shmem_startup(void)
 {
 	const char	slru_dir_name[] = "pg_test_slru";
 	int			test_tranche_id;
+	int			test_buffer_tranche_id;
 
 	if (prev_shmem_startup_hook)
 		prev_shmem_startup_hook();
@@ -228,11 +232,13 @@ test_slru_shmem_startup(void)
 	/* initialize the SLRU facility */
 	test_tranche_id = LWLockNewTrancheId();
 	LWLockRegisterTranche(test_tranche_id, "test_slru_tranche");
-	LWLockInitialize(TestSLRULock, test_tranche_id);
+
+	test_buffer_tranche_id = LWLockNewTrancheId();
+	LWLockRegisterTranche(test_tranche_id, "test_buffer_tranche");
 
 	TestSlruCtl->PagePrecedes = test_slru_page_precedes_logically;
 	SimpleLruInit(TestSlruCtl, "TestSLRU",
-				  NUM_TEST_BUFFERS, 0, TestSLRULock, slru_dir_name,
+				  NUM_TEST_BUFFERS, 0, slru_dir_name, test_buffer_tranche_id,
 				  test_tranche_id, SYNC_HANDLER_NONE);
 }
 
-- 
2.39.2 (Apple Git-143)

