On Thu, Dec 24, 2015 at 8:26 AM, Michael Paquier <michael.paqu...@gmail.com>
wrote:

> On Sun, Dec 13, 2015 at 11:05 PM, Amit Kapila <amit.kapil...@gmail.com>
> wrote:
> > On Fri, Dec 11, 2015 at 6:34 PM, Andres Freund <and...@anarazel.de>
> wrote:
>

I was looking into this patch. Overall it looks good to me; what is the
current state of this patch, and are there any pending tasks left on it?

The patch was not applying on head, so I have rebased it; the rebased
version is attached to this mail.
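
For context, the core of the patch is to fold the buffer header's flags,
usage count and refcount into a single atomic 32-bit "state" word (see the
buf_internals.h hunk below), so the hot pin path no longer takes the header
spinlock. A minimal sketch of that path, simplified from the PinBuffer()
changes in the attached bufmgr.c hunk (not the exact patch code):

    /* Pin the buffer via a CAS loop on the packed state word. */
    for (;;)
    {
        uint32 oldstate = pg_atomic_read_u32(&buf->state);
        uint32 newstate;

        /* if another backend holds the header lock, wait for it */
        if (oldstate & BM_LOCKED)
        {
            pg_spin_delay();
            continue;
        }

        /* take a pin */
        newstate = oldstate + BUF_REFCOUNT_ONE;

        /* bump the usage count unless it is already at the maximum */
        if (BUF_STATE_GET_USAGECOUNT(oldstate) != BM_MAX_USAGE_COUNT)
            newstate += BUF_USAGECOUNT_ONE;

        /* succeeds only if nobody else changed the state word meanwhile */
        if (pg_atomic_compare_exchange_u32(&buf->state, &oldstate, newstate))
            break;
    }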

I have also done some performance testing.

Summary:
---------------
1. For the read-only workload I observed some improvement at scale factor
1000.
2. I also observed some regression at scale factor 300 (I can't say whether
it is an actual regression or just run-to-run variance). I thought it might
be a problem with lower scale factors, so I also tested scale factor 100,
and that looks fine.

Machine details:
CPU:   POWER8
Cores: 24 (192 with HT)

Non-default parameters:
------------------------
shared_buffers = 30GB
max_wal_size = 10GB
max_connections = 500

Test1:
pgbench -i -s 1000 postgres
pgbench -c$ -j$ -Mprepared -S postgres   ($ = client count from the table below)

Client             Base                      Patched

1                       19753                   19493
32                   344059                 336773
64                   495708                 540425
128                 564358                 685212
256                 466562                 639059

Test2:
pgbench -i -s 300 postgres
pgbench -c$ -j$ -Mprepared -S postgres

Client             Base                      Patched

1                      20555                    19404
32                  375919                  332670
64                  509067                  440680
128                431346                  415121
256                380926                  379176

Test3:
pgbench -i -s 100 postgres
pgbench -c$ -j$ -Mprepared -S postgres

Client             Base                      Patched

1                      20555                    19404
32                  375919                  332670
64                  509067                  440680
128                431346                  415121
256                380926                  379176

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index b423aa7..04862d7 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -121,12 +121,9 @@ InitBufferPool(void)
 			BufferDesc *buf = GetBufferDescriptor(i);
 
 			CLEAR_BUFFERTAG(buf->tag);
-			buf->flags = 0;
-			buf->usage_count = 0;
-			buf->refcount = 0;
-			buf->wait_backend_pid = 0;
 
-			SpinLockInit(&buf->buf_hdr_lock);
+			pg_atomic_init_u32(&buf->state, 0);
+			buf->wait_backend_pid = 0;
 
 			buf->buf_id = i;
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 7141eb8..f69faeb 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -51,7 +51,6 @@
 #include "utils/resowner_private.h"
 #include "utils/timestamp.h"
 
-
 /* Note: these two macros only work on shared buffers, not local ones! */
 #define BufHdrGetBlock(bufHdr)	((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ))
 #define BufferGetLSN(bufHdr)	(PageGetLSN(BufHdrGetBlock(bufHdr)))
@@ -126,7 +125,7 @@ static BufferDesc *PinCountWaitBuf = NULL;
  * entry using ReservePrivateRefCountEntry() and then later, if necessary,
  * fill it with NewPrivateRefCountEntry(). That split lets us avoid doing
  * memory allocations in NewPrivateRefCountEntry() which can be important
- * because in some scenarios it's called with a spinlock held...
+ * because in some scenarios it's called with a header lock held...
  */
 static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES];
 static HTAB *PrivateRefCountHash = NULL;
@@ -775,9 +774,8 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		 */
 		if (isLocalBuf)
 		{
-			/* Only need to adjust flags */
-			Assert(bufHdr->flags & BM_VALID);
-			bufHdr->flags &= ~BM_VALID;
+			Assert(pg_atomic_read_u32(&bufHdr->state) & BM_VALID);
+			pg_atomic_fetch_and_u32(&bufHdr->state, ~BM_VALID);
 		}
 		else
 		{
@@ -789,8 +787,8 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 			do
 			{
 				LockBufHdr(bufHdr);
-				Assert(bufHdr->flags & BM_VALID);
-				bufHdr->flags &= ~BM_VALID;
+				Assert(pg_atomic_read_u32(&bufHdr->state) & BM_VALID);
+				pg_atomic_fetch_and_u32(&bufHdr->state, ~BM_VALID);
 				UnlockBufHdr(bufHdr);
 			} while (!StartBufferIO(bufHdr, true));
 		}
@@ -808,7 +806,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	 * it's not been recycled) but come right back here to try smgrextend
 	 * again.
 	 */
-	Assert(!(bufHdr->flags & BM_VALID));		/* spinlock not needed */
+	Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* header lock not needed */
 
 	bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
 
@@ -886,7 +884,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	if (isLocalBuf)
 	{
 		/* Only need to adjust flags */
-		bufHdr->flags |= BM_VALID;
+		pg_atomic_fetch_or_u32(&bufHdr->state, BM_VALID);
 	}
 	else
 	{
@@ -940,7 +938,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	BufferTag	oldTag;			/* previous identity of selected buffer */
 	uint32		oldHash;		/* hash value for oldTag */
 	LWLock	   *oldPartitionLock;		/* buffer partition lock for it */
-	BufFlags	oldFlags;
+	uint32		oldFlags;
 	int			buf_id;
 	BufferDesc *buf;
 	bool		valid;
@@ -1002,24 +1000,26 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	/* Loop here in case we have to try another victim buffer */
 	for (;;)
 	{
+		uint32	state;
+
 		/*
-		 * Ensure, while the spinlock's not yet held, that there's a free
+		 * Ensure, while the header lock isn't yet held, that there's a free
 		 * refcount entry.
 		 */
 		ReservePrivateRefCountEntry();
 
 		/*
 		 * Select a victim buffer.  The buffer is returned with its header
-		 * spinlock still held!
+		 * lock still held!
 		 */
-		buf = StrategyGetBuffer(strategy);
+		buf = StrategyGetBuffer(strategy, &state);
 
-		Assert(buf->refcount == 0);
+		Assert(BUF_STATE_GET_REFCOUNT(state) == 0);
 
-		/* Must copy buffer flags while we still hold the spinlock */
-		oldFlags = buf->flags;
+		/* Must copy buffer flags while we still hold the header lock */
+		oldFlags = state & BUF_FLAG_MASK;
 
-		/* Pin the buffer and then release the buffer spinlock */
+		/* Pin the buffer and then release the buffer header lock */
 		PinBuffer_Locked(buf);
 
 		/*
@@ -1204,7 +1204,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		/*
 		 * Need to lock the buffer header too in order to change its tag.
 		 */
-		LockBufHdr(buf);
+		state = LockBufHdr(buf);
 
 		/*
 		 * Somebody could have pinned or re-dirtied the buffer while we were
@@ -1212,8 +1212,8 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 		 * recycle this buffer; we must undo everything we've done and start
 		 * over with a new victim buffer.
 		 */
-		oldFlags = buf->flags;
-		if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
+		oldFlags = state & BUF_FLAG_MASK;
+		if (BUF_STATE_GET_REFCOUNT(state) == 1 && !(oldFlags & BM_DIRTY))
 			break;
 
 		UnlockBufHdr(buf);
@@ -1234,12 +1234,19 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	 * 1 so that the buffer can survive one clock-sweep pass.)
 	 */
 	buf->tag = newTag;
-	buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
+	pg_atomic_fetch_and_u32(&buf->state,
+							~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED |
+							  BM_CHECKPOINT_NEEDED | BM_IO_ERROR |
+							  BM_PERMANENT |
+							  BUF_USAGECOUNT_MASK));
 	if (relpersistence == RELPERSISTENCE_PERMANENT)
-		buf->flags |= BM_TAG_VALID | BM_PERMANENT;
+		pg_atomic_fetch_or_u32(&buf->state,
+							   BM_TAG_VALID | BM_PERMANENT |
+							   BUF_USAGECOUNT_ONE);
 	else
-		buf->flags |= BM_TAG_VALID;
-	buf->usage_count = 1;
+		pg_atomic_fetch_or_u32(&buf->state,
+							   BM_TAG_VALID |
+							   BUF_USAGECOUNT_ONE);
 
 	UnlockBufHdr(buf);
 
@@ -1269,7 +1276,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
  * InvalidateBuffer -- mark a shared buffer invalid and return it to the
  * freelist.
  *
- * The buffer header spinlock must be held at entry.  We drop it before
+ * The buffer header lock must be held at entry.  We drop it before
  * returning.  (This is sane because the caller must have locked the
  * buffer in order to be sure it should be dropped.)
  *
@@ -1288,9 +1295,10 @@ InvalidateBuffer(BufferDesc *buf)
 	BufferTag	oldTag;
 	uint32		oldHash;		/* hash value for oldTag */
 	LWLock	   *oldPartitionLock;		/* buffer partition lock for it */
-	BufFlags	oldFlags;
+	uint32		oldFlags;
+	uint32		state;
 
-	/* Save the original buffer tag before dropping the spinlock */
+	/* Save the original buffer tag before dropping the header lock */
 	oldTag = buf->tag;
 
 	UnlockBufHdr(buf);
@@ -1312,7 +1320,7 @@ retry:
 	LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
 
 	/* Re-lock the buffer header */
-	LockBufHdr(buf);
+	state = LockBufHdr(buf);
 
 	/* If it's changed while we were waiting for lock, do nothing */
 	if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
@@ -1331,7 +1339,7 @@ retry:
 	 * yet done StartBufferIO, WaitIO will fall through and we'll effectively
 	 * be busy-looping here.)
 	 */
-	if (buf->refcount != 0)
+	if (BUF_STATE_GET_REFCOUNT(state) != 0)
 	{
 		UnlockBufHdr(buf);
 		LWLockRelease(oldPartitionLock);
@@ -1346,10 +1354,9 @@ retry:
 	 * Clear out the buffer's tag and flags.  We must do this to ensure that
 	 * linear scans of the buffer array don't think the buffer is valid.
 	 */
-	oldFlags = buf->flags;
+	oldFlags = state & BUF_FLAG_MASK;
 	CLEAR_BUFFERTAG(buf->tag);
-	buf->flags = 0;
-	buf->usage_count = 0;
+	pg_atomic_fetch_and_u32(&buf->state, BM_LOCKED | ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK));
 
 	UnlockBufHdr(buf);
 
@@ -1383,6 +1390,7 @@ void
 MarkBufferDirty(Buffer buffer)
 {
 	BufferDesc *bufHdr;
+	uint32		state;
 
 	if (!BufferIsValid(buffer))
 		elog(ERROR, "bad buffer ID: %d", buffer);
@@ -1399,14 +1407,14 @@ MarkBufferDirty(Buffer buffer)
 	/* unfortunately we can't check if the lock is held exclusively */
 	Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
 
-	LockBufHdr(bufHdr);
+	state = LockBufHdr(bufHdr);
 
-	Assert(bufHdr->refcount > 0);
+	Assert(BUF_STATE_GET_REFCOUNT(state) > 0);
 
 	/*
 	 * If the buffer was not dirty already, do vacuum accounting.
 	 */
-	if (!(bufHdr->flags & BM_DIRTY))
+	if (!(state & BM_DIRTY))
 	{
 		VacuumPageDirty++;
 		pgBufferUsage.shared_blks_dirtied++;
@@ -1414,7 +1422,7 @@ MarkBufferDirty(Buffer buffer)
 			VacuumCostBalance += VacuumCostPageDirty;
 	}
 
-	bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+	pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY | BM_JUST_DIRTIED);
 
 	UnlockBufHdr(bufHdr);
 }
@@ -1456,7 +1464,7 @@ ReleaseAndReadBuffer(Buffer buffer,
 		else
 		{
 			bufHdr = GetBufferDescriptor(buffer - 1);
-			/* we have pin, so it's ok to examine tag without spinlock */
+			/* we have pin, so it's ok to examine tag without header lock */
 			if (bufHdr->tag.blockNum == blockNum &&
 				RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node) &&
 				bufHdr->tag.forkNum == forkNum)
@@ -1484,7 +1492,7 @@ ReleaseAndReadBuffer(Buffer buffer,
  * Note that ResourceOwnerEnlargeBuffers must have been done already.
  *
  * Returns TRUE if buffer is BM_VALID, else FALSE.  This provision allows
- * some callers to avoid an extra spinlock cycle.
+ * some callers to avoid an extra header lock cycle.
  */
 static bool
 PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
@@ -1497,23 +1505,40 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 
 	if (ref == NULL)
 	{
+		uint32 state;
+		uint32 oldstate;
+
 		ReservePrivateRefCountEntry();
 		ref = NewPrivateRefCountEntry(b);
 
-		LockBufHdr(buf);
-		buf->refcount++;
-		if (strategy == NULL)
-		{
-			if (buf->usage_count < BM_MAX_USAGE_COUNT)
-				buf->usage_count++;
-		}
-		else
+
+		state = pg_atomic_read_u32(&buf->state);
+		oldstate = state;
+
+		while (true)
 		{
-			if (buf->usage_count == 0)
-				buf->usage_count = 1;
+			/* spin-wait till lock is free */
+			while (state & BM_LOCKED)
+			{
+				pg_spin_delay();
+				state = pg_atomic_read_u32(&buf->state);
+				oldstate = state;
+			}
+
+			/* increase refcount */
+			state += BUF_REFCOUNT_ONE;
+
+			/* increase usagecount unless already max */
+			if (BUF_STATE_GET_USAGECOUNT(state) != BM_MAX_USAGE_COUNT)
+				state += BUF_USAGECOUNT_ONE;
+
+			if (pg_atomic_compare_exchange_u32(&buf->state, &oldstate, state))
+				break;
+
+			/* get ready for next loop, oldstate has been updated by cas */
+			state = oldstate;
 		}
-		result = (buf->flags & BM_VALID) != 0;
-		UnlockBufHdr(buf);
+		result = (state & BM_VALID) != 0;
 	}
 	else
 	{
@@ -1529,9 +1554,9 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 
 /*
  * PinBuffer_Locked -- as above, but caller already locked the buffer header.
- * The spinlock is released before return.
+ * The header lock is released before return.
  *
- * As this function is called with the spinlock held, the caller has to
+ * As this function is called with the header lock held, the caller has to
  * previously call ReservePrivateRefCountEntry().
  *
  * Currently, no callers of this function want to modify the buffer's
@@ -1542,7 +1567,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
  * Also all callers only ever use this function when it's known that the
  * buffer can't have a preexisting pin by this backend. That allows us to skip
  * searching the private refcount array & hash, which is a boon, because the
- * spinlock is still held.
+ * header lock is still held.
  *
  * Note: use of this routine is frequently mandatory, not just an optimization
  * to save a spin lock/unlock cycle, because we need to pin a buffer before
@@ -1556,11 +1581,11 @@ PinBuffer_Locked(BufferDesc *buf)
 
 	/*
 	 * As explained, We don't expect any preexisting pins. That allows us to
-	 * manipulate the PrivateRefCount after releasing the spinlock
+	 * manipulate the PrivateRefCount after releasing the header lock
 	 */
 	Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL);
 
-	buf->refcount++;
+	pg_atomic_fetch_add_u32(&buf->state, 1);
 	UnlockBufHdr(buf);
 
 	b = BufferDescriptorGetBuffer(buf);
@@ -1596,29 +1621,39 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
 	ref->refcount--;
 	if (ref->refcount == 0)
 	{
+		uint32 state;
+
 		/* I'd better not still hold any locks on the buffer */
 		Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
 		Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
 
-		LockBufHdr(buf);
-
-		/* Decrement the shared reference count */
-		Assert(buf->refcount > 0);
-		buf->refcount--;
+		/* Decrement the shared reference count.
+		 *
+		 * Arguably it'd be more robust if we checked for BM_LOCKED here, but
+		 * currently all manipulation of ->state for shared buffers is through
+		 * atomics.
+		 */
+		state = pg_atomic_fetch_sub_u32(&buf->state, BUF_REFCOUNT_ONE);
+		Assert(BUF_STATE_GET_REFCOUNT(state) > 0);
 
 		/* Support LockBufferForCleanup() */
-		if ((buf->flags & BM_PIN_COUNT_WAITER) &&
-			buf->refcount == 1)
+		if (state & BM_PIN_COUNT_WAITER)
 		{
-			/* we just released the last pin other than the waiter's */
-			int			wait_backend_pid = buf->wait_backend_pid;
+			state = LockBufHdr(buf);
 
-			buf->flags &= ~BM_PIN_COUNT_WAITER;
-			UnlockBufHdr(buf);
-			ProcSendSignal(wait_backend_pid);
+			if ((state & BM_PIN_COUNT_WAITER) && BUF_STATE_GET_REFCOUNT(state) == 1)
+			{
+				/* we just released the last pin other than the waiter's */
+				int         wait_backend_pid = buf->wait_backend_pid;
+
+				pg_atomic_fetch_and_u32(&buf->state,
+										~BM_PIN_COUNT_WAITER);
+				UnlockBufHdr(buf);
+				ProcSendSignal(wait_backend_pid);
+			}
+			else
+				UnlockBufHdr(buf);
 		}
-		else
-			UnlockBufHdr(buf);
 
 		ForgetPrivateRefCountEntry(ref);
 	}
@@ -1637,6 +1672,7 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
 static void
 BufferSync(int flags)
 {
+	uint32		state;
 	int			buf_id;
 	int			num_to_scan;
 	int			num_to_write;
@@ -1677,14 +1713,15 @@ BufferSync(int flags)
 		BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
 
 		/*
-		 * Header spinlock is enough to examine BM_DIRTY, see comment in
+		 * Header lock is enough to examine BM_DIRTY, see comment in
 		 * SyncOneBuffer.
 		 */
-		LockBufHdr(bufHdr);
+		state = LockBufHdr(bufHdr);
 
-		if ((bufHdr->flags & mask) == mask)
+		if ((state & mask) == mask)
 		{
-			bufHdr->flags |= BM_CHECKPOINT_NEEDED;
+			pg_atomic_fetch_or_u32(&bufHdr->state,
+								   BM_CHECKPOINT_NEEDED);
 			num_to_write++;
 		}
 
@@ -1723,7 +1760,7 @@ BufferSync(int flags)
 		 * write the buffer though we didn't need to.  It doesn't seem worth
 		 * guarding against this, though.
 		 */
-		if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
+		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
 		{
 			if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
 			{
@@ -2083,6 +2120,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
 {
 	BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
 	int			result = 0;
+	uint32		state;
 
 	ReservePrivateRefCountEntry();
 
@@ -2095,10 +2133,13 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
 	 * don't worry because our checkpoint.redo points before log record for
 	 * upcoming changes and so we are not required to write such dirty buffer.
 	 */
-	LockBufHdr(bufHdr);
+	state = LockBufHdr(bufHdr);
 
-	if (bufHdr->refcount == 0 && bufHdr->usage_count == 0)
+	if (BUF_STATE_GET_REFCOUNT(state) == 0 &&
+		BUF_STATE_GET_USAGECOUNT(state) == 0)
+	{
 		result |= BUF_REUSABLE;
+	}
 	else if (skip_recently_used)
 	{
 		/* Caller told us not to write recently-used buffers */
@@ -2106,7 +2147,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
 		return result;
 	}
 
-	if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
+	if (!(state & BM_VALID) || !(state & BM_DIRTY))
 	{
 		/* It's clean, so nothing to do */
 		UnlockBufHdr(bufHdr);
@@ -2258,6 +2299,7 @@ PrintBufferLeakWarning(Buffer buffer)
 	int32		loccount;
 	char	   *path;
 	BackendId	backend;
+	uint32		state;
 
 	Assert(BufferIsValid(buffer));
 	if (BufferIsLocal(buffer))
@@ -2275,12 +2317,13 @@ PrintBufferLeakWarning(Buffer buffer)
 
 	/* theoretically we should lock the bufhdr here */
 	path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
+	state = pg_atomic_read_u32(&buf->state);
 	elog(WARNING,
 		 "buffer refcount leak: [%03d] "
 		 "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
 		 buffer, path,
-		 buf->tag.blockNum, buf->flags,
-		 buf->refcount, loccount);
+		 buf->tag.blockNum, state & BUF_FLAG_MASK,
+		 BUF_STATE_GET_REFCOUNT(state), loccount);
 	pfree(path);
 }
 
@@ -2335,7 +2378,7 @@ BufferGetBlockNumber(Buffer buffer)
 	else
 		bufHdr = GetBufferDescriptor(buffer - 1);
 
-	/* pinned, so OK to read tag without spinlock */
+	/* pinned, so OK to read tag without lock */
 	return bufHdr->tag.blockNum;
 }
 
@@ -2358,7 +2401,7 @@ BufferGetTag(Buffer buffer, RelFileNode *rnode, ForkNumber *forknum,
 	else
 		bufHdr = GetBufferDescriptor(buffer - 1);
 
-	/* pinned, so OK to read tag without spinlock */
+	/* pinned, so OK to read tag without lock */
 	*rnode = bufHdr->tag.rnode;
 	*forknum = bufHdr->tag.forkNum;
 	*blknum = bufHdr->tag.blockNum;
@@ -2426,7 +2469,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
 	recptr = BufferGetLSN(buf);
 
 	/* To check if block content changes while flushing. - vadim 01/17/97 */
-	buf->flags &= ~BM_JUST_DIRTIED;
+	pg_atomic_fetch_and_u32(&buf->state, ~BM_JUST_DIRTIED);
 	UnlockBufHdr(buf);
 
 	/*
@@ -2446,7 +2489,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
 	 * disastrous system-wide consequences.  To make sure that can't happen,
 	 * skip the flush if the buffer isn't permanent.
 	 */
-	if (buf->flags & BM_PERMANENT)
+	if (pg_atomic_read_u32(&buf->state) & BM_PERMANENT)
 		XLogFlush(recptr);
 
 	/*
@@ -2534,13 +2577,13 @@ BufferIsPermanent(Buffer buffer)
 
 	/*
 	 * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
-	 * need not bother with the buffer header spinlock.  Even if someone else
+	 * need not bother with the buffer header lock.  Even if someone else
 	 * changes the buffer header flags while we're doing this, we assume that
 	 * changing an aligned 2-byte BufFlags value is atomic, so we'll read the
 	 * old value or the new value, but not random garbage.
 	 */
 	bufHdr = GetBufferDescriptor(buffer - 1);
-	return (bufHdr->flags & BM_PERMANENT) != 0;
+	return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0;
 }
 
 /*
@@ -2640,7 +2683,7 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
 		if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
 			bufHdr->tag.forkNum == forkNum &&
 			bufHdr->tag.blockNum >= firstDelBlock)
-			InvalidateBuffer(bufHdr);	/* releases spinlock */
+			InvalidateBuffer(bufHdr);	/* releases lock */
 		else
 			UnlockBufHdr(bufHdr);
 	}
@@ -2738,7 +2781,7 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
 
 		LockBufHdr(bufHdr);
 		if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
-			InvalidateBuffer(bufHdr);	/* releases spinlock */
+			InvalidateBuffer(bufHdr);	/* releases lock */
 		else
 			UnlockBufHdr(bufHdr);
 	}
@@ -2780,7 +2823,7 @@ DropDatabaseBuffers(Oid dbid)
 
 		LockBufHdr(bufHdr);
 		if (bufHdr->tag.rnode.dbNode == dbid)
-			InvalidateBuffer(bufHdr);	/* releases spinlock */
+			InvalidateBuffer(bufHdr);	/* releases lock */
 		else
 			UnlockBufHdr(bufHdr);
 	}
@@ -2876,7 +2919,8 @@ FlushRelationBuffers(Relation rel)
 		{
 			bufHdr = GetLocalBufferDescriptor(i);
 			if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
-				(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
+				(pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY))
+				== (BM_VALID | BM_DIRTY))
 			{
 				ErrorContextCallback errcallback;
 				Page		localpage;
@@ -2897,7 +2941,7 @@ FlushRelationBuffers(Relation rel)
 						  localpage,
 						  false);
 
-				bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+				pg_atomic_fetch_and_u32(&bufHdr->state, ~(BM_DIRTY | BM_JUST_DIRTIED));
 
 				/* Pop the error context stack */
 				error_context_stack = errcallback.previous;
@@ -2925,7 +2969,8 @@ FlushRelationBuffers(Relation rel)
 
 		LockBufHdr(bufHdr);
 		if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
-			(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
+			(pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY))
+			== (BM_VALID | BM_DIRTY))
 		{
 			PinBuffer_Locked(bufHdr);
 			LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
@@ -2977,7 +3022,8 @@ FlushDatabaseBuffers(Oid dbid)
 
 		LockBufHdr(bufHdr);
 		if (bufHdr->tag.rnode.dbNode == dbid &&
-			(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
+			(pg_atomic_read_u32(&bufHdr->state) & (BM_VALID | BM_DIRTY))
+			== (BM_VALID | BM_DIRTY))
 		{
 			PinBuffer_Locked(bufHdr);
 			LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
@@ -3109,19 +3155,20 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 	 * This routine might get called many times on the same page, if we are
 	 * making the first scan after commit of an xact that added/deleted many
 	 * tuples. So, be as quick as we can if the buffer is already dirty.  We
-	 * do this by not acquiring spinlock if it looks like the status bits are
+	 * do this by not acquiring header lock if it looks like the status bits are
 	 * already set.  Since we make this test unlocked, there's a chance we
 	 * might fail to notice that the flags have just been cleared, and failed
 	 * to reset them, due to memory-ordering issues.  But since this function
 	 * is only intended to be used in cases where failing to write out the
 	 * data would be harmless anyway, it doesn't really matter.
 	 */
-	if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
+	if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
 		(BM_DIRTY | BM_JUST_DIRTIED))
 	{
 		XLogRecPtr	lsn = InvalidXLogRecPtr;
 		bool		dirtied = false;
 		bool		delayChkpt = false;
+		uint32		state;
 
 		/*
 		 * If we need to protect hint bit updates from torn writes, WAL-log a
@@ -3132,7 +3179,7 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 		 * We don't check full_page_writes here because that logic is included
 		 * when we call XLogInsert() since the value changes dynamically.
 		 */
-		if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT))
+		if (XLogHintBitIsNeeded() && (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT))
 		{
 			/*
 			 * If we're in recovery we cannot dirty a page because of a hint.
@@ -3172,8 +3219,12 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 		}
 
 		LockBufHdr(bufHdr);
-		Assert(bufHdr->refcount > 0);
-		if (!(bufHdr->flags & BM_DIRTY))
+
+		state = pg_atomic_read_u32(&bufHdr->state);
+
+		Assert(BUF_STATE_GET_REFCOUNT(state) > 0);
+
+		if (!(state & BM_DIRTY))
 		{
 			dirtied = true;		/* Means "will be dirtied by this action" */
 
@@ -3193,7 +3244,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 			if (!XLogRecPtrIsInvalid(lsn))
 				PageSetLSN(page, lsn);
 		}
-		bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+
+		pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY | BM_JUST_DIRTIED);
+
 		UnlockBufHdr(bufHdr);
 
 		if (delayChkpt)
@@ -3231,9 +3284,9 @@ UnlockBuffers(void)
 		 * Don't complain if flag bit not set; it could have been reset but we
 		 * got a cancel/die interrupt before getting the signal.
 		 */
-		if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
+		if ((pg_atomic_read_u32(&buf->state) & BM_PIN_COUNT_WAITER) != 0 &&
 			buf->wait_backend_pid == MyProcPid)
-			buf->flags &= ~BM_PIN_COUNT_WAITER;
+			pg_atomic_fetch_and_u32(&buf->state, ~BM_PIN_COUNT_WAITER);
 
 		UnlockBufHdr(buf);
 
@@ -3328,25 +3381,28 @@ LockBufferForCleanup(Buffer buffer)
 
 	for (;;)
 	{
+		int		state;
+
 		/* Try to acquire lock */
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-		LockBufHdr(bufHdr);
-		Assert(bufHdr->refcount > 0);
-		if (bufHdr->refcount == 1)
+		state = LockBufHdr(bufHdr);
+
+		Assert(BUF_STATE_GET_REFCOUNT(state) > 0);
+		if (BUF_STATE_GET_REFCOUNT(state) == 1)
 		{
 			/* Successfully acquired exclusive lock with pincount 1 */
 			UnlockBufHdr(bufHdr);
 			return;
 		}
 		/* Failed, so mark myself as waiting for pincount 1 */
-		if (bufHdr->flags & BM_PIN_COUNT_WAITER)
+		if (state & BM_PIN_COUNT_WAITER)
 		{
 			UnlockBufHdr(bufHdr);
 			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 			elog(ERROR, "multiple backends attempting to wait for pincount 1");
 		}
 		bufHdr->wait_backend_pid = MyProcPid;
-		bufHdr->flags |= BM_PIN_COUNT_WAITER;
+		pg_atomic_fetch_or_u32(&bufHdr->state, BM_PIN_COUNT_WAITER);
 		PinCountWaitBuf = bufHdr;
 		UnlockBufHdr(bufHdr);
 		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
@@ -3373,9 +3429,9 @@ LockBufferForCleanup(Buffer buffer)
 		 * better be safe.
 		 */
 		LockBufHdr(bufHdr);
-		if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 &&
+		if ((pg_atomic_read_u32(&bufHdr->state) & BM_PIN_COUNT_WAITER) != 0 &&
 			bufHdr->wait_backend_pid == MyProcPid)
-			bufHdr->flags &= ~BM_PIN_COUNT_WAITER;
+			pg_atomic_fetch_and_u32(&bufHdr->state, ~BM_PIN_COUNT_WAITER);
 		UnlockBufHdr(bufHdr);
 
 		PinCountWaitBuf = NULL;
@@ -3417,22 +3473,26 @@ bool
 ConditionalLockBufferForCleanup(Buffer buffer)
 {
 	BufferDesc *bufHdr;
+	uint32		state,
+				refcount;
 
 	Assert(BufferIsValid(buffer));
 
 	if (BufferIsLocal(buffer))
 	{
+		refcount = LocalRefCount[-buffer - 1];
 		/* There should be exactly one pin */
-		Assert(LocalRefCount[-buffer - 1] > 0);
-		if (LocalRefCount[-buffer - 1] != 1)
+		Assert(refcount > 0);
+		if (refcount != 1)
 			return false;
 		/* Nobody else to wait for */
 		return true;
 	}
 
 	/* There should be exactly one local pin */
-	Assert(GetPrivateRefCount(buffer) > 0);
-	if (GetPrivateRefCount(buffer) != 1)
+	refcount = GetPrivateRefCount(buffer);
+	Assert(refcount);
+	if (refcount != 1)
 		return false;
 
 	/* Try to acquire lock */
@@ -3440,9 +3500,11 @@ ConditionalLockBufferForCleanup(Buffer buffer)
 		return false;
 
 	bufHdr = GetBufferDescriptor(buffer - 1);
-	LockBufHdr(bufHdr);
-	Assert(bufHdr->refcount > 0);
-	if (bufHdr->refcount == 1)
+	state = LockBufHdr(bufHdr);
+	refcount = BUF_STATE_GET_REFCOUNT(state);
+
+	Assert(refcount > 0);
+	if (refcount == 1)
 	{
 		/* Successfully acquired exclusive lock with pincount 1 */
 		UnlockBufHdr(bufHdr);
@@ -3480,17 +3542,17 @@ WaitIO(BufferDesc *buf)
 	 */
 	for (;;)
 	{
-		BufFlags	sv_flags;
+		uint32		state;
 
 		/*
-		 * It may not be necessary to acquire the spinlock to check the flag
+		 * It may not be necessary to acquire the header lock to check the flag
 		 * here, but since this test is essential for correctness, we'd better
 		 * play it safe.
 		 */
-		LockBufHdr(buf);
-		sv_flags = buf->flags;
+		state = LockBufHdr(buf);
 		UnlockBufHdr(buf);
-		if (!(sv_flags & BM_IO_IN_PROGRESS))
+
+		if (!(state & BM_IO_IN_PROGRESS))
 			break;
 		LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
 		LWLockRelease(BufferDescriptorGetIOLock(buf));
@@ -3518,6 +3580,8 @@ WaitIO(BufferDesc *buf)
 static bool
 StartBufferIO(BufferDesc *buf, bool forInput)
 {
+	uint32		state;
+
 	Assert(!InProgressBuf);
 
 	for (;;)
@@ -3528,9 +3592,9 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 		 */
 		LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
 
-		LockBufHdr(buf);
+		state = LockBufHdr(buf);
 
-		if (!(buf->flags & BM_IO_IN_PROGRESS))
+		if (!(state & BM_IO_IN_PROGRESS))
 			break;
 
 		/*
@@ -3546,7 +3610,7 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 
 	/* Once we get here, there is definitely no I/O active on this buffer */
 
-	if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
+	if (forInput ? (state & BM_VALID) : !(state & BM_DIRTY))
 	{
 		/* someone else already did the I/O */
 		UnlockBufHdr(buf);
@@ -3554,7 +3618,7 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 		return false;
 	}
 
-	buf->flags |= BM_IO_IN_PROGRESS;
+	pg_atomic_fetch_or_u32(&buf->state, BM_IO_IN_PROGRESS);
 
 	UnlockBufHdr(buf);
 
@@ -3584,15 +3648,19 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 static void
 TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits)
 {
+	uint32		state;
+
 	Assert(buf == InProgressBuf);
 
-	LockBufHdr(buf);
+	state = LockBufHdr(buf);
+
+	Assert(state & BM_IO_IN_PROGRESS);
 
-	Assert(buf->flags & BM_IO_IN_PROGRESS);
-	buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
-	if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
-		buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
-	buf->flags |= set_flag_bits;
+	pg_atomic_fetch_and_u32(&buf->state, ~(BM_IO_IN_PROGRESS | BM_IO_ERROR));
+	if (clear_dirty && !(pg_atomic_read_u32(&buf->state) & BM_JUST_DIRTIED))
+		pg_atomic_fetch_and_u32(&buf->state, ~(BM_DIRTY | BM_CHECKPOINT_NEEDED));
+
+	pg_atomic_fetch_or_u32(&buf->state, set_flag_bits);
 
 	UnlockBufHdr(buf);
 
@@ -3617,6 +3685,7 @@ AbortBufferIO(void)
 
 	if (buf)
 	{
+		uint32	state;
 		/*
 		 * Since LWLockReleaseAll has already been called, we're not holding
 		 * the buffer's io_in_progress_lock. We have to re-acquire it so that
@@ -3625,26 +3694,24 @@ AbortBufferIO(void)
 		 */
 		LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
 
-		LockBufHdr(buf);
-		Assert(buf->flags & BM_IO_IN_PROGRESS);
+		state = LockBufHdr(buf);
+		Assert(state & BM_IO_IN_PROGRESS);
 		if (IsForInput)
 		{
-			Assert(!(buf->flags & BM_DIRTY));
+			Assert(!(state & BM_DIRTY));
+
 			/* We'd better not think buffer is valid yet */
-			Assert(!(buf->flags & BM_VALID));
+			Assert(!(state & BM_VALID));
 			UnlockBufHdr(buf);
 		}
 		else
 		{
-			BufFlags	sv_flags;
-
-			sv_flags = buf->flags;
-			Assert(sv_flags & BM_DIRTY);
+			Assert(state & BM_DIRTY);
 			UnlockBufHdr(buf);
 			/* Issue notice if this is not the first failure... */
-			if (sv_flags & BM_IO_ERROR)
+			if (state & BM_IO_ERROR)
 			{
-				/* Buffer is pinned, so we can read tag without spinlock */
+				/* Buffer is pinned, so we can read tag without header lock */
 				char	   *path;
 
 				path = relpathperm(buf->tag.rnode, buf->tag.forkNum);
@@ -3668,7 +3735,7 @@ shared_buffer_write_error_callback(void *arg)
 {
 	BufferDesc *bufHdr = (BufferDesc *) arg;
 
-	/* Buffer is pinned, so we can read the tag without locking the spinlock */
+	/* Buffer is pinned, so we can read the tag without locking the header */
 	if (bufHdr != NULL)
 	{
 		char	   *path = relpathperm(bufHdr->tag.rnode, bufHdr->tag.forkNum);
@@ -3724,3 +3791,34 @@ rnode_comparator(const void *p1, const void *p2)
 	else
 		return 0;
 }
+
+uint32
+LockBufHdr(volatile BufferDesc *desc)
+{
+	uint32 state = pg_atomic_read_u32(&desc->state);
+
+	for (;;)
+	{
+		/* wait till lock is free */
+		while (state & BM_LOCKED)
+		{
+			pg_spin_delay();
+			state = pg_atomic_read_u32(&desc->state);
+
+			/* Add exponential backoff? Should seldomly be contended tho. */
+		}
+
+		/* and try to get lock */
+		if (pg_atomic_compare_exchange_u32(&desc->state, &state, state | BM_LOCKED))
+			break;
+	}
+	return state | BM_LOCKED;
+}
+
+void
+UnlockBufHdr(volatile BufferDesc *desc)
+{
+	Assert(pg_atomic_read_u32(&desc->state) & BM_LOCKED);
+
+	pg_atomic_sub_fetch_u32(&desc->state, BM_LOCKED);
+}
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 551d152..148955f 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -98,7 +98,8 @@ typedef struct BufferAccessStrategyData
 
 
 /* Prototypes for internal functions */
-static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy);
+static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy,
+											  uint32 *lockstate);
 static void AddBufferToRing(BufferAccessStrategy strategy,
 				BufferDesc *buf);
 
@@ -180,7 +181,7 @@ ClockSweepTick(void)
  *	return the buffer with the buffer header spinlock still held.
  */
 BufferDesc *
-StrategyGetBuffer(BufferAccessStrategy strategy)
+StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *lockstate)
 {
 	BufferDesc *buf;
 	int			bgwprocno;
@@ -192,7 +193,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
 	 */
 	if (strategy != NULL)
 	{
-		buf = GetBufferFromRing(strategy);
+		buf = GetBufferFromRing(strategy, lockstate);
 		if (buf != NULL)
 			return buf;
 	}
@@ -250,6 +251,8 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
 	{
 		while (true)
 		{
+			uint32	state;
+
 			/* Acquire the spinlock to remove element from the freelist */
 			SpinLockAcquire(&StrategyControl->buffer_strategy_lock);
 
@@ -279,11 +282,13 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
 			 * it before we got to it.  It's probably impossible altogether as
 			 * of 8.3, but we'd better check anyway.)
 			 */
-			LockBufHdr(buf);
-			if (buf->refcount == 0 && buf->usage_count == 0)
+			state = LockBufHdr(buf);
+			if (BUF_STATE_GET_REFCOUNT(state) == 0
+				&& BUF_STATE_GET_USAGECOUNT(state) == 0)
 			{
 				if (strategy != NULL)
 					AddBufferToRing(strategy, buf);
+				*lockstate = state;
 				return buf;
 			}
 			UnlockBufHdr(buf);
@@ -295,6 +300,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
 	trycounter = NBuffers;
 	for (;;)
 	{
+		uint32	state;
 
 		buf = GetBufferDescriptor(ClockSweepTick());
 
@@ -302,12 +308,14 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
 		 * If the buffer is pinned or has a nonzero usage_count, we cannot use
 		 * it; decrement the usage_count (unless pinned) and keep scanning.
 		 */
-		LockBufHdr(buf);
-		if (buf->refcount == 0)
+		state = LockBufHdr(buf);
+
+		if (BUF_STATE_GET_REFCOUNT(state) == 0)
 		{
-			if (buf->usage_count > 0)
+			if (BUF_STATE_GET_USAGECOUNT(state) != 0)
 			{
-				buf->usage_count--;
+				pg_atomic_fetch_sub_u32(&buf->state, BUF_USAGECOUNT_ONE);
+
 				trycounter = NBuffers;
 			}
 			else
@@ -315,6 +323,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
 				/* Found a usable buffer */
 				if (strategy != NULL)
 					AddBufferToRing(strategy, buf);
+				*lockstate = state;
 				return buf;
 			}
 		}
@@ -585,10 +594,11 @@ FreeAccessStrategy(BufferAccessStrategy strategy)
  * The bufhdr spin lock is held on the returned buffer.
  */
 static BufferDesc *
-GetBufferFromRing(BufferAccessStrategy strategy)
+GetBufferFromRing(BufferAccessStrategy strategy, uint32 *lockstate)
 {
 	BufferDesc *buf;
 	Buffer		bufnum;
+	uint32		state;
 
 	/* Advance to next ring slot */
 	if (++strategy->current >= strategy->ring_size)
@@ -616,10 +626,12 @@ GetBufferFromRing(BufferAccessStrategy strategy)
 	 * shouldn't re-use it.
 	 */
 	buf = GetBufferDescriptor(bufnum - 1);
-	LockBufHdr(buf);
-	if (buf->refcount == 0 && buf->usage_count <= 1)
+	state = LockBufHdr(buf);
+	if (BUF_STATE_GET_REFCOUNT(state) == 0
+		&& BUF_STATE_GET_USAGECOUNT(state) <= 1)
 	{
 		strategy->current_was_in_ring = true;
+		*lockstate = state;
 		return buf;
 	}
 	UnlockBufHdr(buf);
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 17640cf..edc0ada 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -108,6 +108,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 	int			b;
 	int			trycounter;
 	bool		found;
+	uint32		state;
 
 	INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
 
@@ -128,16 +129,21 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 		fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
 				smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1);
 #endif
+		state = pg_atomic_read_u32(&bufHdr->state);
+
 		/* this part is equivalent to PinBuffer for a shared buffer */
 		if (LocalRefCount[b] == 0)
 		{
-			if (bufHdr->usage_count < BM_MAX_USAGE_COUNT)
-				bufHdr->usage_count++;
+			if (BUF_STATE_GET_USAGECOUNT(state) < BM_MAX_USAGE_COUNT)
+			{
+				state += BUF_USAGECOUNT_ONE;
+				pg_atomic_write_u32(&bufHdr->state, state);
+			}
 		}
 		LocalRefCount[b]++;
 		ResourceOwnerRememberBuffer(CurrentResourceOwner,
 									BufferDescriptorGetBuffer(bufHdr));
-		if (bufHdr->flags & BM_VALID)
+		if (state & BM_VALID)
 			*foundPtr = TRUE;
 		else
 		{
@@ -169,9 +175,12 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 
 		if (LocalRefCount[b] == 0)
 		{
-			if (bufHdr->usage_count > 0)
+			state = pg_atomic_read_u32(&bufHdr->state);
+
+			if (BUF_STATE_GET_USAGECOUNT(state) > 0)
 			{
-				bufHdr->usage_count--;
+				state -= BUF_USAGECOUNT_ONE;
+				pg_atomic_write_u32(&bufHdr->state, state);
 				trycounter = NLocBuffer;
 			}
 			else
@@ -193,7 +202,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 	 * this buffer is not referenced but it might still be dirty. if that's
 	 * the case, write it out before reusing it!
 	 */
-	if (bufHdr->flags & BM_DIRTY)
+	if (state & BM_DIRTY)
 	{
 		SMgrRelation oreln;
 		Page		localpage = (char *) LocalBufHdrGetBlock(bufHdr);
@@ -211,7 +220,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 				  false);
 
 		/* Mark not-dirty now in case we error out below */
-		bufHdr->flags &= ~BM_DIRTY;
+		state &= ~BM_DIRTY;
+		pg_atomic_write_u32(&bufHdr->state, state);
 
 		pgBufferUsage.local_blks_written++;
 	}
@@ -228,7 +238,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 	/*
 	 * Update the hash table: remove old entry, if any, and make new one.
 	 */
-	if (bufHdr->flags & BM_TAG_VALID)
+	if (state & BM_TAG_VALID)
 	{
 		hresult = (LocalBufferLookupEnt *)
 			hash_search(LocalBufHash, (void *) &bufHdr->tag,
@@ -237,7 +247,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 			elog(ERROR, "local buffer hash table corrupted");
 		/* mark buffer invalid just in case hash insert fails */
 		CLEAR_BUFFERTAG(bufHdr->tag);
-		bufHdr->flags &= ~(BM_VALID | BM_TAG_VALID);
+		state &= ~(BM_VALID | BM_TAG_VALID);
+		pg_atomic_write_u32(&bufHdr->state, state);
 	}
 
 	hresult = (LocalBufferLookupEnt *)
@@ -250,9 +261,11 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 	 * it's all ours now.
 	 */
 	bufHdr->tag = newTag;
-	bufHdr->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
-	bufHdr->flags |= BM_TAG_VALID;
-	bufHdr->usage_count = 1;
+	state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
+	state |= BM_TAG_VALID;
+	state &= ~BUF_USAGECOUNT_MASK;
+	state += BUF_USAGECOUNT_ONE;
+	pg_atomic_write_u32(&bufHdr->state, state);
 
 	*foundPtr = FALSE;
 	return bufHdr;
@@ -267,6 +280,7 @@ MarkLocalBufferDirty(Buffer buffer)
 {
 	int			bufid;
 	BufferDesc *bufHdr;
+	uint32		state;
 
 	Assert(BufferIsLocal(buffer));
 
@@ -280,10 +294,10 @@ MarkLocalBufferDirty(Buffer buffer)
 
 	bufHdr = GetLocalBufferDescriptor(bufid);
 
-	if (!(bufHdr->flags & BM_DIRTY))
-		pgBufferUsage.local_blks_dirtied++;
+	state = pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY);
 
-	bufHdr->flags |= BM_DIRTY;
+	if (!(state & BM_DIRTY))
+		pgBufferUsage.local_blks_dirtied++;
 }
 
 /*
@@ -307,8 +321,11 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum,
 	{
 		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
 		LocalBufferLookupEnt *hresult;
+		uint32		state;
 
-		if ((bufHdr->flags & BM_TAG_VALID) &&
+		state = pg_atomic_read_u32(&bufHdr->state);
+
+		if ((state & BM_TAG_VALID) &&
 			RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
 			bufHdr->tag.forkNum == forkNum &&
 			bufHdr->tag.blockNum >= firstDelBlock)
@@ -327,8 +344,9 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum,
 				elog(ERROR, "local buffer hash table corrupted");
 			/* Mark buffer invalid */
 			CLEAR_BUFFERTAG(bufHdr->tag);
-			bufHdr->flags = 0;
-			bufHdr->usage_count = 0;
+			state &= ~BUF_FLAG_MASK;
+			state &= ~BUF_USAGECOUNT_MASK;
+			pg_atomic_write_u32(&bufHdr->state, state);
 		}
 	}
 }
@@ -349,8 +367,11 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode)
 	{
 		BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
 		LocalBufferLookupEnt *hresult;
+		uint32		state;
+
+		state = pg_atomic_read_u32(&bufHdr->state);
 
-		if ((bufHdr->flags & BM_TAG_VALID) &&
+		if ((state & BM_TAG_VALID) &&
 			RelFileNodeEquals(bufHdr->tag.rnode, rnode))
 		{
 			if (LocalRefCount[i] != 0)
@@ -367,8 +388,9 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode)
 				elog(ERROR, "local buffer hash table corrupted");
 			/* Mark buffer invalid */
 			CLEAR_BUFFERTAG(bufHdr->tag);
-			bufHdr->flags = 0;
-			bufHdr->usage_count = 0;
+			state &= ~BUF_FLAG_MASK;
+			state &= ~BUF_USAGECOUNT_MASK;
+			pg_atomic_write_u32(&bufHdr->state, state);
 		}
 	}
 }
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index cbc4843..e7d3f66 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -20,29 +20,43 @@
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
+#include "port/atomics.h"
 #include "storage/spin.h"
 #include "utils/relcache.h"
 
 
 /*
+ * State is:
+ * 10 bit flags
+ * 4 bit usage count
+ * 18 bit refcount
+ */
+#define BUF_REFCOUNT_ONE 1
+#define BUF_REFCOUNT_MASK ((1U << 18) - 1)
+#define BUF_STATE_GET_REFCOUNT(state) ((state) & BUF_REFCOUNT_MASK)
+#define BUF_USAGECOUNT_MASK 0x003C0000U
+#define BUF_USAGECOUNT_ONE (1U << 18)
+#define BUF_USAGECOUNT_SHIFT 18
+#define BUF_STATE_GET_USAGECOUNT(state) (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT)
+#define BUF_FLAG_MASK 0xFFC00000U
+
+/*
  * Flags for buffer descriptors
  *
  * Note: TAG_VALID essentially means that there is a buffer hashtable
  * entry associated with the buffer's tag.
  */
-#define BM_DIRTY				(1 << 0)		/* data needs writing */
-#define BM_VALID				(1 << 1)		/* data is valid */
-#define BM_TAG_VALID			(1 << 2)		/* tag is assigned */
-#define BM_IO_IN_PROGRESS		(1 << 3)		/* read or write in progress */
-#define BM_IO_ERROR				(1 << 4)		/* previous I/O failed */
-#define BM_JUST_DIRTIED			(1 << 5)		/* dirtied since write started */
-#define BM_PIN_COUNT_WAITER		(1 << 6)		/* have waiter for sole pin */
-#define BM_CHECKPOINT_NEEDED	(1 << 7)		/* must write for checkpoint */
-#define BM_PERMANENT			(1 << 8)		/* permanent relation (not
+#define BM_LOCKED				(1U << 22)		/* buffer header is locked */
+#define BM_DIRTY				(1U << 23)		/* data needs writing */
+#define BM_VALID				(1U << 24)		/* data is valid */
+#define BM_TAG_VALID			(1U << 25)		/* tag is assigned */
+#define BM_IO_IN_PROGRESS		(1U << 26)		/* read or write in progress */
+#define BM_IO_ERROR				(1U << 27)		/* previous I/O failed */
+#define BM_JUST_DIRTIED			(1U << 28)		/* dirtied since write started */
+#define BM_PIN_COUNT_WAITER		(1U << 29)		/* have waiter for sole pin */
+#define BM_CHECKPOINT_NEEDED	(1U << 30)		/* must write for checkpoint */
+#define BM_PERMANENT			(1U << 31)		/* permanent relation (not
 												 * unlogged) */
-
-typedef bits16 BufFlags;
-
 /*
  * The maximum allowed value of usage_count represents a tradeoff between
  * accuracy and speed of the clock-sweep buffer management algorithm.  A
@@ -141,10 +155,9 @@ typedef struct buftag
 typedef struct BufferDesc
 {
 	BufferTag	tag;			/* ID of page contained in buffer */
-	BufFlags	flags;			/* see bit definitions above */
-	uint8		usage_count;	/* usage counter for clock sweep code */
-	slock_t		buf_hdr_lock;	/* protects a subset of fields, see above */
-	unsigned	refcount;		/* # of backends holding pins on buffer */
+
+	/* state of the tag, containing flags, refcount and usagecount */
+	pg_atomic_uint32 state;
 	int			wait_backend_pid;		/* backend PID of pin-count waiter */
 
 	int			buf_id;			/* buffer's index number (from 0) */
@@ -201,11 +214,11 @@ extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray;
 #define FREENEXT_NOT_IN_LIST	(-2)
 
 /*
- * Macros for acquiring/releasing a shared buffer header's spinlock.
- * Do not apply these to local buffers!
+ * Functions for acquiring/releasing a shared buffer header's spinlock.  Do
+ * not apply these to local buffers! FIXUP!
  */
-#define LockBufHdr(bufHdr)		SpinLockAcquire(&(bufHdr)->buf_hdr_lock)
-#define UnlockBufHdr(bufHdr)	SpinLockRelease(&(bufHdr)->buf_hdr_lock)
+extern uint32 LockBufHdr(volatile BufferDesc *desc);
+extern void UnlockBufHdr(volatile BufferDesc *desc);
 
 
 /* in buf_init.c */
@@ -220,7 +233,8 @@ extern BufferDesc *LocalBufferDescriptors;
  */
 
 /* freelist.c */
-extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy);
+extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
+											  uint32 *state);
 extern void StrategyFreeBuffer(BufferDesc *buf);
 extern bool StrategyRejectBuffer(BufferAccessStrategy strategy,
 					 BufferDesc *buf);